1
0
Fork 0

WIP SparkCreateMergeRels distinct relations

This commit is contained in:
Claudio Atzori 2020-07-13 15:54:51 +02:00
parent 7dd91edf43
commit c8284bab06
3 changed files with 7 additions and 5 deletions

View File

@@ -36,11 +36,11 @@ public class RelationAggregator extends Aggregator<Relation, Relation, Relation>
 	@Override
 	public Encoder<Relation> bufferEncoder() {
-		return Encoders.bean(Relation.class);
+		return Encoders.kryo(Relation.class);
 	}
 
 	@Override
 	public Encoder<Relation> outputEncoder() {
-		return Encoders.bean(Relation.class);
+		return Encoders.kryo(Relation.class);
 	}
 }

View File

@@ -98,10 +98,10 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 			getDeletedFn());
 
 		save(
-			newRels
+			distinctRelations(newRels
 				.union(updated)
 				.union(mergeRels)
-				.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class)),
+				.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
 			outputRelationPath, SaveMode.Overwrite);
 	}

View File

@@ -77,11 +77,13 @@ public class SparkDedupTest implements Serializable {
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
 
+		final SparkConf conf = new SparkConf();
+		conf.set("spark.sql.shuffle.partitions", "200");
+
 		spark = SparkSession
 			.builder()
 			.appName(SparkDedupTest.class.getSimpleName())
 			.master("local[*]")
-			.config(new SparkConf())
+			.config(conf)
 			.getOrCreate();
 
 		jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());