diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
index 79c246d17d..4d8c4a3423 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala
@@ -23,21 +23,20 @@ object SparkGenerateDoiBoost {
.master(parser.get("master")).getOrCreate()
-
- val crossrefPublicationPath =parser.get("crossrefPublicationPath")
- val crossrefDatasetPath =parser.get("crossrefDatasetPath")
- val uwPublicationPath =parser.get("uwPublicationPath")
- val magPublicationPath =parser.get("magPublicationPath")
- val orcidPublicationPath =parser.get("orcidPublicationPath")
- val workingDirPath =parser.get("workingDirPath")
+ val crossrefPublicationPath = parser.get("crossrefPublicationPath")
+ val crossrefDatasetPath = parser.get("crossrefDatasetPath")
+ val uwPublicationPath = parser.get("uwPublicationPath")
+ val magPublicationPath = parser.get("magPublicationPath")
+ val orcidPublicationPath = parser.get("orcidPublicationPath")
+ val workingDirPath = parser.get("workingDirPath")
logger.info("Phase 1) repartition and move all the dataset in a same working folder")
- spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
- spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s=>s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
- spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
- spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
- spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
+ spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
+ spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
+ spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
+ spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
+ spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
@@ -45,17 +44,39 @@ object SparkGenerateDoiBoost {
logger.info("Phase 2) Join Crossref with UnpayWall")
- val crossrefPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) )
- val uwPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) )
+ val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
+ val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p))
- crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")),"left").map(item => {
+ def applyMerge(item:((String, Publication), (String, Publication))) : Publication =
+ {
val crossrefPub = item._1._2
- val unpayWallPub = item._1._2
- if(unpayWallPub!= null) {
- crossrefPub.mergeFrom(unpayWallPub)
+ if (item._2!= null) {
+ val otherPub = item._2._2
+ if (otherPub != null) {
+ crossrefPub.mergeFrom(otherPub)
+ }
}
crossrefPub
- }).write.save(s"$workingDirPath/firstJoin")
+ }
+ crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin")
+ logger.info("Phase 3) Join Result with ORCID")
+ val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
+ val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
+ fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin")
+
+ logger.info("Phase 3) Join Result with MAG")
+ val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
+
+ sj.where(sj("_1").like())
+
+ val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
+ sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
+
+
+
+
+
+
}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
index 639fb4d98a..f07ed944e9 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml
@@ -40,7 +40,7 @@
-
+