From ceb1f299bf299ea5e8374298e2c7b50c3931a686 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni" <miriam.baglioni@isti.cnr.it>
Date: Tue, 14 Apr 2020 16:45:12 +0200
Subject: [PATCH] Save project-result sets via Dataset writer in overwrite mode

---
 .../PrepareProjectResultsAssociation.java     | 26 +++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
index 39adf92a4..f94d67734 100644
--- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
+++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java
@@ -55,8 +55,8 @@ public class PrepareProjectResultsAssociation {
 
         runWithSparkHiveSession(conf, isSparkSessionManaged,
                 spark -> {
-                    removeOutputDir(spark, potentialUpdatePath);
-                    removeOutputDir(spark, alreadyLinkedPath);
+//                    removeOutputDir(spark, potentialUpdatePath);
+//                    removeOutputDir(spark, alreadyLinkedPath);
                     prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
 
                 });
@@ -91,9 +91,14 @@ public class PrepareProjectResultsAssociation {
                 "GROUP BY projectId ";
 
         spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
-                .toJavaRDD()
-                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression","gzip")
+                .text(potentialUpdatePath);
+//                .toJavaRDD()
+//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+//                .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
 
 
         query = "SELECT target projectId, collect_set(source) resultSet " +
@@ -102,9 +107,14 @@ public class PrepareProjectResultsAssociation {
 
         spark.sql(query)
                 .as(Encoders.bean(ProjectResultSet.class))
-                .toJavaRDD()
-                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
-                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
+                .toJSON()
+                .write()
+                .mode(SaveMode.Overwrite)
+                .option("compression","gzip")
+                .text(alreadyLinkedPath);
+//                .toJavaRDD()
+//                .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+//                .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
 
 
     }