[dedup] use common saveParquet
and save
methods to ensure outputs are compressed #349
|
@ -7,6 +7,7 @@ import java.util.Optional;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
@ -77,13 +78,12 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
|
||||||
|
|
||||||
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
|
log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
|
||||||
|
|
||||||
spark
|
final Dataset<Relation> relations = spark
|
||||||
.createDataset(
|
.createDataset(
|
||||||
mergeRelsRDD.rdd(),
|
mergeRelsRDD.rdd(),
|
||||||
Encoders.bean(Relation.class))
|
Encoders.bean(Relation.class));
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Append)
|
saveParquet(relations, outputPath, SaveMode.Append);
|
||||||
.parquet(outputPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isMergeRel(Relation rel) {
|
private boolean isMergeRel(Relation rel) {
|
||||||
|
|
|
@ -67,12 +67,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
|
||||||
log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
|
log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
|
||||||
}
|
}
|
||||||
|
|
||||||
spark
|
save(spark.createDataset(simRels.rdd(), Encoders.bean(Relation.class)), outputPath, SaveMode.Overwrite);
|
||||||
.createDataset(simRels.rdd(), Encoders.bean(Relation.class))
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.json(outputPath);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
|
(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
|
||||||
Encoders.bean(Relation.class));
|
Encoders.bean(Relation.class));
|
||||||
|
|
||||||
mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath);
|
saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,11 +72,7 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
|
||||||
|
|
||||||
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
|
final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");
|
||||||
|
|
||||||
rootOrganization(spark, entityPath, mergeRelsPath)
|
save(rootOrganization(spark, entityPath, mergeRelsPath), outputPath, SaveMode.Overwrite);
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.json(outputPath);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,8 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
|
||||||
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
|
final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
SparkDeduper deduper = new SparkDeduper(dedupConf);
|
SparkDeduper deduper = new SparkDeduper(dedupConf);
|
||||||
|
|
||||||
Dataset<?> simRels = spark
|
Dataset<?> simRels = spark
|
||||||
|
|
|
@ -67,8 +67,6 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
|
||||||
log.info("workingPath: '{}'", workingPath);
|
log.info("workingPath: '{}'", workingPath);
|
||||||
log.info("whiteListPath: '{}'", whiteListPath);
|
log.info("whiteListPath: '{}'", whiteListPath);
|
||||||
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
// file format: source####target
|
// file format: source####target
|
||||||
Dataset<Row> whiteListRels = spark
|
Dataset<Row> whiteListRels = spark
|
||||||
.read()
|
.read()
|
||||||
|
|
Loading…
Reference in New Issue