
removed unnecessary serialization points

Miriam Baglioni 2020-05-11 14:28:37 +02:00
parent b35d57a1ac
commit 757bae53ea
1 changed file with 5 additions and 21 deletions
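
Background: in Spark, Dataset.count() is an action, so each of the log.info(..., ds.count()) calls removed below launched an extra job that re-executed the joins above it (absent caching) just to log a number. With them gone, the pipeline stays a single lazy chain whose only action is the final write. A minimal, self-contained sketch of the anti-pattern (hypothetical paths, plain Row datasets, not code from this repository):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SerializationPointSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

        // Hypothetical input path, standing in for the job's relation dataset.
        Dataset<Row> rels = spark.read().json("/tmp/relations");

        // Anti-pattern: count() is an action, so this line alone scans the
        // whole dataset once, only to produce a log message.
        System.out.println("InputRelationCount: " + rels.count());

        // The write below then recomputes the same lineage a second time;
        // dropping the count() above leaves a single job.
        rels
            .filter("source is not null")
            .write()
            .mode(SaveMode.Overwrite)
            .json("/tmp/relations_filtered");
    }
}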

@@ -80,8 +80,6 @@ public class SparkRemoveBlacklistedRelationJob {
 
         log.info("InputRelationCount: {}", inputRelation.count());
 
-        log.info("NumberOfBlacklistedRelations: {}", blackListed.count());
-
         Dataset<Relation> dedupSource = blackListed
             .joinWith(
                 mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
@@ -109,9 +107,8 @@ public class SparkRemoveBlacklistedRelationJob {
             .mode(SaveMode.Overwrite)
             .json(blacklistPath + "/deduped");
 
-        log.info("number of dedupedBL: {}", dedupBL.count());
-
-        Dataset<Tuple2<Relation, Relation>> tmp = inputRelation
+        inputRelation
             .joinWith(
                 dedupBL, (inputRelation
                     .col("source")
@@ -120,26 +117,19 @@ public class SparkRemoveBlacklistedRelationJob {
                     inputRelation
                         .col("target")
                         .equalTo(dedupBL.col("target")))),
-                "left_outer");
-
-        log.info("numberOfRelationAfterJoin: {}", tmp.count());
-
-        Dataset<Relation> tmp1 = tmp.map(c -> {
+                "left_outer")
+            .map(c -> {
                 Relation ir = c._1();
                 Optional<Relation> obl = Optional.ofNullable(c._2());
                 if (obl.isPresent()) {
-                    if (areEquals(ir, obl.get())) {
+                    if (ir.equals(obl.get())) {
                         return null;
                     }
                 }
                 return ir;
             }, Encoders.bean(Relation.class))
-            .filter(Objects::nonNull);
-
-        log.info("NumberOfRelationAfterBlacklisting: {} ", tmp1.count());
-
-        tmp1
+            .filter(Objects::nonNull)
             .write()
             .mode(SaveMode.Overwrite)
             .option("compression", "gzip")
@@ -147,12 +137,6 @@ public class SparkRemoveBlacklistedRelationJob {
     }
 
-    private static boolean areEquals(Relation ir, Relation bl) {
-        return ir.getRelClass().equals(bl.getRelClass()) &&
-            ir.getRelType().equals(bl.getRelType()) &&
-            ir.getSubRelType().equals(bl.getSubRelType()) &&
-            ir.getRelClass().equals(bl.getRelClass());
-    }
-
     public static org.apache.spark.sql.Dataset<Relation> readRelations(
         SparkSession spark, String inputPath) {
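
Two side effects of the cleanup are worth noting. First, the deleted areEquals helper compared getRelClass() twice (its first and last clauses are identical), so one intended field comparison had evidently been lost; delegating to Relation.equals() removes that class of bug. Second, if the dropped counts are ever wanted back, they can be collected without reintroducing a serialization point by piggybacking an accumulator on the map() the job already runs. A sketch under that assumption, reusing names from the patch (inputRelation, dedupBL, outputPath; joinCondition abbreviates the join expression shown above and is hypothetical), not part of this commit:

import java.util.Objects;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

// Counted as a by-product of the write job instead of a separate count() action.
LongAccumulator kept = spark.sparkContext().longAccumulator("relationsKept");

inputRelation
    .joinWith(dedupBL, joinCondition, "left_outer")
    .map((MapFunction<Tuple2<Relation, Relation>, Relation>) c -> {
        Relation ir = c._1();
        if (c._2() != null && ir.equals(c._2())) {
            return null; // blacklisted relation: drop it
        }
        kept.add(1); // incremented while the write job runs
        return ir;
    }, Encoders.bean(Relation.class))
    .filter(Objects::nonNull)
    .write()
    .mode(SaveMode.Overwrite)
    .option("compression", "gzip")
    .json(outputPath);

// Read only after the action has finished; updates from retried tasks can
// inflate accumulator values, so treat the figure as approximate.
log.info("NumberOfRelationAfterBlacklisting: {}", kept.value());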