forked from D-Net/dnet-hadoop
removed serialization points
This commit is contained in:
parent
1d35836a58
commit
5ec8c49ad5
|
@ -79,8 +79,6 @@ public class SparkRemoveBlacklistedRelationJob {
|
||||||
Dataset<Relation> inputRelation = readRelations(spark, inputPath);
|
Dataset<Relation> inputRelation = readRelations(spark, inputPath);
|
||||||
Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
|
Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
|
||||||
|
|
||||||
log.info("InputRelationCount: {}", inputRelation.count());
|
|
||||||
|
|
||||||
Dataset<Relation> dedupSource = blackListed
|
Dataset<Relation> dedupSource = blackListed
|
||||||
.joinWith(
|
.joinWith(
|
||||||
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
|
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
|
||||||
|
@ -103,11 +101,6 @@ public class SparkRemoveBlacklistedRelationJob {
|
||||||
return c._1();
|
return c._1();
|
||||||
}, Encoders.bean(Relation.class));
|
}, Encoders.bean(Relation.class));
|
||||||
|
|
||||||
dedupBL
|
|
||||||
.write()
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.json(blacklistPath + "/deduped");
|
|
||||||
|
|
||||||
inputRelation
|
inputRelation
|
||||||
.joinWith(
|
.joinWith(
|
||||||
dedupBL, (inputRelation
|
dedupBL, (inputRelation
|
||||||
|
|
Loading…
Reference in New Issue