From 8eedfefc98ce53bd08ea35e2d814bddc9d3c8f86 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 15 Apr 2020 18:35:35 +0200 Subject: [PATCH] try to introduce intermediate serialization on hdfs to avoid OOM --- .../dhp/oa/dedup/SparkPropagateRelation.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index e00c8767e6..47a2acf373 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -77,7 +77,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { JavaRDD relations = sc.textFile(DedupUtility.createEntityPath(graphBasePath, "relation")); - JavaRDD newRels = relations.mapToPair( + relations.mapToPair( (PairFunction) s -> new Tuple2(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) .leftOuterJoin(mergedIds) @@ -97,10 +97,11 @@ public class SparkPropagateRelation extends AbstractSparkAction { } return v1._2()._1(); }).filter(SparkPropagateRelation::containsDedup) - .repartition(500); + .repartition(500) + .saveAsTextFile(DedupUtility.createEntityPath(workingPath, "newRels"), GzipCodec.class); //update deleted by inference - relations = relations.mapToPair( + relations.mapToPair( (PairFunction) s -> new Tuple2(MapDocumentUtil.getJPathString(SOURCEJSONPATH, s), s)) .leftOuterJoin(mergedIds) @@ -120,9 +121,16 @@ public class SparkPropagateRelation extends AbstractSparkAction { } return v1._2()._1(); }) - .repartition(500); + .repartition(500) + .saveAsTextFile(DedupUtility.createEntityPath(workingPath, "updated"), GzipCodec.class); - newRels.union(relations).repartition(1000) + JavaRDD newRels = sc + .textFile(DedupUtility.createEntityPath(workingPath, "newRels")); + + sc + .textFile(DedupUtility.createEntityPath(workingPath, "updated")) + .union(newRels) + .repartition(1000) .saveAsTextFile(DedupUtility.createEntityPath(dedupGraphPath, "relation"), GzipCodec.class); } @@ -169,4 +177,4 @@ public class SparkPropagateRelation extends AbstractSparkAction { throw new RuntimeException("Unable to convert json", e); } } -} +} \ No newline at end of file