diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
index e48f68a7f..b11e2d8de 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/GenerateCrossrefDataset.scala
@@ -1,6 +1,11 @@
package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item
+import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.json4s
@@ -17,13 +22,6 @@ object GenerateCrossrefDataset {
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
- def extractDump(input:String):List[String] = {
- implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
- lazy val json: json4s.JValue = parse(input)
-
- val a = (json \ "items").extract[JArray]
- a.arr.map(s => compact(render(s)))
- }
def crossrefElement(meta: String): CrossrefDT = {
@@ -44,7 +42,7 @@ object GenerateCrossrefDataset {
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
- .appName(GenerateCrossrefDataset.getClass.getSimpleName)
+ .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
.master(master)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
@@ -52,19 +50,14 @@ object GenerateCrossrefDataset {
import spark.implicits._
- def extractDump(input:String):List[String] = {
- implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
- lazy val json: json4s.JValue = parse(input)
+ val tmp : RDD[String] = sc.textFile(sourcePath,6000)
- val a = (json \ "items").extract[JArray]
- a.arr.map(s => compact(render(s)))
- }
-
-
- sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
- .map(meta => crossrefElement(meta))
- .toDS()//.as[CrossrefDT]
- .write.mode(SaveMode.Overwrite).save(targetPath)
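+    // Each input line is one Crossref record emitted by UnpackCrtossrefEntries:
+    // wrap it into a CrossrefDT and persist the result as a Spark Dataset.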
+ spark.createDataset(tmp)
+ .map(entry => crossrefElement(entry))
+ .write.mode(SaveMode.Overwrite).save(targetPath)
+// .map(meta => crossrefElement(meta))
+// .toDS.as[CrossrefDT]
+// .write.mode(SaveMode.Overwrite).save(targetPath)
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 0272cb1a6..57acaf404 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -4,9 +4,9 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
-import org.apache.hadoop.io.{IntWritable, Text}
+
import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
+
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
@@ -17,6 +17,7 @@ object SparkMapDumpIntoOAF {
def main(args: Array[String]): Unit = {
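+    // Kryo encoder for CrossrefDT, needed by the .as[CrossrefDT] read below.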
+ implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
val conf: SparkConf = new SparkConf()
@@ -35,7 +36,6 @@ object SparkMapDumpIntoOAF {
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
val targetPath = parser.get("targetPath")
- import spark.implicits._
spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
.flatMap(k => Crossref2Oaf.convert(k.json))
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala
new file mode 100644
index 000000000..95ecb568b
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/UnpackCrtossrefEntries.scala
@@ -0,0 +1,54 @@
+package eu.dnetlib.doiboost.crossref
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.JArray
+import org.json4s.jackson.JsonMethods.{compact, parse, render}
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.io.Source
+
+object UnpackCrtossrefEntries {
+
+ val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass)
+
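+  /**
+   * Parses one page of the Crossref dump: extracts the "items" JSON array
+   * and returns each record as a compact JSON string.
+   */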
+ def extractDump(input:String):List[String] = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+
+ val a = (json \ "items").extract[JArray]
+ a.arr.map(s => compact(render(s)))
+  }
+
+ def main(args: Array[String]): Unit = {
+ val conf = new SparkConf
+ val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
+ parser.parseArgument(args)
+ val master = parser.get("master")
+ val sourcePath = parser.get("sourcePath")
+ val targetPath = parser.get("targetPath")
+
+ val spark: SparkSession = SparkSession.builder().config(conf)
+ .appName(UnpackCrtossrefEntries.getClass.getSimpleName)
+ .master(master)
+ .getOrCreate()
+ val sc: SparkContext = spark.sparkContext
+
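+    // Read each dump page as a whole file, unpack it into single records
+    // and store them as gzip-compressed text, one JSON record per line.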
+ sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2))
+ .saveAsTextFile(targetPath, classOf[GzipCodec])
+  }
+
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml
index c7dc8bed4..506d86a08 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_dump_reader/oozie_app/workflow.xml
@@ -24,7 +24,7 @@
-
+
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -40,11 +40,37 @@
            <arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
            <arg>--outputPath</arg><arg>${workingDir}/files/</arg>
-
+
-
+
+
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>SparkGenerateCrossrefDataset</name>
+            <class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
+            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
+
+
+
+
+
+
+
            <master>yarn-cluster</master>
            <mode>cluster</mode>
@@ -62,31 +88,31 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            <arg>--master</arg><arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${workingDir}/files</arg>
-            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds_updated</arg>
+            <arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
+            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds_updates</arg>
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
index 7bd7d107f..fa47e142d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
@@ -142,6 +142,32 @@
            <arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
            <arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
+
+
+
+
+
+
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>SparkUnpackCrossrefEntries</name>
+            <class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
+            <arg>--targetPath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
+
+
@@ -154,9 +180,9 @@
            <class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
            <jar>dhp-doiboost-${projectVersion}.jar</jar>
- --executor-memory=${sparkExecutorMemory}
+ --executor-memory=7G
--executor-cores=2
- --driver-memory=${sparkDriverMemory}
+ --driver-memory=7G
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@@ -164,7 +190,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            <arg>--master</arg><arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/files/</arg>
+            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
@@ -174,7 +200,8 @@
-
+
+
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
index cc112528e..cb543b4d7 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
@@ -395,4 +395,71 @@ class CrossrefMappingTest {
}
+
+
+ @Test
+ def testSetDateOfAcceptanceCrossRef2Oaf(): Unit = {
+
+ val json = Source.fromInputStream(getClass.getResourceAsStream("dump_file.json")).mkString
+ assertNotNull(json)
+
+ assertFalse(json.isEmpty);
+
+ val resultList: List[Oaf] = Crossref2Oaf.convert(json)
+
+ assertTrue(resultList.nonEmpty)
+
+ val items = resultList.filter(p => p.isInstanceOf[Publication])
+
+ assert(items.nonEmpty)
+ assert(items.size == 1)
+ val result: Result = items.head.asInstanceOf[Publication]
+ assertNotNull(result)
+
+ logger.info(mapper.writeValueAsString(result));
+
+// assertNotNull(result.getDataInfo, "Datainfo test not null Failed");
+// assertNotNull(
+// result.getDataInfo.getProvenanceaction,
+// "DataInfo/Provenance test not null Failed");
+// assertFalse(
+// result.getDataInfo.getProvenanceaction.getClassid.isEmpty,
+// "DataInfo/Provenance/classId test not null Failed");
+// assertFalse(
+// result.getDataInfo.getProvenanceaction.getClassname.isEmpty,
+// "DataInfo/Provenance/className test not null Failed");
+// assertFalse(
+// result.getDataInfo.getProvenanceaction.getSchemeid.isEmpty,
+// "DataInfo/Provenance/SchemeId test not null Failed");
+// assertFalse(
+// result.getDataInfo.getProvenanceaction.getSchemename.isEmpty,
+// "DataInfo/Provenance/SchemeName test not null Failed");
+//
+// assertNotNull(result.getCollectedfrom, "CollectedFrom test not null Failed");
+// assertFalse(result.getCollectedfrom.isEmpty);
+//
+// val collectedFromList = result.getCollectedfrom.asScala
+// assert(collectedFromList.exists(c => c.getKey.equalsIgnoreCase("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")), "Wrong collected from assertion")
+//
+// assert(collectedFromList.exists(c => c.getValue.equalsIgnoreCase("crossref")), "Wrong collected from assertion")
+//
+//
+// val relevantDates = result.getRelevantdate.asScala
+//
+// assert(relevantDates.exists(d => d.getQualifier.getClassid.equalsIgnoreCase("created")), "Missing relevant date of type created")
+//
+// val rels = resultList.filter(p => p.isInstanceOf[Relation]).asInstanceOf[List[Relation]]
+// assertFalse(rels.isEmpty)
+// rels.foreach(relation => {
+// assertNotNull(relation)
+// assertFalse(relation.getSource.isEmpty)
+// assertFalse(relation.getTarget.isEmpty)
+// assertFalse(relation.getRelClass.isEmpty)
+// assertFalse(relation.getRelType.isEmpty)
+// assertFalse(relation.getSubRelType.isEmpty)
+//
+// })
+ }
+
+
}
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json
new file mode 100644
index 000000000..a59a4ef25
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/dump_file.json
@@ -0,0 +1 @@
+{"indexed":{"date-parts":[[2020,4,10]],"date-time":"2020-04-10T00:38:19Z","timestamp":1586479099385},"reference-count":0,"publisher":"American Medical Association (AMA)","issue":"4","content-domain":{"domain":[],"crossmark-restriction":false},"short-container-title":["Archives of Internal Medicine"],"published-print":{"date-parts":[[2006,2,27]]},"DOI":"10.1001/.389","type":"journal-article","created":{"date-parts":[[2006,2,27]],"date-time":"2006-02-27T21:28:23Z","timestamp":1141075703000},"page":"389-390","source":"Crossref","is-referenced-by-count":0,"title":["Decision Making at the Fringe of Evidence: Take What You Can Get"],"prefix":"10.1001","volume":"166","author":[{"given":"N. F.","family":"Col","affiliation":[]}],"member":"10","container-title":["Archives of Internal Medicine"],"original-title":[],"deposited":{"date-parts":[[2007,2,13]],"date-time":"2007-02-13T20:56:13Z","timestamp":1171400173000},"score":null,"subtitle":[],"short-title":[],"issued":{"date-parts":[[2006,2,27]]},"references-count":0,"URL":"http://dx.doi.org/10.1001/.389","relation":{},"ISSN":["0003-9926"],"issn-type":[{"value":"0003-9926","type":"print"}],"subject":["Internal Medicine"]}
\ No newline at end of file