diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java new file mode 100644 index 0000000000..5c609cb097 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkGenerateDoiAuthorList.java @@ -0,0 +1,114 @@ + +package eu.dnetlib.doiboost.orcid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.util.LongAccumulator; +import org.mortbay.log.Log; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation; +import eu.dnetlib.doiboost.orcid.model.AuthorData; +import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData; +import eu.dnetlib.doiboost.orcid.model.WorkData; +import scala.Tuple2; + +public class SparkGenerateDoiAuthorList { + + public static void main(String[] args) throws IOException, Exception { + Logger logger = LoggerFactory.getLogger(SparkGenerateDoiAuthorList.class); + logger.info("[ SparkGenerateDoiAuthorList STARTED]"); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkGenerateDoiAuthorList.class + .getResourceAsStream( + "/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json"))); + parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + logger.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String workingPath = parser.get("workingPath"); + logger.info("workingPath: ", workingPath); + final String outputDoiAuthorListPath = parser.get("outputDoiAuthorListPath"); + logger.info("outputDoiAuthorListPath: ", outputDoiAuthorListPath); + + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaPairRDD summariesRDD = sc.sequenceFile(workingPath + "../orcid_summaries/output/authors.seq", Text.class, Text.class); + Dataset summariesDataset = spark + .createDataset(summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(), + Encoders.bean(AuthorData.class)); + + JavaPairRDD activitiesRDD = sc.sequenceFile(workingPath + "/output/*.seq", Text.class, Text.class); + Dataset activitiesDataset = spark + .createDataset(activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(), + Encoders.bean(WorkData.class)); + + }); + + } + + private static AuthorData loadAuthorFromJson(Text orcidId, Text json) { + AuthorData authorData = new AuthorData(); + authorData.setOid(orcidId.toString()); + JsonElement jElement = new JsonParser().parse(json.toString()); + authorData.setName(getJsonValue(jElement, "name")); + authorData.setSurname(getJsonValue(jElement, "surname")); + authorData.setCreditName(getJsonValue(jElement, "creditname")); + return authorData; + } + + private static WorkData loadWorkFromJson(Text orcidId, Text json) { + WorkData workData = new WorkData(); + workData.setOid(orcidId.toString()); + JsonElement jElement = new JsonParser().parse(json.toString()); + workData.setDoi(getJsonValue(jElement, "doi")); + return workData; + } + + private static String getJsonValue(JsonElement jElement, String property) { + if (jElement.getAsJsonObject().has(property)) { + JsonElement name = null; + name = jElement.getAsJsonObject().get(property); + if (name != null && name.isJsonObject()) { + return name.getAsString(); + } + } + return null; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json new file mode 100644 index 0000000000..b894177b3b --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json @@ -0,0 +1,3 @@ +[{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working path", "paramRequired": true}, + {"paramName":"o", "paramLongName":"outputDoiAuthorListPath", "paramDescription": "the relative folder of the sequencial file to write the data", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml new file mode 100644 index 0000000000..3726022cbf --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + hadoop-rm3.garr-pa1.d4science.org:8032 + + + nameNode + hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 + + + queueName + default + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml new file mode 100644 index 0000000000..df3a586a88 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_doi_author_list/oozie_app/workflow.xml @@ -0,0 +1,55 @@ + + + + workingPath + the working dir base path + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + yarn + cluster + Gen_Doi_Author_List + eu.dnetlib.doiboost.orcid.SparkGenerateDoiAuthorList + dhp-doiboost-1.2.1-SNAPSHOT.jar + --num-executors 1 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + + -w${workingPath}/ + -odoi_author_list/ + + + + + + + \ No newline at end of file