diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
index 39adf92a4..f94d67734 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
@@ -55,8 +55,8 @@ public class PrepareProjectResultsAssociation {
 
         runWithSparkHiveSession(conf, isSparkSessionManaged,
                 spark -> {
-                    removeOutputDir(spark, potentialUpdatePath);
-                    removeOutputDir(spark, alreadyLinkedPath);
+//                    removeOutputDir(spark, potentialUpdatePath);
+//                    removeOutputDir(spark, alreadyLinkedPath);
                     prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath,
                             alreadyLinkedPath, allowedsemrel);
                 });
@@ -91,9 +91,14 @@ public class PrepareProjectResultsAssociation {
                 "GROUP BY projectId ";
 
         spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
-                .toJavaRDD()
-                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression","gzip")
+                .text(potentialUpdatePath);
+//                .toJavaRDD()
+//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+//                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
 
 
         query = "SELECT target projectId, collect_set(source) resultSet " +
@@ -102,9 +107,14 @@ public class PrepareProjectResultsAssociation {
 
         spark.sql(query)
                 .as(Encoders.bean(ProjectResultSet.class))
-                .toJavaRDD()
-                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression","gzip")
+                .text(alreadyLinkedPath);
+//                .toJavaRDD()
+//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+//                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
 
     }
 }