From c7e0730720cb2245134307c4a8d9e24942457363 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 18 Mar 2020 09:34:57 +0100 Subject: [PATCH] compress the output produced by migration steps 1 and 2 --- .../dhp/migration/step2/GenerateEntitiesApplication.java | 3 ++- .../dhp/migration/step3/DispatchEntitiesApplication.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java index 775e5e7d8..7f907b0c8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java @@ -15,6 +15,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -87,7 +88,7 @@ public class GenerateEntitiesApplication { .map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf))); } - inputRdd.saveAsTextFile(targetPath); + inputRdd.saveAsTextFile(targetPath, GzipCodec.class); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java index 4f10068e7..4ee24cba0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java @@ -4,6 +4,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -60,7 +61,7 @@ public class DispatchEntitiesApplication { sc.textFile(sourcePath) .filter(l -> isEntityType(l, type)) .map(l -> StringUtils.substringAfter(l, "|")) - .saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ??? + .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ??? } private static boolean isEntityType(final String line, final String type) {