trying to overcome OOM errors during duplicate scan phase

This commit is contained in:
Claudio Atzori 2020-07-08 22:31:46 +02:00
parent c3d67f709a
commit 4365cf41d7
2 changed files with 17 additions and 10 deletions

View File

@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
@ -100,8 +101,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD<Edge<String>> edgeRdd = spark
.read()
.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.as(Encoders.bean(Relation.class))
.textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
.map(
(MapFunction<String, Relation>) r -> OBJECT_MAPPER.readValue(r, Relation.class),
Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();

View File

@ -95,19 +95,23 @@ public class SparkCreateSimRels extends AbstractSparkAction {
});
// create blocks for deduplication
JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
JavaPairRDD<String, Block> blocks = Deduper
.createSortedBlocks(mapDocuments, dedupConf)
.repartition(10000);
// create relations by comparing only elements in the same group
JavaRDD<Relation> relations = Deduper
Deduper
.computeRelations(sc, blocks, dedupConf)
.map(t -> createSimRel(t._1(), t._2(), entity));
.map(t -> createSimRel(t._1(), t._2(), entity))
.repartition(10000)
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath);
// save the simrel in the workingdir
spark
.createDataset(relations.rdd(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.save(outputPath);
/*
* spark .createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append)
* .save(outputPath);
*/
}
}