From b31dd126fbb40a793cfe7284fa25fc160765f403 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 7 Dec 2020 10:42:38 +0100 Subject: [PATCH] fixed crossref workflow added common ORCID Class --- dhp-schemas/pom.xml | 2 +- .../eu/dnetlib/dhp/schema/orcid/OrcidDOI.java | 24 ++++ .../doiboost/crossref/Crossref2Oaf.scala | 4 +- .../doiboost/crossref/CrossrefDataset.scala | 38 ++++--- .../crossref/SparkMapDumpIntoOAF.scala | 70 +----------- .../doiboost/crossref/oozie_app/workflow.xml | 106 +++++++++--------- .../doiboost/crossref_to_dataset_params.json | 3 +- dhp-workflows/pom.xml | 2 +- 8 files changed, 106 insertions(+), 143 deletions(-) create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml index b04d62dd2..73efeabb4 100644 --- a/dhp-schemas/pom.xml +++ b/dhp-schemas/pom.xml @@ -6,7 +6,7 @@ eu.dnetlib.dhp dhp 1.2.4-SNAPSHOT - ../ + ../pom.xml dhp-schemas diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java new file mode 100644 index 000000000..11bce26c8 --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java @@ -0,0 +1,24 @@ +package eu.dnetlib.dhp.schema.orcid; + +import java.util.List; + +public class OrcidDOI { + private String doi; + private List authors; + + public String getDoi() { + return doi; + } + + public void setDoi(String doi) { + this.doi = doi; + } + + public List getAuthors() { + return authors; + } + + public void setAuthors(List authors) { + this.authors = authors; + } +} diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 1adb7465e..5ba01357e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -200,7 +200,7 @@ case object Crossref2Oaf { a.setSurname(family) a.setFullname(s"$given $family") if (StringUtils.isNotBlank(orcid)) - a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava) + a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateDataInfo())).asJava) a } @@ -248,7 +248,7 @@ case object Crossref2Oaf { def snsfRule(award:String): String = { - var tmp1 = StringUtils.substringAfter(award,"_") + val tmp1 = StringUtils.substringAfter(award,"_") val tmp2 = StringUtils.substringBefore(tmp1,"/") logger.debug(s"From $award to $tmp2") tmp2 diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala index 996ba5585..4a39a2987 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala @@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.{IntWritable, Text} import org.apache.spark.SparkConf import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} @@ -12,21 +13,23 @@ import org.slf4j.{Logger, LoggerFactory} object CrossrefDataset { + val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) - def extractTimestamp(input:String): Long = { + + def to_item(input:String):CrossrefDT = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json: json4s.JValue = parse(input) - - (json\"indexed"\"timestamp").extractOrElse[Long](0) + val ts:Long = (json \ "indexed" \ "timestamp").extract[Long] + val doi:String = (json \ "DOI").extract[String] + CrossrefDT(doi, input, ts) } - def main(args: Array[String]): Unit = { - val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) + val conf: SparkConf = new SparkConf() val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) parser.parseArgument(args) @@ -49,9 +52,8 @@ object CrossrefDataset { if (a == null) return b - val tb = extractTimestamp(b.json) - val ta = extractTimestamp(a.json) - if(ta >tb) { + + if(a.timestamp >b.timestamp) { return a } b @@ -63,9 +65,7 @@ object CrossrefDataset { if (a == null) return b - val tb = extractTimestamp(b.json) - val ta = extractTimestamp(a.json) - if(ta >tb) { + if(a.timestamp >b.timestamp) { return a } b @@ -78,15 +78,21 @@ object CrossrefDataset { override def finish(reduction: CrossrefDT): CrossrefDT = reduction } - val sourcePath:String = parser.get("sourcePath") - val targetPath:String = parser.get("targetPath") + val workingPath:String = parser.get("workingPath") - val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT] - ds.groupByKey(_.doi) + val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] + + + val update = + spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) + .map(i =>CrossrefImporter.decompressBlob(i._2.toString)) + .map(i =>to_item(i))) + + main_ds.union(update).groupByKey(_.doi) .agg(crossrefAggregator.toColumn) .map(s=>s._2) - .write.mode(SaveMode.Overwrite).save(targetPath) + .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated") } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 08319058c..0272cb1a6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -34,85 +34,21 @@ object SparkMapDumpIntoOAF { implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation] implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset] - val sc = spark.sparkContext val targetPath = parser.get("targetPath") import spark.implicits._ - spark.read.load(parser.get("sourcePath")).as[CrossrefDT] .flatMap(k => Crossref2Oaf.convert(k.json)) .filter(o => o != null) .write.mode(SaveMode.Overwrite).save(s"$targetPath/mixObject") - val ds:Dataset[Oaf] = spark.read.load(s"$targetPath/mixObject").as[Oaf] - ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.save(s"$targetPath/publication") + ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefPublication") - ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.save(s"$targetPath/relation") + ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefRelation") - ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.save(s"$targetPath/dataset") - - - -// -// -// -// sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text]) -// .map(k => k._2.toString).map(CrossrefImporter.decompressBlob) -// .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject") -// -// val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null) -// -// val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication]) -// .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) => -// var r = if (p1 == null) p2 else p1 -// if (p1 != null && p2 != null) { -// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) { -// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp) -// r = p2 -// else -// r = p1 -// } else { -// r = if (p1.getLastupdatetimestamp == null) p2 else p1 -// } -// } -// r -// }.map(_._2) -// -// val pubs:Dataset[Publication] = spark.createDataset(distinctPubs) -// pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication") -// -// -// val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset]) -// .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) => -// var r = if (p1 == null) p2 else p1 -// if (p1 != null && p2 != null) { -// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) { -// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp) -// r = p2 -// else -// r = p1 -// } else { -// r = if (p1.getLastupdatetimestamp == null) p2 else p1 -// } -// } -// r -// }.map(_._2) -// -// spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset") -// -// -// -// val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation]) -// .map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r)) -// .reduceByKey { case (p1: Relation, p2: Relation) => -// if (p1 == null) p2 else p1 -// }.map(_._2) -// -// val rels: Dataset[Relation] = spark.createDataset(distinctRels) -// -// rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations") + ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefDataset") } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml index a9cc9ea3c..63c2e9ef2 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml @@ -16,88 +16,86 @@ sparkExecutorCores number of cores used by single executor - - - - + + timestamp + Timestamp for incremental Harvesting + - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - + + + ${jobTracker} + ${nameNode} + eu.dnetlib.doiboost.crossref.CrossrefImporter + -t${workingPath}/input/crossref/index_update + -n${nameNode} + -ts${timestamp} + + + + + + + + yarn-cluster + cluster + ExtractCrossrefToOAF + eu.dnetlib.doiboost.crossref.CrossrefDataset + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + ${sparkExtraOPT} + + --workingPath/data/doiboost/input/crossref + --masteryarn-cluster + + + + + + + + + + + + + - - - - - - - - - - - - - - - - + yarn-cluster cluster - ExtractCrossrefToOAF + ConvertCrossrefToOAF eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF dhp-doiboost-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 ${sparkExtraOPT} --sourcePath${workingPath}/input/crossref/crossref_ds - --targetPath${workingPath}/input/crossref + --targetPath${workingPath}/process/ --masteryarn-cluster - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json index 312bd0751..23c0fdabc 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json @@ -1,6 +1,5 @@ [ - {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true}, {"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index f1167b184..190c9847e 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -7,7 +7,7 @@ eu.dnetlib.dhp dhp 1.2.4-SNAPSHOT - ../ + ../pom.xml dhp-workflows