From 2550a73981edad0e0a09d28f091009ade889f400 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 16 Jun 2021 10:04:41 +0200 Subject: [PATCH 1/7] - --- .../oozie_app/workflow.xml | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml index c7dc8bed45..d4c6ade434 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -24,7 +24,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -63,30 +63,30 @@ --masteryarn-cluster --sourcePath${workingDir}/files - --targetPath${inputPathCrossref}/crossref_ds_updated + --targetPath${inputPathCrossref}/crossref_ds - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + \ No newline at end of file From 95885bcf1226fbb82a7690d89151f5bc756364ee Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 16 Jun 2021 10:17:52 +0200 Subject: [PATCH 2/7] forces executor Executor memory and driver executor memory to be 7G (trying to avoid OOM) --- .../eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 7bd7d107f5..5a943c673d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -154,9 +154,9 @@ eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset dhp-doiboost-${projectVersion}.jar - --executor-memory=${sparkExecutorMemory} - --executor-cores=2 - --driver-memory=${sparkDriverMemory} + --executor-memory=7G + --executor-cores=4 + --driver-memory=7G --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} From 3585e53da3818289db8e783f92642fa9520ae1a7 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 18 Jun 2021 09:41:23 +0200 Subject: [PATCH 3/7] changed to split in two steps the generation of the crossref dataset --- .../dhp/doiboost/oozie_app/workflow.xml | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 5a943c673d..fa47e142d0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -142,6 +142,32 @@ --workingPath${crossrefDumpPath} --outputPath${crossrefDumpPath}/files/ + + + + + + + yarn-cluster + cluster + SparkUnpackCrossrefEntries + eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/files + --targetPath${crossrefDumpPath}/crossref_unpack/ + + @@ -155,7 +181,7 @@ dhp-doiboost-${projectVersion}.jar --executor-memory=7G - --executor-cores=4 + --executor-cores=2 --driver-memory=7G --conf spark.sql.shuffle.partitions=3840 --conf spark.extraListeners=${spark2ExtraListeners} @@ -164,7 +190,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn-cluster - --sourcePath${crossrefDumpPath}/files/ + --sourcePath${crossrefDumpPath}/crossref_unpack/ --targetPath${inputPathCrossref}/crossref_ds @@ -174,7 +200,8 @@ - + + From 6aca0d8ebb0eab94befbb16c8807a508b9c1a319 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 18 Jun 2021 09:42:07 +0200 Subject: [PATCH 4/7] added kryo encoding for input files --- .../eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala index 0272cb1a64..57acaf404d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala @@ -4,9 +4,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import org.apache.commons.io.IOUtils -import org.apache.hadoop.io.{IntWritable, Text} + import org.apache.spark.SparkConf -import org.apache.spark.rdd.RDD + import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -17,6 +17,7 @@ object SparkMapDumpIntoOAF { def main(args: Array[String]): Unit = { + implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val conf: SparkConf = new SparkConf() @@ -35,7 +36,6 @@ object SparkMapDumpIntoOAF { implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset] val targetPath = parser.get("targetPath") - import spark.implicits._ spark.read.load(parser.get("sourcePath")).as[CrossrefDT] .flatMap(k => Crossref2Oaf.convert(k.json)) From 464c2ddde363a2f4f0a3ebdbf58cef27486281fd Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 18 Jun 2021 09:42:31 +0200 Subject: [PATCH 5/7] changed to split in two steps the generation of the crossref dataset --- .../crossref/GenerateCrossrefDataset.scala | 33 +++++------- .../crossref/UnpackCrtossrefEntries.scala | 54 +++++++++++++++++++ 2 files changed, 67 insertions(+), 20 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala index e48f68a7f7..b11e2d8de0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala @@ -1,6 +1,11 @@ package eu.dnetlib.doiboost.crossref import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item +import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass +import org.apache.hadoop.io.{IntWritable, Text} +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.json4s @@ -17,13 +22,6 @@ object GenerateCrossrefDataset { implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] - def extractDump(input:String):List[String] = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) - - val a = (json \ "items").extract[JArray] - a.arr.map(s => compact(render(s))) - } def crossrefElement(meta: String): CrossrefDT = { @@ -44,7 +42,7 @@ object GenerateCrossrefDataset { val targetPath = parser.get("targetPath") val spark: SparkSession = SparkSession.builder().config(conf) - .appName(GenerateCrossrefDataset.getClass.getSimpleName) + .appName(UnpackCrtossrefEntries.getClass.getSimpleName) .master(master) .getOrCreate() val sc: SparkContext = spark.sparkContext @@ -52,19 +50,14 @@ object GenerateCrossrefDataset { import spark.implicits._ - def extractDump(input:String):List[String] = { - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: json4s.JValue = parse(input) + val tmp : RDD[String] = sc.textFile(sourcePath,6000) - val a = (json \ "items").extract[JArray] - a.arr.map(s => compact(render(s))) - } - - - sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) - .map(meta => crossrefElement(meta)) - .toDS()//.as[CrossrefDT] - .write.mode(SaveMode.Overwrite).save(targetPath) + spark.createDataset(tmp) + .map(entry => crossrefElement(entry)) + .write.mode(SaveMode.Overwrite).save(targetPath) +// .map(meta => crossrefElement(meta)) +// .toDS.as[CrossrefDT] +// .write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala new file mode 100644 index 0000000000..95ecb568bd --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala @@ -0,0 +1,54 @@ +package eu.dnetlib.doiboost.crossref + +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JArray +import org.json4s.jackson.JsonMethods.{compact, parse, render} +import org.slf4j.{Logger, LoggerFactory} + +import scala.io.Source + +object UnpackCrtossrefEntries { + + val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass) + + + + + def extractDump(input:String):List[String] = { + implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats + lazy val json: json4s.JValue = parse(input) + + val a = (json \ "items").extract[JArray] + a.arr.map(s => compact(render(s))) + + + } + + + + def main(args: Array[String]): Unit = { + val conf = new SparkConf + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) + parser.parseArgument(args) + val master = parser.get("master") + val sourcePath = parser.get("sourcePath") + val targetPath = parser.get("targetPath") + + val spark: SparkSession = SparkSession.builder().config(conf) + .appName(UnpackCrtossrefEntries.getClass.getSimpleName) + .master(master) + .getOrCreate() + val sc: SparkContext = spark.sparkContext + + sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) + .saveAsTextFile(targetPath, classOf[GzipCodec]) + + + } + +} From b486ae498f4c7c5db45399ea9ba9fcd069f26b47 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 18 Jun 2021 09:43:32 +0200 Subject: [PATCH 6/7] added test and test resource to verify the generation of the date of acceptance from the input extracted from the dump --- .../crossref/CrossrefMappingTest.scala | 67 +++++++++++++++++++ .../dnetlib/doiboost/crossref/dump_file.json | 1 + 2 files changed, 68 insertions(+) create mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index cc112528e6..cb543b4d76 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -395,4 +395,71 @@ class CrossrefMappingTest { } + + + @Test + def testSetDateOfAcceptanceCrossRef2Oaf(): Unit = { + + val json = Source.fromInputStream(getClass.getResourceAsStream("dump_file.json")).mkString + assertNotNull(json) + + assertFalse(json.isEmpty); + + val resultList: List[Oaf] = Crossref2Oaf.convert(json) + + assertTrue(resultList.nonEmpty) + + val items = resultList.filter(p => p.isInstanceOf[Publication]) + + assert(items.nonEmpty) + assert(items.size == 1) + val result: Result = items.head.asInstanceOf[Publication] + assertNotNull(result) + + logger.info(mapper.writeValueAsString(result)); + +// assertNotNull(result.getDataInfo, "Datainfo test not null Failed"); +// assertNotNull( +// result.getDataInfo.getProvenanceaction, +// "DataInfo/Provenance test not null Failed"); +// assertFalse( +// result.getDataInfo.getProvenanceaction.getClassid.isEmpty, +// "DataInfo/Provenance/classId test not null Failed"); +// assertFalse( +// result.getDataInfo.getProvenanceaction.getClassname.isEmpty, +// "DataInfo/Provenance/className test not null Failed"); +// assertFalse( +// result.getDataInfo.getProvenanceaction.getSchemeid.isEmpty, +// "DataInfo/Provenance/SchemeId test not null Failed"); +// assertFalse( +// result.getDataInfo.getProvenanceaction.getSchemename.isEmpty, +// "DataInfo/Provenance/SchemeName test not null Failed"); +// +// assertNotNull(result.getCollectedfrom, "CollectedFrom test not null Failed"); +// assertFalse(result.getCollectedfrom.isEmpty); +// +// val collectedFromList = result.getCollectedfrom.asScala +// assert(collectedFromList.exists(c => c.getKey.equalsIgnoreCase("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")), "Wrong collected from assertion") +// +// assert(collectedFromList.exists(c => c.getValue.equalsIgnoreCase("crossref")), "Wrong collected from assertion") +// +// +// val relevantDates = result.getRelevantdate.asScala +// +// assert(relevantDates.exists(d => d.getQualifier.getClassid.equalsIgnoreCase("created")), "Missing relevant date of type created") +// +// val rels = resultList.filter(p => p.isInstanceOf[Relation]).asInstanceOf[List[Relation]] +// assertFalse(rels.isEmpty) +// rels.foreach(relation => { +// assertNotNull(relation) +// assertFalse(relation.getSource.isEmpty) +// assertFalse(relation.getTarget.isEmpty) +// assertFalse(relation.getRelClass.isEmpty) +// assertFalse(relation.getRelType.isEmpty) +// assertFalse(relation.getSubRelType.isEmpty) +// +// }) + } + + } diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json new file mode 100644 index 0000000000..a59a4ef254 --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json @@ -0,0 +1 @@ +{"indexed":{"date-parts":[[2020,4,10]],"date-time":"2020-04-10T00:38:19Z","timestamp":1586479099385},"reference-count":0,"publisher":"American Medical Association (AMA)","issue":"4","content-domain":{"domain":[],"crossmark-restriction":false},"short-container-title":["Archives of Internal Medicine"],"published-print":{"date-parts":[[2006,2,27]]},"DOI":"10.1001/.389","type":"journal-article","created":{"date-parts":[[2006,2,27]],"date-time":"2006-02-27T21:28:23Z","timestamp":1141075703000},"page":"389-390","source":"Crossref","is-referenced-by-count":0,"title":["Decision Making at the Fringe of Evidence: Take What You Can Get"],"prefix":"10.1001","volume":"166","author":[{"given":"N. F.","family":"Col","affiliation":[]}],"member":"10","container-title":["Archives of Internal Medicine"],"original-title":[],"deposited":{"date-parts":[[2007,2,13]],"date-time":"2007-02-13T20:56:13Z","timestamp":1171400173000},"score":null,"subtitle":[],"short-title":[],"issued":{"date-parts":[[2006,2,27]]},"references-count":0,"URL":"http://dx.doi.org/10.1001/.389","relation":{},"ISSN":["0003-9926"],"issn-type":[{"value":"0003-9926","type":"print"}],"subject":["Internal Medicine"]} \ No newline at end of file From 13c96622c935cc6191cad6b40ef0a40b1ceeb262 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 18 Jun 2021 09:45:16 +0200 Subject: [PATCH 7/7] - --- .../oozie_app/workflow.xml | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml index d4c6ade434..506d86a081 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml @@ -24,7 +24,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -40,11 +40,37 @@ --workingPath${crossrefDumpPath} --outputPath${workingDir}/files/ - + - + + + yarn-cluster + cluster + SparkGenerateCrossrefDataset + eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries + dhp-doiboost-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.sql.shuffle.partitions=3840 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --masteryarn-cluster + --sourcePath${crossrefDumpPath}/files + --targetPath${inputPathCrossref}/crossref_ds + + + + + + + yarn-cluster cluster @@ -62,8 +88,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --masteryarn-cluster - --sourcePath${workingDir}/files - --targetPath${inputPathCrossref}/crossref_ds + --sourcePath${inputPathCrossref}/crossref_ds + --targetPath${inputPathCrossref}/crossref_ds_updates