diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
index d588920276..5be30fdda9 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
@@ -18,6 +18,7 @@ import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.util.LongAccumulator;
+import org.mortbay.log.Log;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.orcid.AuthorData;
+import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
+import eu.dnetlib.dhp.schema.orcid.Work;
 import eu.dnetlib.dhp.schema.orcid.WorkDetail;
 import eu.dnetlib.doiboost.orcid.json.JsonHelper;
 import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
@@ -61,8 +64,6 @@ public class SparkGenEnrichedOrcidWorks {
 			.orElse(Boolean.TRUE);
 		final String workingPath = parser.get("workingPath");
 		final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath");
-		final String outputWorksPath = parser.get("outputWorksPath");
-		final String hdfsServerUri = parser.get("hdfsServerUri");
 
 		SparkConf conf = new SparkConf();
 		runWithSparkSession(
@@ -71,26 +72,39 @@ public class SparkGenEnrichedOrcidWorks {
 			spark -> {
 				JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-				JavaPairRDD<Text, Text> summariesRDD = sc
-					.sequenceFile(workingPath + "authors/authors.seq", Text.class, Text.class);
-				Dataset<AuthorData> summariesDataset = spark
+				Dataset<AuthorData> authorDataset = spark
 					.createDataset(
-						summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(),
+						sc
+							.textFile(workingPath.concat("last_orcid_dataset/authors/*"))
+							.map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
+							.filter(authorSummary -> authorSummary.getAuthorData() != null)
+							.map(authorSummary -> authorSummary.getAuthorData())
+							.rdd(),
 						Encoders.bean(AuthorData.class));
-				logger.info("Authors data loaded: " + summariesDataset.count());
+				logger.info("Authors data loaded: " + authorDataset.count());
 
-				JavaPairRDD<Text, Text> activitiesRDD = sc
-					.sequenceFile(workingPath + outputWorksPath + "*.seq", Text.class, Text.class);
-				Dataset<WorkDetail> activitiesDataset = spark
+				Dataset<WorkDetail> workDataset = spark
 					.createDataset(
-						activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(),
+						sc
+							.textFile(workingPath.concat("last_orcid_dataset/works/*"))
+							.map(item -> OBJECT_MAPPER.readValue(item, Work.class))
+							.filter(work -> work.getWorkDetail() != null)
+							.map(work -> work.getWorkDetail())
+							.filter(work -> work.getErrorCode() == null)
+							.filter(
+								work -> work
+									.getExtIds()
+									.stream()
+									.filter(e -> e.getType() != null)
+									.noneMatch(e -> e.getType().equalsIgnoreCase("doi")))
+							.rdd(),
 						Encoders.bean(WorkDetail.class));
-				logger.info("Works data loaded: " + activitiesDataset.count());
+				logger.info("Works data loaded: " + workDataset.count());
 
-				JavaRDD<Tuple2<String, String>> enrichedWorksRDD = activitiesDataset
+				JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
 					.joinWith(
-						summariesDataset,
-						activitiesDataset.col("oid").equalTo(summariesDataset.col("oid")), "inner")
+						authorDataset,
+						workDataset.col("oid").equalTo(authorDataset.col("oid")), "inner")
 					.map(
 						(MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> {
 							WorkDetail w = value._1;
@@ -150,31 +164,4 @@ public class SparkGenEnrichedOrcidWorks {
 				logger.info("errorsInvalidType: " + errorsInvalidType.value().toString());
 			});
 	}
-
-	private static AuthorData loadAuthorFromJson(Text orcidId, Text json) {
-		AuthorData authorData = new AuthorData();
-		authorData.setOid(orcidId.toString());
-		JsonElement jElement = new JsonParser().parse(json.toString());
-		authorData.setName(getJsonValue(jElement, "name"));
-		authorData.setSurname(getJsonValue(jElement, "surname"));
-		authorData.setCreditName(getJsonValue(jElement, "creditname"));
-		return authorData;
-	}
-
-	private static WorkDetail loadWorkFromJson(Text orcidId, Text json) {
-
-		WorkDetail workData = new Gson().fromJson(json.toString(), WorkDetail.class);
-		return workData;
-	}
-
-	private static String getJsonValue(JsonElement jElement, String property) {
-		if (jElement.getAsJsonObject().has(property)) {
-			JsonElement name = null;
-			name = jElement.getAsJsonObject().get(property);
-			if (name != null && !name.isJsonNull()) {
-				return name.getAsString();
-			}
-		}
-		return new String("");
-	}
 }
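Note on the Java change: both inputs now arrive as JSON-lines text (one serialized `AuthorSummary` or `Work` per line under `last_orcid_dataset/`) and are deserialized with Jackson's `OBJECT_MAPPER`, which is what makes the removed Gson/`JsonParser` helpers obsolete. The filter chain on the works side is the behavioural core of the patch: only works that parsed without error and carry no DOI among their external identifiers enter the no-doi enrichment. Below is a minimal sketch of that predicate in isolation; the `ExtId`/`WorkDetail` stubs are hypothetical stand-ins for the real `eu.dnetlib.dhp.schema.orcid` classes, reproducing only the accessors the diff uses.

```java
import java.util.Arrays;
import java.util.List;

public class NoDoiFilterSketch {

	// Hypothetical stub: the real external-id class lives in
	// eu.dnetlib.dhp.schema.orcid; only getType() is needed here.
	static class ExtId {
		private final String type;
		ExtId(String type) { this.type = type; }
		String getType() { return type; }
	}

	// Hypothetical stub for WorkDetail with the two fields the filters read.
	static class WorkDetail {
		private final String errorCode;
		private final List<ExtId> extIds;
		WorkDetail(String errorCode, List<ExtId> extIds) {
			this.errorCode = errorCode;
			this.extIds = extIds;
		}
		String getErrorCode() { return errorCode; }
		List<ExtId> getExtIds() { return extIds; }
	}

	// Mirrors the chained .filter(...) calls in the diff: keep a work only if
	// it parsed cleanly and none of its typed external ids is a DOI.
	static boolean keep(WorkDetail work) {
		return work.getErrorCode() == null
			&& work.getExtIds()
				.stream()
				.filter(e -> e.getType() != null)
				.noneMatch(e -> e.getType().equalsIgnoreCase("doi"));
	}

	public static void main(String[] args) {
		WorkDetail withDoi = new WorkDetail(null, Arrays.asList(new ExtId("doi"), new ExtId("pmid")));
		WorkDetail noDoi = new WorkDetail(null, Arrays.asList(new ExtId("pmid")));
		System.out.println(keep(withDoi)); // false -> left to the DOI-based pipeline
		System.out.println(keep(noDoi)); // true  -> enriched by this workflow
	}
}
```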
workDataset.col("oid").equalTo(authorDataset.col("oid")), "inner") .map( (MapFunction, Tuple2>) value -> { WorkDetail w = value._1; @@ -150,31 +164,4 @@ public class SparkGenEnrichedOrcidWorks { logger.info("errorsInvalidType: " + errorsInvalidType.value().toString()); }); } - - private static AuthorData loadAuthorFromJson(Text orcidId, Text json) { - AuthorData authorData = new AuthorData(); - authorData.setOid(orcidId.toString()); - JsonElement jElement = new JsonParser().parse(json.toString()); - authorData.setName(getJsonValue(jElement, "name")); - authorData.setSurname(getJsonValue(jElement, "surname")); - authorData.setCreditName(getJsonValue(jElement, "creditname")); - return authorData; - } - - private static WorkDetail loadWorkFromJson(Text orcidId, Text json) { - - WorkDetail workData = new Gson().fromJson(json.toString(), WorkDetail.class); - return workData; - } - - private static String getJsonValue(JsonElement jElement, String property) { - if (jElement.getAsJsonObject().has(property)) { - JsonElement name = null; - name = jElement.getAsJsonObject().get(property); - if (name != null && !name.isJsonNull()) { - return name.getAsString(); - } - } - return new String(""); - } } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml index 6cec48a6d3..610b7cc50a 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml @@ -1,17 +1,18 @@ + + spark2GenNoDoiDatasetMaxExecutors + 40 + sparkDriverMemory memory for driver process - sparkExecutorMemory + spark2GenNoDoiDatasetExecutorMemory + 2G memory for individual executor - - sparkExecutorCores - number of cores used by single executor - oozieActionShareLibForSpark2 oozie action sharelib for spark 2.* @@ -73,8 +74,9 @@ eu.dnetlib.doiboost.orcidnodoi.SparkGenEnrichedOrcidWorks dhp-doiboost-${projectVersion}.jar - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2GenNoDoiDatasetMaxExecutors} + --executor-memory=${spark2GenNoDoiDatasetExecutorMemory} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
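On the workflow side, the fixed `sparkExecutorCores`/`sparkExecutorMemory` pair gives way to dynamic allocation capped at 40 executors of 2G each. For reference, the same options can be set programmatically on a `SparkConf`; the sketch below is an equivalent configuration, not part of the patch, and it assumes the cluster-side prerequisite for dynamic allocation on YARN (the external shuffle service) is already in place.

```java
import org.apache.spark.SparkConf;

import scala.Tuple2;

public class DynamicAllocationConfSketch {
	public static void main(String[] args) {
		// The options the workflow now passes via <spark-opts>, expressed on a
		// SparkConf. On YARN, dynamic allocation additionally requires
		// spark.shuffle.service.enabled=true, assumed configured cluster-side.
		SparkConf conf = new SparkConf()
			.set("spark.dynamicAllocation.enabled", "true")
			.set("spark.dynamicAllocation.maxExecutors", "40") // workflow default
			.set("spark.executor.memory", "2G"); // workflow default

		// Print the resulting settings to verify what would be submitted.
		for (Tuple2<String, String> kv : conf.getAll()) {
			System.out.println(kv._1() + "=" + kv._2());
		}
	}
}
```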