diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index 096217a55..579ce8d42 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -14,7 +14,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
-case class CrossrefDT(doi: String, json:String) {}
+case class CrossrefDT(doi: String, json: String, timestamp: Long) {}
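+// Note: CrossrefDT.timestamp is assumed to hold the harvesting timestamp of the Crossref record
+// (e.g. to select the most recent version of each DOI downstream); the field name is all the diff shows.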
case class mappingAffiliation(name: String) {}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index fac4c90b4..08319058c 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -29,69 +29,90 @@ object SparkMapDumpIntoOAF {
.appName(SparkMapDumpIntoOAF.getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
+ implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
val sc = spark.sparkContext
val targetPath = parser.get("targetPath")
+ import spark.implicits._
- sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
- .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
- .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject")
-
- val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null)
-
- val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication])
- .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) =>
- var r = if (p1 == null) p2 else p1
- if (p1 != null && p2 != null) {
- if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
- if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
- r = p2
- else
- r = p1
- } else {
- r = if (p1.getLastupdatetimestamp == null) p2 else p1
- }
- }
- r
- }.map(_._2)
-
- val pubs:Dataset[Publication] = spark.createDataset(distinctPubs)
- pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
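+    // Read the Crossref dump as a typed Dataset[CrossrefDT] instead of a sequence file of compressed
+    // blobs, convert each record's JSON payload into OAF entities, and persist the mixed result
+    // before it is split by type below.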
+ spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
+ .flatMap(k => Crossref2Oaf.convert(k.json))
+ .filter(o => o != null)
+ .write.mode(SaveMode.Overwrite).save(s"$targetPath/mixObject")
- val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset])
- .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) =>
- var r = if (p1 == null) p2 else p1
- if (p1 != null && p2 != null) {
- if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
- if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
- r = p2
- else
- r = p1
- } else {
- r = if (p1.getLastupdatetimestamp == null) p2 else p1
- }
- }
- r
- }.map(_._2)
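+    // Reload the mixed OAF output and route each concrete type (Publication, Relation, OafDataset)
+    // to its own target path.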
+    val ds: Dataset[Oaf] = spark.read.load(s"$targetPath/mixObject").as[Oaf]
- spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset")
+    ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/publication")
+
+    ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/relation")
+
+    ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/dataset")
- val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation])
- .map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r))
- .reduceByKey { case (p1: Relation, p2: Relation) =>
- if (p1 == null) p2 else p1
- }.map(_._2)
-
- val rels: Dataset[Relation] = spark.createDataset(distinctRels)
-
- rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations")
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
index be4a45afe..a9cc9ea3c 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
@@ -16,10 +16,10 @@
            <name>sparkExecutorCores</name>
            <description>number of cores used by single executor</description>
-        <property>
-            <name>timestamp</name>
-            <description>Timestamp for incremental Harvesting</description>
-        </property>
+
+
+
+
@@ -30,29 +30,29 @@
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
-
-
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
-            <arg>-t</arg><arg>${workingPath}/input/crossref/index_dump_1</arg>
-            <arg>-n</arg><arg>${nameNode}</arg>
-            <arg>-ts</arg><arg>${timestamp}</arg>
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
@@ -68,7 +68,7 @@
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
-            <arg>--sourcePath</arg><arg>${workingPath}/input/crossref/index_dump,${workingPath}/input/crossref/index_dump_1,${workingPath}/crossref/index_dump</arg>
+            <arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
            <arg>--targetPath</arg><arg>${workingPath}/input/crossref</arg>
            <arg>--master</arg><arg>yarn-cluster</arg>
@@ -78,26 +78,26 @@
-
-
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>ExtractCrossrefToOAF</name>
-            <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>/data/doiboost/crossref/cr_dataset</arg>
-            <arg>--targetPath</arg><arg>/data/doiboost/crossref/crossrefDataset</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file