dateOfCollection taken from ORCID last_update.txt on HDFS; cleaned workflow parameters

Enrico Ottonello 2021-02-25 18:43:29 +01:00
parent d43ea88caf
commit 53d7023460
4 changed files with 21 additions and 10 deletions

SparkGenEnrichedOrcidWorks.java

@@ -8,7 +8,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
+import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
@@ -57,26 +59,33 @@ public class SparkGenEnrichedOrcidWorks {
 				.toString(
 					SparkGenEnrichedOrcidWorks.class
 						.getResourceAsStream(
-							"/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works_parameters.json")));
+							"/eu/dnetlib/dhp/doiboost/gen_orcid-no-doi_params.json")));
 		parser.parseArgument(args);
 		Boolean isSparkSessionManaged = Optional
 			.ofNullable(parser.get("isSparkSessionManaged"))
 			.map(Boolean::valueOf)
 			.orElse(Boolean.TRUE);
+		final String hdfsServerUri = parser.get("hdfsServerUri");
 		final String workingPath = parser.get("workingPath");
 		final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath");
+		final String orcidDataFolder = parser.get("orcidDataFolder");
 
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
+				String lastUpdate = HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt");
+				if (StringUtils.isBlank(lastUpdate)) {
+					throw new RuntimeException("last update info not found");
+				}
+				final String dateOfCollection = lastUpdate.substring(0, 10);
 				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 				Dataset<AuthorData> authorDataset = spark
 					.createDataset(
 						sc
-							.textFile(workingPath.concat("last_orcid_dataset/authors/*"))
+							.textFile(workingPath.concat(orcidDataFolder).concat("/authors/*"))
 							.map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
 							.filter(authorSummary -> authorSummary.getAuthorData() != null)
 							.map(authorSummary -> authorSummary.getAuthorData())
@@ -87,7 +96,7 @@ public class SparkGenEnrichedOrcidWorks {
 				Dataset<WorkDetail> workDataset = spark
 					.createDataset(
 						sc
-							.textFile(workingPath.concat("last_orcid_dataset/works/*"))
+							.textFile(workingPath.concat(orcidDataFolder).concat("/works/*"))
 							.map(item -> OBJECT_MAPPER.readValue(item, Work.class))
 							.filter(work -> work.getWorkDetail() != null)
 							.map(work -> work.getWorkDetail())
@@ -134,7 +143,8 @@ public class SparkGenEnrichedOrcidWorks {
 					errorsGeneric,
 					errorsInvalidTitle,
 					errorsNotFoundAuthors,
-					errorsInvalidType);
+					errorsInvalidType,
+					dateOfCollection);
 				JavaRDD<Publication> oafPublicationRDD = enrichedWorksRDD
 					.map(
 						e -> {

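Note: the HDFSUtil helper imported above is referenced but not shown in this diff. A minimal sketch of what readFromTextFile plausibly looks like, assuming it simply opens fileName under workingPath on the namenode identified by hdfsServerUri and returns the file contents as a string:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch only: the real HDFSUtil lives in
// eu.dnetlib.doiboost.orcid.util and its body is not part of this commit view.
public class HDFSUtil {

	public static String readFromTextFile(String hdfsServerUri, String workingPath, String fileName)
		throws IOException {
		Configuration conf = new Configuration();
		// Point the Hadoop client at the cluster named by hdfsServerUri
		conf.set("fs.defaultFS", hdfsServerUri);
		FileSystem fs = FileSystem.get(conf);
		// Read the whole file (e.g. last_update.txt) into a string
		try (FSDataInputStream in = fs.open(new Path(workingPath.concat(fileName)))) {
			return IOUtils.toString(in, StandardCharsets.UTF_8);
		}
	}
}

Assuming last_update.txt holds a full timestamp such as "2021-02-19 10:32:16", lastUpdate.substring(0, 10) keeps only the date part ("2021-02-19"), which is the value propagated as dateOfCollection.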
PublicationToOaf.java

@@ -36,6 +36,7 @@ public class PublicationToOaf implements Serializable {
 	public static final String OPENAIRE_PREFIX = "openaire____";
 	public static final String SEPARATOR = "::";
 
+	private String dateOfCollection = "";
 	private final LongAccumulator parsedPublications;
 	private final LongAccumulator enrichedPublications;
 	private final LongAccumulator errorsGeneric;
@@ -49,13 +50,15 @@ public class PublicationToOaf implements Serializable {
 		LongAccumulator errorsGeneric,
 		LongAccumulator errorsInvalidTitle,
 		LongAccumulator errorsNotFoundAuthors,
-		LongAccumulator errorsInvalidType) {
+		LongAccumulator errorsInvalidType,
+		String dateOfCollection) {
 		this.parsedPublications = parsedPublications;
 		this.enrichedPublications = enrichedPublications;
 		this.errorsGeneric = errorsGeneric;
 		this.errorsInvalidTitle = errorsInvalidTitle;
 		this.errorsNotFoundAuthors = errorsNotFoundAuthors;
 		this.errorsInvalidType = errorsInvalidType;
+		this.dateOfCollection = dateOfCollection;
 	}
 
 	public PublicationToOaf() {
@@ -137,7 +140,7 @@ public class PublicationToOaf implements Serializable {
 		publication.setLastupdatetimestamp(new Date().getTime());
-		publication.setDateofcollection("2020-10-14");
+		publication.setDateofcollection(dateOfCollection);
 		publication.setDateoftransformation(DumpToActionsUtility.now_ISO8601());
 
 		// Adding external ids

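With the extra constructor argument, every call site now supplies the collection date alongside the six accumulators, replacing the hardcoded "2020-10-14". A hedged sketch of the wiring (the wrapper class, method, and accumulator setup are illustrative; in the job above the date is derived from last_update.txt):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

// Illustrative only; assumes it sits in the same package as PublicationToOaf
public class PublicationToOafWiring {

	static PublicationToOaf build(SparkSession spark, String dateOfCollection) {
		// Accumulator names mirror the constructor parameters in the diff
		LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications");
		LongAccumulator enrichedPublications = spark.sparkContext().longAccumulator("enrichedPublications");
		LongAccumulator errorsGeneric = spark.sparkContext().longAccumulator("errorsGeneric");
		LongAccumulator errorsInvalidTitle = spark.sparkContext().longAccumulator("errorsInvalidTitle");
		LongAccumulator errorsNotFoundAuthors = spark.sparkContext().longAccumulator("errorsNotFoundAuthors");
		LongAccumulator errorsInvalidType = spark.sparkContext().longAccumulator("errorsInvalidType");
		return new PublicationToOaf(
			parsedPublications,
			enrichedPublications,
			errorsGeneric,
			errorsInvalidTitle,
			errorsNotFoundAuthors,
			errorsInvalidType,
			dateOfCollection);
	}
}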
gen_orcid-no-doi_params.json

@@ -1,7 +1,6 @@
 [
 	{"paramName":"n", "paramLongName":"hdfsServerUri", "paramDescription": "the server uri", "paramRequired": true},
 	{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the default work path", "paramRequired": true},
-	{"paramName":"f", "paramLongName":"activitiesFileNameTarGz", "paramDescription": "the name of the activities orcid file", "paramRequired": true},
-	{"paramName":"ow", "paramLongName":"outputWorksPath", "paramDescription": "the relative folder of the sequencial file to write", "paramRequired": true},
+	{"paramName":"i", "paramLongName":"orcidDataFolder", "paramDescription": "the folder of orcid data", "paramRequired": true},
 	{"paramName":"oew", "paramLongName":"outputEnrichedWorksPath", "paramDescription": "the relative folder of the sequencial file to write the data", "paramRequired": true}
 ]

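The parameter file now matches exactly what SparkGenEnrichedOrcidWorks reads: the unused activitiesFileNameTarGz and outputWorksPath entries are dropped and orcidDataFolder is added. A small sketch of how these definitions are consumed, following the ArgumentApplicationParser pattern visible in the first file (the class name and argument values are illustrative; the real values come from the Oozie action below):

import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class GenOrcidNoDoiParamsExample {

	public static void main(String[] args) throws Exception {
		// Load the parameter definitions above from the classpath
		final String jsonConf = IOUtils
			.toString(
				GenOrcidNoDoiParamsExample.class
					.getResourceAsStream("/eu/dnetlib/dhp/doiboost/gen_orcid-no-doi_params.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
		parser
			.parseArgument(
				new String[] {
					"-n", "hdfs://nameservice1", // hdfsServerUri (illustrative)
					"-w", "/data/orcid_activities/", // workingPath (illustrative)
					"-i", "last_orcid_dataset", // orcidDataFolder
					"-oew", "no_doi_dataset" // outputEnrichedWorksPath
				});
		System.out.println(parser.get("orcidDataFolder")); // prints: last_orcid_dataset
	}
}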
workflow.xml (Oozie workflow)

@@ -85,8 +85,7 @@
 					</spark-opts>
 					<arg>-w</arg><arg>${workingPath}/</arg>
 					<arg>-n</arg><arg>${nameNode}</arg>
-					<arg>-f</arg><arg>-</arg>
-					<arg>-ow</arg><arg>no_doi_works/</arg>
+					<arg>-i</arg><arg>last_orcid_dataset</arg>
 					<arg>-oew</arg><arg>no_doi_dataset</arg>
 				</spark>
 				<ok to="End"/>