From fb2eed9f0e1efd8b851008eb82a404ee9a1b8667 Mon Sep 17 00:00:00 2001 From: miconis Date: Tue, 19 Apr 2022 15:29:29 +0200 Subject: [PATCH] implementation of the java version of the graph processor --- .../test.properties | 3 +- .../src/main/java/eu/dnetlib/Deduper.java | 12 ++--- .../eu/dnetlib/graph/JavaGraphProcessor.java | 47 +++++++++++++++++++ .../eu/dnetlib/jobs/SparkCreateMergeRels.java | 11 ++--- .../java/eu/dnetlib/pace/DedupLocalTest.java | 8 +++- 5 files changed, 64 insertions(+), 17 deletions(-) create mode 100644 dnet-dedup-test/src/main/java/eu/dnetlib/graph/JavaGraphProcessor.java diff --git a/dhp-build/dhp-build-properties-maven-plugin/test.properties b/dhp-build/dhp-build-properties-maven-plugin/test.properties index 3156068..d17c4d8 100644 --- a/dhp-build/dhp-build-properties-maven-plugin/test.properties +++ b/dhp-build/dhp-build-properties-maven-plugin/test.properties @@ -1,2 +1 @@ -# Thu Mar 31 12:53:27 CEST 2022 -projectPropertyKey=projectPropertyValue +# Tue Apr 19 15:27:59 CEST 2022 diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/Deduper.java b/dnet-dedup-test/src/main/java/eu/dnetlib/Deduper.java index 6a5bdaf..5d5845e 100644 --- a/dnet-dedup-test/src/main/java/eu/dnetlib/Deduper.java +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/Deduper.java @@ -1,7 +1,7 @@ package eu.dnetlib; import com.google.common.hash.Hashing; -import eu.dnetlib.graph.GraphProcessor; +import eu.dnetlib.graph.JavaGraphProcessor; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.BlockProcessorForTesting; @@ -19,7 +19,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; -import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -138,16 +137,15 @@ public class Deduper implements Serializable { .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s)); - final RDD> edgeRdd = spark + final JavaRDD> edgeRdd = spark .read() .load(simRelsPath) .as(Encoders.bean(Relation.class)) .javaRDD() - .map(Relation::toEdgeRdd) - .rdd(); + .map(Relation::toEdgeRdd); - JavaRDD ccs = GraphProcessor - .findCCs(vertexes.rdd(), edgeRdd, maxIterations) + JavaRDD ccs = JavaGraphProcessor + .findCCs(vertexes, edgeRdd, maxIterations) .toJavaRDD(); JavaRDD mergeRel = ccs diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/graph/JavaGraphProcessor.java b/dnet-dedup-test/src/main/java/eu/dnetlib/graph/JavaGraphProcessor.java new file mode 100644 index 0000000..ff6d52b --- /dev/null +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/graph/JavaGraphProcessor.java @@ -0,0 +1,47 @@ +package eu.dnetlib.graph; + +import com.google.common.collect.Sets; +import eu.dnetlib.support.ConnectedComponent; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.graphx.*; +import org.apache.spark.rdd.RDD; +import org.apache.spark.storage.StorageLevel; +import scala.Tuple2; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; + +public class JavaGraphProcessor { + + public static RDD findCCs(JavaPairRDD vertexes, JavaRDD> edges, int maxIterations) { + + ClassTag stringTag = ClassTag$.MODULE$.apply(String.class); + Graph graph = + Graph.apply( + vertexes.rdd(), + edges.rdd(), + "", + StorageLevel.MEMORY_ONLY(), + StorageLevel.MEMORY_ONLY(), + stringTag, + stringTag + ); + + GraphOps graphOps = new GraphOps<>(graph, stringTag, stringTag); + JavaRDD> cc = graphOps.connectedComponents(maxIterations).vertices().toJavaRDD(); + + JavaPairRDD joinResult = vertexes + .leftOuterJoin(cc.mapToPair(x -> x)) + .mapToPair(x -> { + if (!x._2()._2().isPresent()) { + return new Tuple2<>(x._1(), x._2()._1()); + } else { + return new Tuple2<>(x._2()._2(), x._2()._1()); + } + }); + + return joinResult.groupByKey().map(x -> new ConnectedComponent(Sets.newHashSet(x._2()))).rdd(); + + } + +} diff --git a/dnet-dedup-test/src/main/java/eu/dnetlib/jobs/SparkCreateMergeRels.java b/dnet-dedup-test/src/main/java/eu/dnetlib/jobs/SparkCreateMergeRels.java index 644eebe..8b0ec5d 100644 --- a/dnet-dedup-test/src/main/java/eu/dnetlib/jobs/SparkCreateMergeRels.java +++ b/dnet-dedup-test/src/main/java/eu/dnetlib/jobs/SparkCreateMergeRels.java @@ -1,7 +1,7 @@ package eu.dnetlib.jobs; import eu.dnetlib.Deduper; -import eu.dnetlib.graph.GraphProcessor; +import eu.dnetlib.graph.JavaGraphProcessor; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; import eu.dnetlib.pace.utils.Utility; @@ -78,16 +78,15 @@ public class SparkCreateMergeRels extends AbstractSparkJob { .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) .mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s)); - final RDD> edgeRdd = spark + final JavaRDD> edgeRdd = spark .read() .load(workingPath + "/simrels") .as(Encoders.bean(Relation.class)) .javaRDD() - .map(Relation::toEdgeRdd) - .rdd(); + .map(Relation::toEdgeRdd); - JavaRDD ccs = GraphProcessor - .findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()) + JavaRDD ccs = JavaGraphProcessor + .findCCs(vertexes, edgeRdd, dedupConf.getWf().getMaxIterations()) .toJavaRDD(); JavaRDD mergeRel = ccs diff --git a/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupLocalTest.java b/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupLocalTest.java index 02a2879..76ab518 100644 --- a/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupLocalTest.java +++ b/dnet-dedup-test/src/test/java/eu/dnetlib/pace/DedupLocalTest.java @@ -1,6 +1,7 @@ package eu.dnetlib.pace; import eu.dnetlib.Deduper; +import eu.dnetlib.graph.JavaGraphProcessor; import eu.dnetlib.jobs.SparkCreateDedupEntity; import eu.dnetlib.jobs.SparkCreateMergeRels; import eu.dnetlib.jobs.SparkCreateSimRels; @@ -11,6 +12,7 @@ import eu.dnetlib.pace.util.MapDocumentUtil; import eu.dnetlib.pace.utils.Utility; import eu.dnetlib.support.ArgumentApplicationParser; import eu.dnetlib.support.Block; +import eu.dnetlib.support.ConnectedComponent; import eu.dnetlib.support.Relation; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -21,6 +23,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -179,10 +183,10 @@ public class DedupLocalTest extends DedupTestUtils { //custom parameters for this test DedupConfig dedupConfig = DedupConfig.load(readFileFromHDFS( - Paths.get(DedupLocalTest.class.getResource("/eu/dnetlib/pace/config/pub.instancetype.tree.conf.json").toURI()).toFile().getAbsolutePath() + Paths.get(DedupLocalTest.class.getResource("/eu/dnetlib/pace/config/pubs.fdup.exp.json").toURI()).toFile().getAbsolutePath() )); - String inputPath = Paths.get(DedupLocalTest.class.getResource("/eu/dnetlib/pace/examples/publications.dump.1000.json").toURI()).toFile().getAbsolutePath(); + String inputPath = Paths.get(DedupLocalTest.class.getResource("/eu/dnetlib/pace/examples/publications.to.fix.json").toURI()).toFile().getAbsolutePath(); String simRelsPath = workingPath + "/simrels"; String mergeRelsPath = workingPath + "/mergerels";