From 147dd389bfc05a5a3e9390fbaabbf060c24c7149 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 22 May 2020 20:51:42 +0200 Subject: [PATCH] minor fix --- .../doiboost/SparkGenerateDoiBoost.scala | 59 +++++++++++++------ .../dhp/doiboost/oozie_app/workflow.xml | 2 +- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 79c246d17..4d8c4a342 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -23,21 +23,20 @@ object SparkGenerateDoiBoost { .master(parser.get("master")).getOrCreate() - - val crossrefPublicationPath =parser.get("crossrefPublicationPath") - val crossrefDatasetPath =parser.get("crossrefDatasetPath") - val uwPublicationPath =parser.get("uwPublicationPath") - val magPublicationPath =parser.get("magPublicationPath") - val orcidPublicationPath =parser.get("orcidPublicationPath") - val workingDirPath =parser.get("workingDirPath") + val crossrefPublicationPath = parser.get("crossrefPublicationPath") + val crossrefDatasetPath = parser.get("crossrefDatasetPath") + val uwPublicationPath = parser.get("uwPublicationPath") + val magPublicationPath = parser.get("magPublicationPath") + val orcidPublicationPath = parser.get("orcidPublicationPath") + val workingDirPath = parser.get("workingDirPath") logger.info("Phase 1) repartition and move all the dataset in a same working folder") - spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication") - spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s=>s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset") - spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication") - spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication") - spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication") + spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication") + spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset") + spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication") + spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication") + spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication") implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] @@ -45,17 +44,39 @@ object SparkGenerateDoiBoost { logger.info("Phase 2) Join Crossref with UnpayWall") - val crossrefPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) ) - val uwPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) ) + val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p)) + val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p)) - crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")),"left").map(item => { + def applyMerge(item:((String, Publication), (String, Publication))) : Publication = + { val crossrefPub = item._1._2 - val unpayWallPub = item._1._2 - if(unpayWallPub!= null) { - crossrefPub.mergeFrom(unpayWallPub) + if (item._2!= null) { + val otherPub = item._2._2 + if (otherPub != null) { + crossrefPub.mergeFrom(otherPub) + } } crossrefPub - }).write.save(s"$workingDirPath/firstJoin") + } + crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin") + logger.info("Phase 3) Join Result with ORCID") + val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p)) + val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p)) + fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin") + + logger.info("Phase 3) Join Result with MAG") + val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p)) + + sj.where(sj("_1").like()) + + val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p)) + sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication") + + + + + + } diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml index 639fb4d98..f07ed944e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/oozie_app/workflow.xml @@ -40,7 +40,7 @@ - +