diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index c0503d991..19e60b520 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; @@ -100,8 +101,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final RDD> edgeRdd = spark .read() - .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) - .as(Encoders.bean(Relation.class)) + .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Relation.class), + Encoders.bean(Relation.class)) .javaRDD() .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) .rdd(); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 2cfe2e080..7bc77fe2b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -95,19 +95,23 @@ public class SparkCreateSimRels extends AbstractSparkAction { }); // create blocks for deduplication - JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); + JavaPairRDD blocks = Deduper + .createSortedBlocks(mapDocuments, dedupConf) + .repartition(10000); // create relations by comparing only elements in the same group - JavaRDD relations = Deduper + Deduper .computeRelations(sc, blocks, dedupConf) - .map(t -> createSimRel(t._1(), t._2(), entity)); + .map(t -> createSimRel(t._1(), t._2(), entity)) + .repartition(10000) + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(outputPath); // save the simrel in the workingdir - spark - .createDataset(relations.rdd(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Append) - .save(outputPath); + /* + * spark .createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append) + * .save(outputPath); + */ } }