From 934ad570e07b7fe2de17b4aacc009171340e6180 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Tue, 19 May 2020 12:57:21 +0200 Subject: [PATCH] joined summaries and activities dataset --- .../orcid/SparkGenerateDoiAuthorList.java | 34 +++++++++++++------ .../oozie_app/workflow.xml | 4 +-- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java index 5c609cb09..2f476a73f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java @@ -2,6 +2,8 @@ package eu.dnetlib.doiboost.orcid; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.collect_list; import java.io.IOException; import java.text.SimpleDateFormat; @@ -22,7 +24,9 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.RelationalGroupedDataset; import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.TypedColumn; import org.apache.spark.util.LongAccumulator; import org.mortbay.log.Log; import org.slf4j.Logger; @@ -60,7 +64,6 @@ public class SparkGenerateDoiAuthorList { logger.info("workingPath: ", workingPath); final String outputDoiAuthorListPath = parser.get("outputDoiAuthorListPath"); logger.info("outputDoiAuthorListPath: ", outputDoiAuthorListPath); - SparkConf conf = new SparkConf(); runWithSparkSession( @@ -69,20 +72,31 @@ public class SparkGenerateDoiAuthorList { spark -> { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaPairRDD summariesRDD = sc.sequenceFile(workingPath + "../orcid_summaries/output/authors.seq", Text.class, Text.class); + JavaPairRDD summariesRDD = sc + .sequenceFile(workingPath + "../orcid_summaries/output/authors.seq", Text.class, Text.class); Dataset summariesDataset = spark - .createDataset(summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(), + .createDataset( + summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(), Encoders.bean(AuthorData.class)); - - JavaPairRDD activitiesRDD = sc.sequenceFile(workingPath + "/output/*.seq", Text.class, Text.class); + + JavaPairRDD activitiesRDD = sc + .sequenceFile(workingPath + "/output/*.seq", Text.class, Text.class); Dataset activitiesDataset = spark - .createDataset(activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(), - Encoders.bean(WorkData.class)); + .createDataset( + activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(), + Encoders.bean(WorkData.class)); + + RelationalGroupedDataset group = activitiesDataset + .where("oid='0000-0002-9710-779X'") + .joinWith( + summariesDataset, + activitiesDataset.col("oid").equalTo(summariesDataset.col("oid")), "inner") + .groupBy(col("doi")); }); } - + private static AuthorData loadAuthorFromJson(Text orcidId, Text json) { AuthorData authorData = new AuthorData(); authorData.setOid(orcidId.toString()); @@ -92,7 +106,7 @@ public class SparkGenerateDoiAuthorList { authorData.setCreditName(getJsonValue(jElement, "creditname")); return authorData; } - + private static WorkData loadWorkFromJson(Text orcidId, Text json) { WorkData workData = new WorkData(); workData.setOid(orcidId.toString()); @@ -100,7 +114,7 @@ public class SparkGenerateDoiAuthorList { workData.setDoi(getJsonValue(jElement, "doi")); return workData; } - + private static String getJsonValue(JsonElement jElement, String property) { if (jElement.getAsJsonObject().has(property)) { JsonElement name = null; diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml index df3a586a8..ed4f92047 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + workingPath @@ -42,7 +42,7 @@ Gen_Doi_Author_List eu.dnetlib.doiboost.orcid.SparkGenerateDoiAuthorList dhp-doiboost-1.2.1-SNAPSHOT.jar - --num-executors 1 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --num-executors 5 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} -w${workingPath}/ -odoi_author_list/