diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index b04d62dd2..73efeabb4 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp
1.2.4-SNAPSHOT
- ../
+ ../pom.xml
dhp-schemas
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java
new file mode 100644
index 000000000..11bce26c8
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/orcid/OrcidDOI.java
@@ -0,0 +1,24 @@
+package eu.dnetlib.dhp.schema.orcid;
+
+import java.util.List;
+
+public class OrcidDOI {
+ private String doi;
+ private List authors;
+
+ public String getDoi() {
+ return doi;
+ }
+
+ public void setDoi(String doi) {
+ this.doi = doi;
+ }
+
+ public List getAuthors() {
+ return authors;
+ }
+
+ public void setAuthors(List authors) {
+ this.authors = authors;
+ }
+}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
index 1adb7465e..5ba01357e 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala
@@ -200,7 +200,7 @@ case object Crossref2Oaf {
a.setSurname(family)
a.setFullname(s"$given $family")
if (StringUtils.isNotBlank(orcid))
- a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava)
+ a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateDataInfo())).asJava)
a
}
@@ -248,7 +248,7 @@ case object Crossref2Oaf {
def snsfRule(award:String): String = {
- var tmp1 = StringUtils.substringAfter(award,"_")
+ val tmp1 = StringUtils.substringAfter(award,"_")
val tmp2 = StringUtils.substringBefore(tmp1,"/")
logger.debug(s"From $award to $tmp2")
tmp2
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
index 996ba5585..4a39a2987 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/CrossrefDataset.scala
@@ -2,6 +2,7 @@ package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
@@ -12,21 +13,23 @@ import org.slf4j.{Logger, LoggerFactory}
object CrossrefDataset {
+ val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
- def extractTimestamp(input:String): Long = {
+
+ def to_item(input:String):CrossrefDT = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input)
-
- (json\"indexed"\"timestamp").extractOrElse[Long](0)
+ val ts:Long = (json \ "indexed" \ "timestamp").extract[Long]
+ val doi:String = (json \ "DOI").extract[String]
+ CrossrefDT(doi, input, ts)
}
-
def main(args: Array[String]): Unit = {
- val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
+
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
parser.parseArgument(args)
@@ -49,9 +52,8 @@ object CrossrefDataset {
if (a == null)
return b
- val tb = extractTimestamp(b.json)
- val ta = extractTimestamp(a.json)
- if(ta >tb) {
+
+ if(a.timestamp >b.timestamp) {
return a
}
b
@@ -63,9 +65,7 @@ object CrossrefDataset {
if (a == null)
return b
- val tb = extractTimestamp(b.json)
- val ta = extractTimestamp(a.json)
- if(ta >tb) {
+ if(a.timestamp >b.timestamp) {
return a
}
b
@@ -78,15 +78,21 @@ object CrossrefDataset {
override def finish(reduction: CrossrefDT): CrossrefDT = reduction
}
- val sourcePath:String = parser.get("sourcePath")
- val targetPath:String = parser.get("targetPath")
+ val workingPath:String = parser.get("workingPath")
- val ds:Dataset[CrossrefDT] = spark.read.load(sourcePath).as[CrossrefDT]
- ds.groupByKey(_.doi)
+ val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT]
+
+
+ val update =
+ spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text])
+ .map(i =>CrossrefImporter.decompressBlob(i._2.toString))
+ .map(i =>to_item(i)))
+
+ main_ds.union(update).groupByKey(_.doi)
.agg(crossrefAggregator.toColumn)
.map(s=>s._2)
- .write.mode(SaveMode.Overwrite).save(targetPath)
+ .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated")
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
index 08319058c..0272cb1a6 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/SparkMapDumpIntoOAF.scala
@@ -34,85 +34,21 @@ object SparkMapDumpIntoOAF {
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
- val sc = spark.sparkContext
val targetPath = parser.get("targetPath")
import spark.implicits._
-
spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
.flatMap(k => Crossref2Oaf.convert(k.json))
.filter(o => o != null)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/mixObject")
-
val ds:Dataset[Oaf] = spark.read.load(s"$targetPath/mixObject").as[Oaf]
- ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.save(s"$targetPath/publication")
+ ds.filter(o => o.isInstanceOf[Publication]).map(o => o.asInstanceOf[Publication]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefPublication")
- ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.save(s"$targetPath/relation")
+ ds.filter(o => o.isInstanceOf[Relation]).map(o => o.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefRelation")
- ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.save(s"$targetPath/dataset")
-
-
-
-//
-//
-//
-// sc.sequenceFile(parser.get("sourcePath"), classOf[IntWritable], classOf[Text])
-// .map(k => k._2.toString).map(CrossrefImporter.decompressBlob)
-// .flatMap(k => Crossref2Oaf.convert(k)).saveAsObjectFile(s"${targetPath}/mixObject")
-//
-// val inputRDD = sc.objectFile[Oaf](s"${targetPath}/mixObject").filter(p=> p!= null)
-//
-// val distinctPubs:RDD[Publication] = inputRDD.filter(k => k != null && k.isInstanceOf[Publication])
-// .map(k => k.asInstanceOf[Publication]).map { p: Publication => Tuple2(p.getId, p) }.reduceByKey { case (p1: Publication, p2: Publication) =>
-// var r = if (p1 == null) p2 else p1
-// if (p1 != null && p2 != null) {
-// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
-// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
-// r = p2
-// else
-// r = p1
-// } else {
-// r = if (p1.getLastupdatetimestamp == null) p2 else p1
-// }
-// }
-// r
-// }.map(_._2)
-//
-// val pubs:Dataset[Publication] = spark.createDataset(distinctPubs)
-// pubs.write.mode(SaveMode.Overwrite).save(s"${targetPath}/publication")
-//
-//
-// val distincDatasets:RDD[OafDataset] = inputRDD.filter(k => k != null && k.isInstanceOf[OafDataset])
-// .map(k => k.asInstanceOf[OafDataset]).map(p => Tuple2(p.getId, p)).reduceByKey { case (p1: OafDataset, p2: OafDataset) =>
-// var r = if (p1 == null) p2 else p1
-// if (p1 != null && p2 != null) {
-// if (p1.getLastupdatetimestamp != null && p2.getLastupdatetimestamp != null) {
-// if (p1.getLastupdatetimestamp < p2.getLastupdatetimestamp)
-// r = p2
-// else
-// r = p1
-// } else {
-// r = if (p1.getLastupdatetimestamp == null) p2 else p1
-// }
-// }
-// r
-// }.map(_._2)
-//
-// spark.createDataset(distincDatasets).write.mode(SaveMode.Overwrite).save(s"${targetPath}/dataset")
-//
-//
-//
-// val distinctRels =inputRDD.filter(k => k != null && k.isInstanceOf[Relation])
-// .map(k => k.asInstanceOf[Relation]).map(r=> (s"${r.getSource}::${r.getTarget}",r))
-// .reduceByKey { case (p1: Relation, p2: Relation) =>
-// if (p1 == null) p2 else p1
-// }.map(_._2)
-//
-// val rels: Dataset[Relation] = spark.createDataset(distinctRels)
-//
-// rels.write.mode(SaveMode.Overwrite).save(s"${targetPath}/relations")
+ ds.filter(o => o.isInstanceOf[OafDataset]).map(o => o.asInstanceOf[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$targetPath/crossrefDataset")
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
index a9cc9ea3c..63c2e9ef2 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref/oozie_app/workflow.xml
@@ -16,88 +16,86 @@
sparkExecutorCores
number of cores used by single executor
-
-
-
-
+
+ timestamp
+ Timestamp for incremental Harvesting
+
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
+
+
+ ${jobTracker}
+ ${nameNode}
+ eu.dnetlib.doiboost.crossref.CrossrefImporter
+ -t${workingPath}/input/crossref/index_update
+ -n${nameNode}
+ -ts${timestamp}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+ ExtractCrossrefToOAF
+ eu.dnetlib.doiboost.crossref.CrossrefDataset
+ dhp-doiboost-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
+ ${sparkExtraOPT}
+
+ --workingPath/data/doiboost/input/crossref
+ --masteryarn-cluster
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
yarn-cluster
cluster
- ExtractCrossrefToOAF
+ ConvertCrossrefToOAF
eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF
dhp-doiboost-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
+ --conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
--sourcePath${workingPath}/input/crossref/crossref_ds
- --targetPath${workingPath}/input/crossref
+ --targetPath${workingPath}/process/
--masteryarn-cluster
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json
index 312bd0751..23c0fdabc 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json
@@ -1,6 +1,5 @@
[
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
+ {"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index f1167b184..190c9847e 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -7,7 +7,7 @@
eu.dnetlib.dhp
dhp
1.2.4-SNAPSHOT
- ../
+ ../pom.xml
dhp-workflows