From 683fe093cff2a76fb42d27c52c5c61831f48c268 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 3 Nov 2021 15:51:26 +0100
Subject: [PATCH] [DOIBoost - Mapping] Remove the addition of the instance to
 the MAG publication record

---
 .../doiboost/mag/SparkProcessMAG.scala | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
index ecb389af8..c011cbd20 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkProcessMAG.scala
@@ -111,22 +111,24 @@ object SparkProcessMAG {
       .map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/merge_step_2_conference")
-
-
-    magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
-      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
-
-    val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
-
-
-    logger.info("Phase 5) enrich publication with URL and Instances")
-    magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
-      .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
-      .write.mode(SaveMode.Overwrite)
       .save(s"$workingPath/merge_step_3")
 
+    //no more needed to add the instance to mag records
+//    magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
+//      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
+//
+//    val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
+//
+//
+//
+//    logger.info("Phase 5) enrich publication with URL and Instances")
+//    magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
+//      .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
+//      .write.mode(SaveMode.Overwrite)
+//      .save(s"$workingPath/merge_step_3")
+
+
 
 //    logger.info("Phase 6) Enrich Publication with description")
 //    val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
 //    pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
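
Note for reviewers: the following is a minimal, self-contained sketch (not the project's code) of the net effect of this patch, assuming a local SparkSession, a hypothetical working path, and a simplified case class standing in for eu.dnetlib.dhp.schema.oaf.Publication. The conference-enriched publications are now written straight to merge_step_3; the former Phase 5 join with PaperUrls (ConversionUtil.addInstances) is no longer performed.

import org.apache.spark.sql.{SaveMode, SparkSession}

object MergeStep3Sketch {

  // Simplified stand-in for eu.dnetlib.dhp.schema.oaf.Publication
  // (an assumption made only for this sketch).
  case class Pub(magId: String, title: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("merge_step_3 sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical working path; the real job reads it from its arguments.
    val workingPath = "/tmp/doiboost_mag_sketch"

    // Pretend this Dataset is the Phase 4 output, i.e. publications already
    // enriched with conference info by ConversionUtil.updatePubsWithConferenceInfo.
    val conferenceEnriched = Seq(Pub("mag:1", "Paper A"), Pub("mag:2", "Paper B")).toDS()

    // After this patch, Phase 4 saves directly to merge_step_3: there is no
    // intermediate merge_step_2_conference output and no join with PaperUrls
    // to attach URL/instance information to the MAG publication records.
    conferenceEnriched
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingPath/merge_step_3")

    spark.stop()
  }
}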