From 7834a357683cf3a340794e03945825dd780f0c19 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 4 Jan 2021 17:54:57 +0100 Subject: [PATCH] avoid to save intermediate dataset before generation of Sequence file --- .../SparkGenerateDOIBoostActionSet.scala | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala index da9da22b6..78477ae4d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala @@ -41,30 +41,34 @@ object SparkGenerateDOIBoostActionSet { val workingDirPath = parser.get("targetPath") val sequenceFilePath = parser.get("sFilePath") - spark.read.load(dbDatasetPath).as[OafDataset] + val asDataset = spark.read.load(dbDatasetPath).as[OafDataset] .map(d =>DoiBoostMappingUtil.fixResult(d)) .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet") - spark.read.load(dbPublicationPath).as[Publication] + val asPublication =spark.read.load(dbPublicationPath).as[Publication] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(dbOrganizationPath).as[Organization] + val asOrganization = spark.read.load(dbOrganizationPath).as[Organization] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(crossRefRelation).as[Relation] + val asCRelation = spark.read.load(crossRefRelation).as[Relation] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(dbaffiliationRelationPath).as[Relation] + val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] + + + val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation) + +// spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])