fixed switch of parameters

Miriam Baglioni 2020-05-07 18:22:32 +02:00
parent e218360f8a
commit f95d288681
2 changed files with 28 additions and 21 deletions

View File

@@ -62,8 +62,8 @@ public class SparkRemoveBlacklistedRelationJob {
spark -> {
removeBlacklistedRelations(
spark,
- inputPath,
blacklistPath,
+ inputPath,
outputPath,
mergesPath);
});
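The two swapped lines above are the actual fix: the call now passes blacklistPath before inputPath. The snippet below is not part of this commit; it is a minimal sketch of the assumed signature of the called helper, reconstructed from how the two paths are used in the next hunk, to show why the order matters (both parameters are Strings, so the compiler cannot flag a swapped call site).

// Sketch only: assumed parameter order of the helper, not shown in this diff.
private static void removeBlacklistedRelations(
        SparkSession spark,
        String blacklistPath, // assumed 2nd argument: where the blacklisted relations are read from
        String inputPath,     // assumed 3rd argument: the relations to be filtered
        String outputPath,
        String mergesPath) {
    // body shown in the following hunks
}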
@@ -76,30 +76,34 @@ public class SparkRemoveBlacklistedRelationJob {
Dataset<Relation> inputRelation = readRelations(spark, inputPath);
Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
log.info("InputRelationCount: {}", inputRelation.count());
Dataset<Relation> dedupSource = blackListed
.joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer")
.map(c -> {
- Optional<Relation> merged = Optional.ofNullable(c._2());
- Relation bl = c._1();
- if (merged.isPresent()) {
- bl.setSource(merged.get().getSource());
- }
- return bl;
+ Optional
+ .ofNullable(c._2())
+ .ifPresent(mr -> c._1().setSource(mr.getSource()));
+ return c._1();
}, Encoders.bean(Relation.class));
Dataset<Relation> dedupBL = dedupSource
.joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer")
.map(c -> {
- Optional<Relation> merged = Optional.ofNullable(c._2());
- Relation bl = c._1();
- if (merged.isPresent()) {
- bl.setTarget(merged.get().getSource());
- }
- return bl;
+ Optional
+ .ofNullable(c._2())
+ .ifPresent(mr -> c._1().setTarget(mr.getSource()));
+ return c._1();
}, Encoders.bean(Relation.class));
- inputRelation
- .joinWith(dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), "left_outer")
+ dedupBL
+ .write()
+ .json(blacklistPath + "/deduped");
+ Dataset<Relation> tmp = inputRelation
+ .joinWith(
+ dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")),
+ "left_outer")
.map(c -> {
Relation ir = c._1();
Optional<Relation> obl = Optional.ofNullable(c._2());
@@ -111,12 +115,15 @@ public class SparkRemoveBlacklistedRelationJob {
return ir;
}, Encoders.bean(Relation.class))
- .filter(r -> !(r == null))
- .toJSON()
+ .filter(r -> r != null);
+ log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count());
+ tmp
.write()
.mode(SaveMode.Overwrite)
.option("conpression", "gzip")
.text(outputPath);
.option("compression", "gzip")
.json(outputPath);
}
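Read as a plain diff the hunks above are hard to follow, so here is the method as it presumably looks after this commit, assembled only from the context and added lines shown above; indentation, blank lines, the blackListed declaration and the part elided between the last two hunks are approximations, not additional changes.

private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath,
    String outputPath, String mergesPath) {
    // assumed: the blacklist dataset is declared above the first shown hunk and is not visible in this diff
    Dataset<Relation> blackListed = readRelations(spark, blacklistPath);
    Dataset<Relation> inputRelation = readRelations(spark, inputPath);
    Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);
    log.info("InputRelationCount: {}", inputRelation.count());

    // replace a blacklisted source id with the id of the record it was merged into, if any
    Dataset<Relation> dedupSource = blackListed
        .joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer")
        .map(c -> {
            Optional.ofNullable(c._2()).ifPresent(mr -> c._1().setSource(mr.getSource()));
            return c._1();
        }, Encoders.bean(Relation.class));

    // same substitution on the target side
    Dataset<Relation> dedupBL = dedupSource
        .joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer")
        .map(c -> {
            Optional.ofNullable(c._2()).ifPresent(mr -> c._1().setTarget(mr.getSource()));
            return c._1();
        }, Encoders.bean(Relation.class));

    // new in this commit: persist the deduplicated blacklist next to the original one
    dedupBL
        .write()
        .json(blacklistPath + "/deduped");

    // left join the input relations with the blacklist; the hunk at -111/+115 skips the lines
    // that drop matching relations, which is why the null filter below is needed
    Dataset<Relation> tmp = inputRelation
        .joinWith(dedupBL, inputRelation.col("source").equalTo(dedupBL.col("source")), "left_outer")
        .map(c -> {
            Relation ir = c._1();
            Optional<Relation> obl = Optional.ofNullable(c._2());
            // elided in this diff: ir is nulled out when it matches a blacklist entry
            return ir;
        }, Encoders.bean(Relation.class))
        .filter(r -> r != null);

    log.info("NumberOfRelationAfterBlacklisting: {} ", tmp.count());

    tmp
        .write()
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .json(outputPath);
}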

View File

@@ -62,7 +62,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
- <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+ <arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/mergesRelation</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark>
@@ -86,7 +86,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
- <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+ <arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/blacklist</arg>
<arg>--mergesPath</arg><arg>${workingDir}/mergesRelation</arg>
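Both workflow nodes now receive --sourcePath ${sourcePath}/relation instead of the bare ${sourcePath}, so the Spark jobs read only the relation part of the graph dump. As a rough, assumed sketch of how that argument ends up being consumed (readRelations is referenced in the Java hunks above, but its body is not part of this diff):

// Assumed helper, not shown in this commit: loads the newline-delimited JSON found under
// the given path (now ${sourcePath}/relation) into a typed Dataset of Relation beans.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static Dataset<Relation> readRelations(SparkSession spark, String inputPath) {
    return spark
        .read()
        .textFile(inputPath) // e.g. ${sourcePath}/relation
        .map(
            (MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
            Encoders.bean(Relation.class));
}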