From b1b589ada19e9ed11334b73d0e11290c9e8ef365 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Mon, 7 Dec 2020 11:02:32 +0100
Subject: [PATCH] wf to generate orcid dataset

---
 .../eu/dnetlib/dhp/schema/orcid/Summary.java  |  79 ++++++++++
 .../orcid/SparkUpdateOrcidDatasets.java       | 140 ++++++++++++++++++
 .../doiboost/orcid/xml/XMLRecordParser.java   |  44 ++++++
 .../orcid_update/oozie_app/workflow.xml       |  92 ++++++++++++
 .../orcid/xml/XMLRecordParserTest.java        |   7 +-
 5 files changed, 360 insertions(+), 2 deletions(-)
 create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Summary.java
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java
 create mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Summary.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Summary.java
new file mode 100644
index 0000000000..ffebf50210
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/Summary.java
@@ -0,0 +1,79 @@
+
+package eu.dnetlib.dhp.schema.orcid;
+
+import java.io.Serializable;
+
+public class Summary implements Serializable {
+    private String creationMethod;
+    private String completionDate;
+    private String submissionDate;
+    private String lastModifiedDate;
+    private boolean claimed;
+    private String deactivationDate;
+    private boolean verifiedEmail;
+    private boolean verifiedPrimaryEmail;
+
+    public String getCreationMethod() {
+        return creationMethod;
+    }
+
+    public void setCreationMethod(String creationMethod) {
+        this.creationMethod = creationMethod;
+    }
+
+    public String getCompletionDate() {
+        return completionDate;
+    }
+
+    public void setCompletionDate(String completionDate) {
+        this.completionDate = completionDate;
+    }
+
+    public String getSubmissionDate() {
+        return submissionDate;
+    }
+
+    public void setSubmissionDate(String submissionDate) {
+        this.submissionDate = submissionDate;
+    }
+
+    public String getLastModifiedDate() {
+        return lastModifiedDate;
+    }
+
+    public void setLastModifiedDate(String lastModifiedDate) {
+        this.lastModifiedDate = lastModifiedDate;
+    }
+
+    public boolean isClaimed() {
+        return claimed;
+    }
+
+    public void setClaimed(boolean claimed) {
+        this.claimed = claimed;
+    }
+
+    public String getDeactivationDate() {
+        return deactivationDate;
+    }
+
+    public void setDeactivationDate(String deactivationDate) {
+        this.deactivationDate = deactivationDate;
+    }
+
+    public boolean isVerifiedEmail() {
+        return verifiedEmail;
+    }
+
+    public void setVerifiedEmail(boolean verifiedEmail) {
+        this.verifiedEmail = verifiedEmail;
+    }
+
+    public boolean isVerifiedPrimaryEmail() {
+        return verifiedPrimaryEmail;
+    }
+
+    public void setVerifiedPrimaryEmail(boolean verifiedPrimaryEmail) {
+        this.verifiedPrimaryEmail = verifiedPrimaryEmail;
+    }
+}
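The Summary bean above is a plain Serializable holder for the ORCID record-history fields, so it round-trips cleanly through Jackson, the same mapper this patch uses to write records. For illustration only, a minimal round-trip sketch; the class name, package, and sample values are made up, not part of the patch:

    package eu.dnetlib.dhp.schema.orcid.examples; // hypothetical package

    import com.fasterxml.jackson.databind.ObjectMapper;

    import eu.dnetlib.dhp.schema.orcid.Summary;

    public class SummaryRoundTripExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            // populate the bean with made-up history values
            Summary summary = new Summary();
            summary.setCreationMethod("Direct");
            summary.setLastModifiedDate("2020-12-07T10:02:32.000Z");
            summary.setClaimed(true);
            summary.setVerifiedEmail(true);
            summary.setVerifiedPrimaryEmail(false);

            // serialize to the JSON shape a job would store ...
            String json = mapper.writeValueAsString(summary);
            System.out.println(json);

            // ... and read it back through the setters
            Summary parsed = mapper.readValue(json, Summary.class);
            System.out.println(parsed.getCreationMethod());
        }
    }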
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java
new file mode 100644
index 0000000000..ed7114b272
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkUpdateOrcidDatasets.java
@@ -0,0 +1,140 @@
+
+package eu.dnetlib.doiboost.orcid;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.minlog.Log;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.orcid.AuthorData;
+import eu.dnetlib.doiboost.orcid.model.WorkData;
+import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
+import eu.dnetlib.doiboost.orcidnodoi.json.JsonWriter;
+import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi;
+import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi;
+import scala.Tuple2;
+
+public class SparkUpdateOrcidDatasets {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static void main(String[] args) throws Exception {
+        Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class);
+        logger.info("[ SparkUpdateOrcidDatasets STARTED]");
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    SparkUpdateOrcidDatasets.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
+        parser.parseArgument(args);
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+        final String workingPath = parser.get("workingPath");
+        logger.info("workingPath: {}", workingPath);
+//        final String outputPath = parser.get("outputPath");
+//        logger.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+                sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+
+                JavaPairRDD<Text, Text> xmlSummariesRDD = sc
+                    .sequenceFile(workingPath.concat("xml/authors/xml_authors.seq"), Text.class, Text.class);
+                xmlSummariesRDD
+                    .repartition(5)
+                    .map(seq -> XMLRecordParser.VTDParseAuthorData(seq._2().toString().getBytes()))
+                    .filter(summary -> summary != null)
+                    .mapToPair(
+                        summary -> new Tuple2<>(summary.getOid(),
+                            OBJECT_MAPPER.writeValueAsString(summary)))
+                    .mapToPair(t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
+                    .saveAsNewAPIHadoopFile(
+                        workingPath.concat("orcid_dataset/authors"),
+                        Text.class,
+                        Text.class,
+                        SequenceFileOutputFormat.class,
+                        sc.hadoopConfiguration());
+
+                JavaPairRDD<Text, Text> xmlWorksRDD = sc
+                    .sequenceFile(workingPath.concat("xml/works/*"), Text.class, Text.class);
+
+                xmlWorksRDD
+                    .map(seq -> XMLRecordParserNoDoi.VTDParseWorkData(seq._2().toString().getBytes()))
+                    .filter(work -> work != null)
+                    .mapToPair(
+                        work -> new Tuple2<>(work.getOid().concat("_").concat(work.getId()),
+                            OBJECT_MAPPER.writeValueAsString(work)))
+                    .mapToPair(t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
+                    .saveAsNewAPIHadoopFile(
+                        workingPath.concat("orcid_dataset/works"),
+                        Text.class,
+                        Text.class,
+                        SequenceFileOutputFormat.class,
+                        sc.hadoopConfiguration());
+            });
+
+    }
+
+    private static AuthorData loadAuthorFromJson(Text orcidId, Text json) {
+        AuthorData authorData = new AuthorData();
+        authorData.setOid(orcidId.toString());
+        JsonElement jElement = new JsonParser().parse(json.toString());
+        authorData.setName(getJsonValue(jElement, "name"));
+        authorData.setSurname(getJsonValue(jElement, "surname"));
+        authorData.setCreditName(getJsonValue(jElement, "creditname"));
+        return authorData;
+    }
+
+    private static WorkData loadWorkFromJson(Text orcidId, Text json) {
+        WorkData workData = new WorkData();
+        workData.setOid(orcidId.toString());
+        JsonElement jElement = new JsonParser().parse(json.toString());
+        workData.setDoi(getJsonValue(jElement, "doi"));
+        return workData;
+    }
+
+    private static String getJsonValue(JsonElement jElement, String property) {
+        if (jElement.getAsJsonObject().has(property)) {
+            JsonElement name = null;
+            name = jElement.getAsJsonObject().get(property);
+            if (name != null && !name.isJsonNull()) {
+                return name.getAsString();
+            }
+        }
+        return null;
+    }
+}
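SparkUpdateOrcidDatasets writes each dataset as a compressed SequenceFile of (key, JSON) Text pairs: the ORCID iD for authors, oid_workId for works. As a sketch of how a downstream job might read the authors dataset back, assuming AuthorData exposes standard getters/setters for Jackson; the class name and local master below are illustrative, not part of the patch:

    package eu.dnetlib.doiboost.orcid.examples; // hypothetical package

    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import eu.dnetlib.dhp.schema.orcid.AuthorData;

    public class ReadOrcidAuthorsExample {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("ReadOrcidAuthorsExample")
                .master("local[*]") // illustrative; the real workflow runs on yarn-cluster
                .getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            String workingPath = args[0]; // same base path the job above writes under

            // each record is (orcid id, AuthorData serialized as JSON)
            JavaRDD<AuthorData> authors = sc
                .sequenceFile(workingPath.concat("orcid_dataset/authors"), Text.class, Text.class)
                .map(t -> OBJECT_MAPPER.readValue(t._2().toString(), AuthorData.class));

            System.out.println("authors: " + authors.count());
            spark.stop();
        }
    }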
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
index 8787a8dd28..b6acadb725 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParser.java
@@ -4,6 +4,7 @@ package eu.dnetlib.doiboost.orcid.xml;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.commons.lang3.StringUtils;
 import org.mortbay.log.Log;
 
 import com.ximpleware.*;
@@ -31,6 +32,8 @@ public class XMLRecordParser {
     private static final String NS_ACTIVITIES_URL = "http://www.orcid.org/ns/activities";
     private static final String NS_WORK = "work";
     private static final String NS_WORK_URL = "http://www.orcid.org/ns/work";
+    private static final String NS_HISTORY = "history";
+    private static final String NS_HISTORY_URL = "http://www.orcid.org/ns/history";
 
     private static final String NS_ERROR = "error";
 
@@ -47,6 +50,7 @@ public class XMLRecordParser {
         ap.declareXPathNameSpace(NS_OTHER, NS_OTHER_URL);
         ap.declareXPathNameSpace(NS_RECORD, NS_RECORD_URL);
         ap.declareXPathNameSpace(NS_ERROR, NS_ERROR_URL);
+        ap.declareXPathNameSpace(NS_HISTORY, NS_HISTORY_URL);
 
         AuthorData authorData = new AuthorData();
         final List<String> errors = VtdUtilityParser.getTextValue(ap, vn, "//error:response-code");
@@ -85,6 +89,46 @@ public class XMLRecordParser {
             authorData.setOtherNames(otherNames);
         }
 
+//        final String creationMethod = VtdUtilityParser.getSingleValue(ap, vn, "//history:creation-method");
+//        if (StringUtils.isNoneBlank(creationMethod)) {
+//            authorData.setCreationMethod(creationMethod);
+//        }
+//
+//        final String completionDate = VtdUtilityParser.getSingleValue(ap, vn, "//history:completion-date");
+//        if (StringUtils.isNoneBlank(completionDate)) {
+//            authorData.setCompletionDate(completionDate);
+//        }
+//
+//        final String submissionDate = VtdUtilityParser.getSingleValue(ap, vn, "//history:submission-date");
+//        if (StringUtils.isNoneBlank(submissionDate)) {
+//            authorData.setSubmissionDate(submissionDate);
+//        }
+//
+//        final String claimed = VtdUtilityParser.getSingleValue(ap, vn, "//history:claimed");
+//        if (StringUtils.isNoneBlank(claimed)) {
+//            authorData.setClaimed(Boolean.parseBoolean(claimed));
+//        }
+//
+//        final String verifiedEmail = VtdUtilityParser.getSingleValue(ap, vn, "//history:verified-email");
+//        if (StringUtils.isNoneBlank(verifiedEmail)) {
+//            authorData.setVerifiedEmail(Boolean.parseBoolean(verifiedEmail));
+//        }
+//
+//        final String verifiedPrimaryEmail = VtdUtilityParser.getSingleValue(ap, vn, "//history:verified-primary-email");
+//        if (StringUtils.isNoneBlank(verifiedPrimaryEmail)) {
+//            authorData.setVerifiedPrimaryEmail(Boolean.parseBoolean(verifiedPrimaryEmail));
+//        }
+//
+//        final String deactivationDate = VtdUtilityParser.getSingleValue(ap, vn, "//history:deactivation-date");
+//        if (StringUtils.isNoneBlank(deactivationDate)) {
+//            authorData.setDeactivationDate(deactivationDate);
+//        }
+//
+//        final String lastModifiedDate = VtdUtilityParser
+//            .getSingleValue(ap, vn, "//history:history/common:last-modified-date");
+//        if (StringUtils.isNoneBlank(lastModifiedDate)) {
+//            authorData.setLastModifiedDate(lastModifiedDate);
+//        }
         return authorData;
     }
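The history-parsing block is committed commented out, presumably because AuthorData has no setters for those fields yet. If it is later wired to the new Summary bean instead, the revived code could look roughly like the sketch below; it reuses only calls already present in this file (VtdUtilityParser.getSingleValue, StringUtils.isNoneBlank, the history namespace declared above), while the helper name, the Summary import, and the broad throws clause are assumptions:

    // hypothetical helper; requires: import eu.dnetlib.dhp.schema.orcid.Summary;
    // declared 'throws Exception' here to avoid guessing VtdUtilityParser's exact exception list
    private static Summary parseHistory(AutoPilot ap, VTDNav vn) throws Exception {
        Summary summary = new Summary();
        final String creationMethod = VtdUtilityParser.getSingleValue(ap, vn, "//history:creation-method");
        if (StringUtils.isNoneBlank(creationMethod)) {
            summary.setCreationMethod(creationMethod);
        }
        final String claimed = VtdUtilityParser.getSingleValue(ap, vn, "//history:claimed");
        if (StringUtils.isNoneBlank(claimed)) {
            summary.setClaimed(Boolean.parseBoolean(claimed));
        }
        final String lastModifiedDate = VtdUtilityParser
            .getSingleValue(ap, vn, "//history:history/common:last-modified-date");
        if (StringUtils.isNoneBlank(lastModifiedDate)) {
            summary.setLastModifiedDate(lastModifiedDate);
        }
        // ... remaining history fields follow the same pattern
        return summary;
    }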
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml
new file mode 100644
index 0000000000..d2238a3783
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcid_update/oozie_app/workflow.xml
@@ -0,0 +1,92 @@
+<workflow-app name="update_orcid_datasets" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>spark2MaxExecutors</name>
+            <value>5</value>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+        <property>
+            <name>workingPath</name>
+            <description>the working dir base path</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="UpdateOrcidDatasets"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="UpdateOrcidDatasets">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>UpdateOrcidDatasets</name>
+            <class>eu.dnetlib.doiboost.orcid.SparkUpdateOrcidDatasets</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --conf spark.dynamicAllocation.enabled=true
+                --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+            </spark-opts>
+            <arg>-w</arg><arg>${workingPath}/</arg>
+            <arg>-n</arg><arg>${nameNode}</arg>
+            <arg>-f</arg><arg>-</arg>
+            <arg>-o</arg><arg>-</arg>
+            <arg>-t</arg><arg>-</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
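The workflow reads every undefaulted parameter from a submission-time properties file. A hypothetical job.properties for it; host names and paths are placeholders, not taken from the patch:

    # hypothetical job.properties for the orcid_update workflow
    nameNode=hdfs://namenode.example.org:8020
    jobTracker=yarnRM
    oozie.use.system.libpath=true
    oozieActionShareLibForSpark2=spark2
    spark2YarnHistoryServerAddress=http://historyserver.example.org:18089
    spark2EventLogDir=/user/spark/spark2ApplicationHistory
    sparkDriverMemory=2G
    sparkExecutorMemory=2G
    sparkExecutorCores=1
    workingPath=/data/orcid_activities
    oozie.wf.application.path=hdfs://namenode.example.org:8020/path/to/orcid_update/oozie_app

With such a file in place, the standard submission command would be: oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run (spark2ExtraListeners and spark2SqlQueryExecutionListeners can be omitted because the workflow declares defaults for them).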
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
index aeb9400a6a..722e9fd342 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
@@ -12,6 +12,7 @@ import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.ximpleware.*;
 
 import eu.dnetlib.dhp.schema.orcid.AuthorData;
@@ -25,9 +26,10 @@ public class XMLRecordParserTest {
     private static final String NS_WORK_URL = "http://www.orcid.org/ns/work";
     private static final String NS_COMMON_URL = "http://www.orcid.org/ns/common";
     private static final String NS_COMMON = "common";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @Test
-    private void testOrcidAuthorDataXMLParser() throws Exception {
+    public void testOrcidAuthorDataXMLParser() throws Exception {
 
         String xml = IOUtils.toString(this.getClass().getResourceAsStream("summary_0000-0001-6828-479X.xml"));
 
@@ -39,6 +41,7 @@ public class XMLRecordParserTest {
         System.out.println("name: " + authorData.getName());
         assertNotNull(authorData.getSurname());
         System.out.println("surname: " + authorData.getSurname());
+        OrcidClientTest.logToFile(OBJECT_MAPPER.writeValueAsString(authorData));
     }
 
     @Test
@@ -86,7 +89,7 @@ public class XMLRecordParserTest {
     }
 
     @Test
-    public void testWorkIdLastModifiedDateXMLParser() throws Exception {
+    private void testWorkIdLastModifiedDateXMLParser() throws Exception {
         String xml = IOUtils
             .toString(
                 this.getClass().getResourceAsStream("record_0000-0001-5004-5918.xml"));
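Note that this hunk re-enables testOrcidAuthorDataXMLParser by making it public (JUnit 5 does not discover private @Test methods) and, conversely, effectively disables testWorkIdLastModifiedDateXMLParser by making it private. To exercise the re-enabled parser test in isolation, a standard Surefire filter from the repository root should work; the module path is taken from the diff headers above:

    mvn -pl dhp-workflows/dhp-doiboost -am test -Dtest=XMLRecordParserTest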