diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java index 7826f598b..c4efaefba 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java @@ -8,6 +8,7 @@ import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -58,9 +59,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 { final String alreadylinked = parser.get("alreadyLinkedPath"); log.info("alreadyLinkedPath: {}", alreadylinked); - final String resultorganizationsetpath = parser.get("resultOrganizationsetPath"); - log.info("resultOrganizationsetPath: {}", resultorganizationsetpath); - final String resultClassName = parser.get("resultTableName"); log.info("resultTableName: {}", resultClassName); @@ -93,9 +91,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 { writeUpdates, saveGraph); }); - - - } private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadylinked, String inputPath, @@ -122,17 +117,20 @@ public class SparkResultToOrganizationFromIstRepoJob2 { .read() .textFile(alreadylinked) .map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class), - Encoders.bean(ResultOrganizationSet.class)), potentialUpdates); + Encoders.bean(ResultOrganizationSet.class)), potentialUpdates) + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(outputPath , GzipCodec.class); } } private static Dataset getNewRelations(Dataset alreadyLinked, Dataset potentialUpdates) { - return potentialUpdates - .joinWith(alreadyLinked, potentialUpdates.col("resultId") - .equalTo(alreadyLinked.col("resultId")), "left_outer") - .flatMap((FlatMapFunction, Relation>) value -> { + + return potentialUpdates + .joinWith(alreadyLinked, potentialUpdates.col("resultId") + .equalTo(alreadyLinked.col("resultId")), "left_outer").flatMap((FlatMapFunction, Relation>) value -> { List new_relations = new ArrayList<>(); ResultOrganizationSet potential_update = value._1(); Optional already_linked = Optional.ofNullable(value._2());