[dedup] use common saveParquet
and save
methods to ensure outputs are compressed #349
|
@ -7,6 +7,7 @@ import java.util.Optional;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
@ -77,13 +78,12 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
|
|||
|
||||
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
|
||||
|
||||
spark
|
||||
final Dataset<Relation> relations = spark
|
||||
.createDataset(
|
||||
mergeRelsRDD.rdd(),
|
||||
Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.parquet(outputPath);
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
saveParquet(relations, outputPath, SaveMode.Append);
|
||||
}
|
||||
|
||||
private boolean isMergeRel(Relation rel) {
|
||||
|
|
|
@ -67,12 +67,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
|
|||
log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
|
||||
}
|
||||
|
||||
spark
|
||||
.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
|
||||
save(spark.createDataset(simRels.rdd(), Encoders.bean(Relation.class)), outputPath, SaveMode.Overwrite);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -155,7 +155,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
|||
(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath);
|
||||
saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,11 +72,7 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
|
|||
|
||||
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
|
||||
|
||||
rootOrganization(spark, entityPath, mergeRelsPath)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
save(rootOrganization(spark, entityPath, mergeRelsPath), outputPath, SaveMode.Overwrite);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -82,8 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
|||
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
|
||||
removeOutputDir(spark, outputPath);
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
SparkDeduper deduper = new SparkDeduper(dedupConf);
|
||||
|
||||
Dataset<?> simRels = spark
|
||||
|
|
|
@ -67,8 +67,6 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
|
|||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("whiteListPath: '{}'", whiteListPath);
|
||||
|
||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
// file format: source####target
|
||||
Dataset<Row> whiteListRels = spark
|
||||
.read()
|
||||
|
|
Loading…
Reference in New Issue