diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala index da9da22b6..78477ae4d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala @@ -41,30 +41,34 @@ object SparkGenerateDOIBoostActionSet { val workingDirPath = parser.get("targetPath") val sequenceFilePath = parser.get("sFilePath") - spark.read.load(dbDatasetPath).as[OafDataset] + val asDataset = spark.read.load(dbDatasetPath).as[OafDataset] .map(d =>DoiBoostMappingUtil.fixResult(d)) .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet") - spark.read.load(dbPublicationPath).as[Publication] + val asPublication =spark.read.load(dbPublicationPath).as[Publication] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(dbOrganizationPath).as[Organization] + val asOrganization = spark.read.load(dbOrganizationPath).as[Organization] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(crossRefRelation).as[Relation] + val asCRelation = spark.read.load(crossRefRelation).as[Relation] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - spark.read.load(dbaffiliationRelationPath).as[Relation] + val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") +// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") - val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] + + + val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation) + +// spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])