1
0
Fork 0

compress the output produced by migration steps 1 and 2

This commit is contained in:
Claudio Atzori 2020-03-18 09:34:57 +01:00
parent 2f11e37602
commit c7e0730720
2 changed files with 4 additions and 2 deletions

View File

@ -15,6 +15,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -87,7 +88,7 @@ public class GenerateEntitiesApplication {
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf))); .map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
} }
inputRdd.saveAsTextFile(targetPath); inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
} }

View File

@ -4,6 +4,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -60,7 +61,7 @@ public class DispatchEntitiesApplication {
sc.textFile(sourcePath) sc.textFile(sourcePath)
.filter(l -> isEntityType(l, type)) .filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|")) .map(l -> StringUtils.substringAfter(l, "|"))
.saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ??? .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
} }
private static boolean isEntityType(final String line, final String type) { private static boolean isEntityType(final String line, final String type) {