try to introduce intermediate serialization on hdfs to avoid OOM

This commit is contained in:
Claudio Atzori 2020-04-15 18:35:35 +02:00
parent 5689d49689
commit 8eedfefc98
1 changed files with 14 additions and 6 deletions

View File

@ -77,7 +77,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
JavaRDD<String> relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation")); JavaRDD<String> relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation"));
JavaRDD<String> newRels = relations.mapToPair( relations.mapToPair(
(PairFunction<String, String, String>) s -> (PairFunction<String, String, String>) s ->
new Tuple2<String, String>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) new Tuple2<String, String>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds) .leftOuterJoin(mergedIds)
@ -97,10 +97,11 @@ public class SparkPropagateRelation extends AbstractSparkAction {
} }
return v1._2()._1(); return v1._2()._1();
}).filter(SparkPropagateRelation::containsDedup) }).filter(SparkPropagateRelation::containsDedup)
.repartition(500); .repartition(500)
.saveAsTextFile(DedupUtility.createEntityPath(workingPath, "newRels"), GzipCodec.class);
//update deleted by inference //update deleted by inference
relations = relations.mapToPair( relations.mapToPair(
(PairFunction<String, String, String>) s -> (PairFunction<String, String, String>) s ->
new Tuple2<String, String>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) new Tuple2<String, String>(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s))
.leftOuterJoin(mergedIds) .leftOuterJoin(mergedIds)
@ -120,9 +121,16 @@ public class SparkPropagateRelation extends AbstractSparkAction {
} }
return v1._2()._1(); return v1._2()._1();
}) })
.repartition(500); .repartition(500)
.saveAsTextFile(DedupUtility.createEntityPath(workingPath, "updated"), GzipCodec.class);
newRels.union(relations).repartition(1000) JavaRDD<String> newRels = sc
.textFile(DedupUtility.createEntityPath(workingPath, "newRels"));
sc
.textFile(DedupUtility.createEntityPath(workingPath, "updated"))
.union(newRels)
.repartition(1000)
.saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class); .saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class);
} }