enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
2 changed files with 41 additions and 20 deletions
Showing only changes of commit 147dd389bf - Show all commits

View File

@ -23,21 +23,20 @@ object SparkGenerateDoiBoost {
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
val crossrefPublicationPath = parser.get("crossrefPublicationPath")
val crossrefPublicationPath =parser.get("crossrefPublicationPath") val crossrefDatasetPath = parser.get("crossrefDatasetPath")
val crossrefDatasetPath =parser.get("crossrefDatasetPath") val uwPublicationPath = parser.get("uwPublicationPath")
val uwPublicationPath =parser.get("uwPublicationPath") val magPublicationPath = parser.get("magPublicationPath")
val magPublicationPath =parser.get("magPublicationPath") val orcidPublicationPath = parser.get("orcidPublicationPath")
val orcidPublicationPath =parser.get("orcidPublicationPath") val workingDirPath = parser.get("workingDirPath")
val workingDirPath =parser.get("workingDirPath")
logger.info("Phase 1) repartition and move all the dataset in a same working folder") logger.info("Phase 1) repartition and move all the dataset in a same working folder")
spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication") spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s=>s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset") spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication") spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication") spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s=>s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication") spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
@ -45,17 +44,39 @@ object SparkGenerateDoiBoost {
logger.info("Phase 2) Join Crossref with UnpayWall") logger.info("Phase 2) Join Crossref with UnpayWall")
val crossrefPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) ) val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
val uwPublication:Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p =>(p.getId, p) ) val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p))
crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")),"left").map(item => { def applyMerge(item:((String, Publication), (String, Publication))) : Publication =
{
val crossrefPub = item._1._2 val crossrefPub = item._1._2
val unpayWallPub = item._1._2 if (item._2!= null) {
if(unpayWallPub!= null) { val otherPub = item._2._2
crossrefPub.mergeFrom(unpayWallPub) if (otherPub != null) {
crossrefPub.mergeFrom(otherPub)
}
} }
crossrefPub crossrefPub
}).write.save(s"$workingDirPath/firstJoin") }
crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin")
logger.info("Phase 3) Join Result with ORCID")
val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
val orcidPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/orcidPublication").as[Publication].map(p => (p.getId, p))
fj.joinWith(orcidPublication, fj("_1").equalTo(orcidPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/secondJoin")
logger.info("Phase 3) Join Result with MAG")
val sj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/secondJoin").as[Publication].map(p => (p.getId, p))
sj.where(sj("_1").like())
val magPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/magPublication").as[Publication].map(p => (p.getId, p))
sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
} }

View File

@ -40,7 +40,7 @@
<start to="ResetWorkingPath"/> <start to="CreateDOIBoost"/>
<kill name="Kill"> <kill name="Kill">