
[DOIBoost - Mapping] Remove the addition of the instance to the MAG publication record

Miriam Baglioni 2021-11-03 15:51:26 +01:00
parent b2bb8d9d79
commit 683fe093cf
1 changed file with 15 additions and 13 deletions


@@ -111,22 +111,24 @@ object SparkProcessMAG {
     .map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
     .write
     .mode(SaveMode.Overwrite)
-    .save(s"$workingPath/merge_step_2_conference")
-    magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
-      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
-    val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
-    logger.info("Phase 5) enrich publication with URL and Instances")
-    magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
-      .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
-      .write.mode(SaveMode.Overwrite)
     .save(s"$workingPath/merge_step_3")
+    //no more needed to add the instance to mag records
+//    magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
+//      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
+//
+//    val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
+//
+//
+//
+//    logger.info("Phase 5) enrich publication with URL and Instances")
+//    magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
+//      .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
+//      .write.mode(SaveMode.Overwrite)
+//      .save(s"$workingPath/merge_step_3")
     // logger.info("Phase 6) Enrich Publication with description")
     // val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
     // pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
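
For orientation, a minimal, self-contained sketch of the pattern this commit applies: the intermediate merge_step_2_conference write and the follow-up join that attached URL instances are dropped, and the conference-enriched dataset is written straight to merge_step_3. The Pub case class, the merge_step_1 input path and the trim transformation below are illustrative stand-ins, not the Publication type or the actual phases of SparkProcessMAG.

import org.apache.spark.sql.{SaveMode, SparkSession}

object InstanceRemovalSketch {

  // Illustrative stand-in for the Publication records handled by SparkProcessMAG.
  final case class Pub(id: String, title: String)

  def main(args: Array[String]): Unit = {
    val workingPath = args(0)

    val spark = SparkSession.builder().appName("instance-removal-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the conference-enrichment step that precedes the changed lines.
    val enriched = spark.read.load(s"$workingPath/merge_step_1").as[Pub]
      .map(p => p.copy(title = p.title.trim))

    // Old flow (removed by this commit): save to merge_step_2_conference, reload,
    // join with PaperUrls to attach URL instances, then save to merge_step_3.

    // New flow: the enriched publications go straight to merge_step_3; no
    // instance/URL enrichment is applied to the MAG records any more.
    enriched.write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_3")

    spark.stop()
  }
}

Note that the final save target, merge_step_3, is unchanged in the diff, so the later phases of the job can keep reading from the same path.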