diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java index c4efaefba..82c69e927 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob2.java @@ -35,7 +35,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 { public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils.toString(SparkCountryPropagationJob2.class + String jsonConfiguration = IOUtils.toString(SparkResultToOrganizationFromIstRepoJob2.class .getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -118,9 +118,14 @@ public class SparkResultToOrganizationFromIstRepoJob2 { .textFile(alreadylinked) .map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class), Encoders.bean(ResultOrganizationSet.class)), potentialUpdates) - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(outputPath , GzipCodec.class); + .toJSON() + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .text(outputPath); +// .toJavaRDD() +// .map(r -> OBJECT_MAPPER.writeValueAsString(r)) +// .saveAsTextFile(outputPath , GzipCodec.class); }