
Merge pull request '[dedup] use common `saveParquet` and `save` methods to ensure outputs are compressed' (#349) from fix_dedup_not_compressed into beta

Reviewed-on: D-Net/dnet-hadoop#349
Claudio Atzori 2023-10-16 11:56:18 +02:00
commit 389e3fcc59
6 changed files with 8 additions and 21 deletions
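
The change routes every dedup output through the `save` and `saveParquet` helpers inherited from `AbstractSparkAction`, replacing the per-class `.write()` chains so that compression is applied in one common code path. The helpers themselves are not part of this diff; the sketch below is only a plausible reading of them, with signatures inferred from the call sites in the changed files and the gzip option assumed from the PR title and from the `.option("compression", "gzip")` call removed in SparkCreateOrgsDedupRecord.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SaveMode;

    // Hypothetical sketch of the shared output helpers (not shown in this diff).
    // Signatures are inferred from the call sites above; the gzip compression
    // option is an assumption based on the PR title.
    public abstract class AbstractSparkAction {

        // Write a dataset as gzip-compressed JSON text files.
        protected static <T> void save(Dataset<T> dataset, String outPath, SaveMode mode) {
            dataset
                .write()
                .option("compression", "gzip")
                .mode(mode)
                .json(outPath);
        }

        // Write a dataset as parquet files with an explicit compression codec.
        protected static <T> void saveParquet(Dataset<T> dataset, String outPath, SaveMode mode) {
            dataset
                .write()
                .option("compression", "gzip")
                .mode(mode)
                .parquet(outPath);
        }
    }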

SparkCopyOpenorgsMergeRels.java

@@ -7,6 +7,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;

@@ -77,13 +78,12 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
         log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());

-        spark
+        final Dataset<Relation> relations = spark
             .createDataset(
                 mergeRelsRDD.rdd(),
-                Encoders.bean(Relation.class))
-            .write()
-            .mode(SaveMode.Append)
-            .parquet(outputPath);
+                Encoders.bean(Relation.class));
+
+        saveParquet(relations, outputPath, SaveMode.Append);
     }

     private boolean isMergeRel(Relation rel) {

SparkCopyRelationsNoOpenorgs.java

@@ -67,12 +67,7 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
             log.debug("Number of non-Openorgs relations collected: {}", simRels.count());
         }

-        spark
-            .createDataset(simRels.rdd(), Encoders.bean(Relation.class))
-            .write()
-            .mode(SaveMode.Overwrite)
-            .json(outputPath);
+        save(spark.createDataset(simRels.rdd(), Encoders.bean(Relation.class)), outputPath, SaveMode.Overwrite);
     }
 }

SparkCreateMergeRels.java

@@ -155,7 +155,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
             (FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
             Encoders.bean(Relation.class));

-        mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath);
+        saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
     }
 }

SparkCreateOrgsDedupRecord.java

@@ -72,11 +72,7 @@ public class SparkCreateOrgsDedupRecord extends AbstractSparkAction {
         final String mergeRelsPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, "organization");

-        rootOrganization(spark, entityPath, mergeRelsPath)
-            .write()
-            .mode(SaveMode.Overwrite)
-            .option("compression", "gzip")
-            .json(outputPath);
+        save(rootOrganization(spark, entityPath, mergeRelsPath), outputPath, SaveMode.Overwrite);
     }

SparkCreateSimRels.java

@@ -82,8 +82,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
             final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);
             removeOutputDir(spark, outputPath);

-            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

             SparkDeduper deduper = new SparkDeduper(dedupConf);

             Dataset<?> simRels = spark

SparkWhitelistSimRels.java

@@ -67,8 +67,6 @@ public class SparkWhitelistSimRels extends AbstractSparkAction {
         log.info("workingPath: '{}'", workingPath);
         log.info("whiteListPath: '{}'", whiteListPath);

-        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

         // file format: source####target
         Dataset<Row> whiteListRels = spark
             .read()