From 4365cf41d70cd1a16b359340a4b2da2537237543 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Jul 2020 22:31:46 +0200 Subject: [PATCH 01/14] trying to overcome OOM errors during duplicate scan phase --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 7 +++++-- .../dhp/oa/dedup/SparkCreateSimRels.java | 20 +++++++++++-------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index c0503d991..19e60b520 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; import org.apache.spark.rdd.RDD; @@ -100,8 +101,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final RDD> edgeRdd = spark .read() - .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) - .as(Encoders.bean(Relation.class)) + .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Relation.class), + Encoders.bean(Relation.class)) .javaRDD() .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) .rdd(); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 2cfe2e080..7bc77fe2b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -95,19 +95,23 @@ public class SparkCreateSimRels extends AbstractSparkAction { }); // create blocks for deduplication - JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); + JavaPairRDD blocks = Deduper + .createSortedBlocks(mapDocuments, dedupConf) + .repartition(10000); // create relations by comparing only elements in the same group - JavaRDD relations = Deduper + Deduper .computeRelations(sc, blocks, dedupConf) - .map(t -> createSimRel(t._1(), t._2(), entity)); + .map(t -> createSimRel(t._1(), t._2(), entity)) + .repartition(10000) + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(outputPath); // save the simrel in the workingdir - spark - .createDataset(relations.rdd(), Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Append) - .save(outputPath); + /* + * spark .createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append) + * .save(outputPath); + */ } } From 3c728aaa0c1c993b172bac1c792ff45a0bac75ea Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 8 Jul 2020 22:39:51 +0200 Subject: [PATCH 02/14] trying to overcome OOM errors during duplicate scan phase --- .../main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 7bc77fe2b..1be2b9e31 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -88,6 +88,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { JavaPairRDD mapDocuments = sc .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .repartition(10000) .mapToPair( (PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); From 752d28f8eb955128eb59c5f71e3f0b652f003319 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 15:09:50 +0200 Subject: [PATCH 03/14] make the relations produced by the dedup SparkPropagateRelation jon unique --- .../dhp/oa/dedup/SparkPropagateRelation.java | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 516808511..e65eb7ab5 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -7,6 +7,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,49 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite); + save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + } + + private Dataset distinctRelations(Dataset rels) { + return rels + .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .agg(new RelationAggregator().toColumn()) + .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); + } + + class RelationAggregator extends Aggregator { + + @Override + public Relation zero() { + return new Relation(); + } + + @Override + public Relation reduce(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation merge(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation finish(Relation r) { + return r; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(Relation.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(Relation.class); + } } private static Dataset processDataset( From 770adc26e9d509837710de1d15bdeab88ed675a0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 19:35:10 +0200 Subject: [PATCH 04/14] WIP aggregator to make relationships unique --- .../dnetlib/dhp/oa/dedup/SparkPropagateRelation.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index e65eb7ab5..13d2e4cd7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -96,14 +96,14 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite); } private Dataset distinctRelations(Dataset rels) { return rels - .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) - .agg(new RelationAggregator().toColumn()) - .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); + .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .agg(new RelationAggregator().toColumn()) + .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } class RelationAggregator extends Aggregator { @@ -115,13 +115,11 @@ public class SparkPropagateRelation extends AbstractSparkAction { @Override public Relation reduce(Relation b, Relation a) { - b.mergeFrom(a); return b; } @Override public Relation merge(Relation b, Relation a) { - b.mergeFrom(a); return b; } From 7a3fd9f54cc7cf803e799fc430b0eb898e33f3c6 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 10:11:36 +0200 Subject: [PATCH 05/14] dedup relation aggregator moved into dedicated class --- .../dhp/oa/dedup/RelationAggregator.java | 46 +++++++++++++++++++ .../dhp/oa/dedup/SparkPropagateRelation.java | 36 +-------------- 2 files changed, 47 insertions(+), 35 deletions(-) create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java new file mode 100644 index 000000000..0a29aa51b --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java @@ -0,0 +1,46 @@ + +package eu.dnetlib.dhp.oa.dedup; + +import java.util.Objects; + +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationAggregator extends Aggregator { + + private static Relation ZERO = new Relation(); + + @Override + public Relation zero() { + return ZERO; + } + + @Override + public Relation reduce(Relation b, Relation a) { + return Objects.equals(a, ZERO) ? b : a; + } + + @Override + public Relation merge(Relation b, Relation a) { + b.mergeFrom(a); + return b; + } + + @Override + public Relation finish(Relation r) { + return r; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(Relation.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(Relation.class); + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 13d2e4cd7..c19769749 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -7,7 +7,6 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; -import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +95,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated).union(mergeRels).distinct(), outputRelationPath, SaveMode.Overwrite); + save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); } private Dataset distinctRelations(Dataset rels) { @@ -106,39 +105,6 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } - class RelationAggregator extends Aggregator { - - @Override - public Relation zero() { - return new Relation(); - } - - @Override - public Relation reduce(Relation b, Relation a) { - return b; - } - - @Override - public Relation merge(Relation b, Relation a) { - return b; - } - - @Override - public Relation finish(Relation r) { - return r; - } - - @Override - public Encoder bufferEncoder() { - return Encoders.bean(Relation.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.bean(Relation.class); - } - } - private static Dataset processDataset( Dataset rels, Dataset> mergedIds, From d561b2dd210eece82b30311d19d7746adad8a497 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 13 Jul 2020 14:18:42 +0200 Subject: [PATCH 06/14] implemented cut of connected component --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 19 ++++++++++++++++++- .../oa/dedup/graph/ConnectedComponent.java | 9 ++++++++- .../dhp/oa/dedup/graph/GraphProcessor.scala | 8 ++++---- .../dhp/oa/dedup/createCC_parameters.json | 7 +++++++ 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 19e60b520..0c31f5fa2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -76,6 +76,19 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final String workingPath = parser.get("workingPath"); final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); + int cut = 0; + try { + cut = Integer.parseInt(parser.get("cutConnectedComponent")); + + + } catch (Throwable e) { + log.error("unable to parse "+parser.get(" cut-off threshold")); + } + + + + + log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); @@ -112,7 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final Dataset mergeRels = spark .createDataset( GraphProcessor - .findCCs(vertexes.rdd(), edgeRdd, maxIterations) + .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut) .toJavaRDD() .filter(k -> k.getDocIds().size() > 1) .flatMap(cc -> ccToMergeRel(cc, dedupConf)) @@ -120,6 +133,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { Encoders.bean(Relation.class)); mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); + + + + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java index bfd2c25e2..fd6e70916 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph; import java.io.IOException; import java.io.Serializable; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.annotate.JsonIgnore; @@ -21,9 +22,14 @@ public class ConnectedComponent implements Serializable { public ConnectedComponent() { } - public ConnectedComponent(Set docIds) { + + public ConnectedComponent(Set docIds, final int cut) { this.docIds = docIds; createID(); + if (cut > 0 && docIds.size() > cut){ + docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet()); + docIds.add(ccId); + } } public String createID() { @@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable { public String getMin() { final StringBuilder min = new StringBuilder(); + docIds .forEach( i -> { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala index e19bb7ff5..f4dd85d75 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala @@ -7,7 +7,7 @@ import scala.collection.JavaConversions; object GraphProcessor { - def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = { + def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = { val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby val cc = graph.connectedComponents(maxIterations).vertices @@ -22,15 +22,15 @@ object GraphProcessor { } } val connectedComponents = joinResult.groupByKey() - .map[ConnectedComponent](cc => asConnectedComponent(cc)) + .map[ConnectedComponent](cc => asConnectedComponent(cc, cut)) connectedComponents } - def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = { + def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = { val docs = group._2.toSet[String] - val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs)); + val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut); connectedComponent } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json index 6eedd5432..9350cf22b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json @@ -17,6 +17,13 @@ "paramDescription": "the url for the lookup service", "paramRequired": true }, + { + "paramName": "cc", + "paramLongName": "cutConnectedComponent", + "paramDescription": "the number of maximum elements that belongs to a connected components", + "paramRequired": false + } +, { "paramName": "w", "paramLongName": "workingPath", From 9ef23850228ab7ece1d9e71c83fde76dc933c9cc Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 13 Jul 2020 15:28:17 +0200 Subject: [PATCH 07/14] implemented test for cut of connected component --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 11 +- .../oa/dedup/graph/ConnectedComponent.java | 14 +-- .../dhp/oa/dedup/createCC_parameters.json | 3 +- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 102 +++++++++++++++++- 4 files changed, 107 insertions(+), 23 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 0c31f5fa2..0b44935d0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -80,16 +80,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { try { cut = Integer.parseInt(parser.get("cutConnectedComponent")); - } catch (Throwable e) { - log.error("unable to parse "+parser.get(" cut-off threshold")); + log.error("unable to parse " + parser.get(" cut-off threshold")); } - - - - - log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); @@ -134,9 +128,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction { mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); - - - } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java index fd6e70916..cd4f99f63 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java @@ -19,16 +19,16 @@ public class ConnectedComponent implements Serializable { private Set docIds; private String ccId; - public ConnectedComponent() { - } - - public ConnectedComponent(Set docIds, final int cut) { this.docIds = docIds; createID(); - if (cut > 0 && docIds.size() > cut){ - docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet()); - docIds.add(ccId); + if (cut > 0 && docIds.size() > cut) { + this.docIds = docIds + .stream() + .filter(s -> !ccId.equalsIgnoreCase(s)) + .limit(cut - 1) + .collect(Collectors.toSet()); + this.docIds.add(ccId); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json index 9350cf22b..b1df08535 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json @@ -22,8 +22,7 @@ "paramLongName": "cutConnectedComponent", "paramDescription": "the number of maximum elements that belongs to a connected components", "paramRequired": false - } -, + }, { "paramName": "w", "paramLongName": "workingPath", diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 88d5f24f9..66f0af176 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup; import static java.nio.file.Files.createTempDirectory; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.count; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.lenient; @@ -11,6 +13,9 @@ import java.io.IOException; import java.io.Serializable; import java.net.URISyntaxException; import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -18,6 +23,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.Dataset; @@ -190,6 +196,94 @@ public class SparkDedupTest implements Serializable { @Test @Order(2) + public void cutMergeRelsTest() throws Exception { + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + SparkCreateMergeRels.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"))); + parser + .parseArgument( + new String[] { + "-i", + testGraphBasePath, + "-asi", + testActionSetId, + "-la", + "lookupurl", + "-w", + testOutputBasePath, + "-cc", + "3" + }); + + new SparkCreateMergeRels(parser, spark).run(isLookUpService); + + long orgs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long pubs_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + long sw_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + long orp_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel") + .as(Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges")) + .groupBy("source") + .agg(count("target").alias("cnt")) + .select("source", "cnt") + .where("cnt > 3") + .count(); + + assertEquals(0, orgs_mergerel); + assertEquals(0, pubs_mergerel); + assertEquals(0, sw_mergerel); + assertEquals(0, ds_mergerel); + assertEquals(0, orp_mergerel); + } + + @Test + @Order(3) public void createMergeRelsTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -241,7 +335,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(3) + @Order(4) public void createDedupRecordTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -288,7 +382,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(4) + @Order(5) public void updateEntityTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -404,7 +498,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(5) + @Order(6) public void propagateRelationTest() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -454,7 +548,7 @@ public class SparkDedupTest implements Serializable { } @Test - @Order(6) + @Order(7) public void testRelations() throws Exception { testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10); testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2); From 8a612d861a89bb7dbe1da3992b9cc6730a4d5125 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 15:30:57 +0200 Subject: [PATCH 08/14] WIP SparkCreateMergeRels distinct relations --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 11 +------- .../dhp/oa/dedup/SparkCreateSimRels.java | 21 ++++---------- .../dhp/oa/dedup/SparkPropagateRelation.java | 18 +++++++++++- .../oa/dedup/graph/ConnectedComponent.java | 5 ++-- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 28 +++++++++++++------ .../dnetlib/dhp/dedup/conf/ds.curr.conf.json | 7 +++-- .../dnetlib/dhp/dedup/conf/orp.curr.conf.json | 7 +++-- .../dnetlib/dhp/dedup/conf/pub.curr.conf.json | 7 +++-- .../dnetlib/dhp/dedup/conf/sw.curr.conf.json | 9 +++--- 9 files changed, 61 insertions(+), 52 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 0c31f5fa2..0b44935d0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -80,16 +80,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction { try { cut = Integer.parseInt(parser.get("cutConnectedComponent")); - } catch (Throwable e) { - log.error("unable to parse "+parser.get(" cut-off threshold")); + log.error("unable to parse " + parser.get(" cut-off threshold")); } - - - - - log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); @@ -134,9 +128,6 @@ public class SparkCreateMergeRels extends AbstractSparkAction { mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); - - - } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 1be2b9e31..2e96b3563 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -34,6 +34,8 @@ public class SparkCreateSimRels extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); + public static final int NUM_PARTITIONS = 10000; + public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); } @@ -48,13 +50,6 @@ public class SparkCreateSimRels extends AbstractSparkAction { parser.parseArgument(args); SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf - .registerKryoClasses( - new Class[] { - MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class - }); - new SparkCreateSimRels(parser, getSparkSession(conf)) .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @@ -88,7 +83,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { JavaPairRDD mapDocuments = sc .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(10000) + .repartition(NUM_PARTITIONS) .mapToPair( (PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); @@ -98,21 +93,15 @@ public class SparkCreateSimRels extends AbstractSparkAction { // create blocks for deduplication JavaPairRDD blocks = Deduper .createSortedBlocks(mapDocuments, dedupConf) - .repartition(10000); + .repartition(NUM_PARTITIONS); // create relations by comparing only elements in the same group Deduper .computeRelations(sc, blocks, dedupConf) .map(t -> createSimRel(t._1(), t._2(), entity)) - .repartition(10000) + .repartition(NUM_PARTITIONS) .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath); - - // save the simrel in the workingdir - /* - * spark .createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append) - * .save(outputPath); - */ } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index c19769749..88fe5b26d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -4,7 +4,9 @@ package eu.dnetlib.dhp.oa.dedup; import static org.apache.spark.sql.functions.col; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; @@ -95,11 +97,17 @@ public class SparkPropagateRelation extends AbstractSparkAction { FieldType.TARGET, getDeletedFn()); - save(distinctRelations(newRels.union(updated).union(mergeRels)), outputRelationPath, SaveMode.Overwrite); + save( + newRels + .union(updated) + .union(mergeRels) + .map((MapFunction) r -> r, Encoders.kryo(Relation.class)), + outputRelationPath, SaveMode.Overwrite); } private Dataset distinctRelations(Dataset rels) { return rels + .filter(getRelationFilterFunction()) .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) .agg(new RelationAggregator().toColumn()) .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); @@ -119,6 +127,14 @@ public class SparkPropagateRelation extends AbstractSparkAction { .map(mapFn, Encoders.bean(Relation.class)); } + private FilterFunction getRelationFilterFunction() { + return (FilterFunction) r -> StringUtils.isNotBlank(r.getSource()) || + StringUtils.isNotBlank(r.getTarget()) || + StringUtils.isNotBlank(r.getRelClass()) || + StringUtils.isNotBlank(r.getSubRelType()) || + StringUtils.isNotBlank(r.getRelClass()); + } + private static MapFunction patchRelFn() { return value -> { final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java index fd6e70916..c5f893668 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java @@ -22,12 +22,11 @@ public class ConnectedComponent implements Serializable { public ConnectedComponent() { } - public ConnectedComponent(Set docIds, final int cut) { this.docIds = docIds; createID(); - if (cut > 0 && docIds.size() > cut){ - docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut -1).collect(Collectors.toSet()); + if (cut > 0 && docIds.size() > cut) { + docIds = docIds.stream().filter(s -> !ccId.equalsIgnoreCase(s)).limit(cut - 1).collect(Collectors.toSet()); docIds.add(ccId); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 88d5f24f9..8e1ef8f9c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -166,23 +166,31 @@ public class SparkDedupTest implements Serializable { long orgs_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") + .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel") .count(); + long pubs_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") + .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel") .count(); - long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count(); - long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count(); + long sw_simrel = spark + .read() + .textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel") + .count(); + + long ds_simrel = spark + .read() + .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel") + .count(); long orp_simrel = spark .read() - .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") + .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel") .count(); assertEquals(3432, orgs_simrel); - assertEquals(7054, pubs_simrel); + assertEquals(7152, pubs_simrel); assertEquals(344, sw_simrel); assertEquals(458, ds_simrel); assertEquals(6750, orp_simrel); @@ -225,8 +233,10 @@ public class SparkDedupTest implements Serializable { .read() .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel") .count(); - - long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count(); + long ds_mergerel = spark + .read() + .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel") + .count(); long orp_mergerel = spark .read() @@ -234,7 +244,7 @@ public class SparkDedupTest implements Serializable { .count(); assertEquals(1276, orgs_mergerel); - assertEquals(1440, pubs_mergerel); + assertEquals(1442, pubs_mergerel); assertEquals(288, sw_mergerel); assertEquals(472, ds_mergerel); assertEquals(718, orp_mergerel); diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json index 2469b2cc0..fa889d63b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "dataset", "orderField" : "title", - "queueMaxSize" : "800", + "queueMaxSize" : "100", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "80", + "slidingWindowSize" : "100", "rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,7 +17,8 @@ }, "pace" : { "clustering" : [ - { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, + { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, + { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree" : { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json index 4adcc0439..b45b6ae83 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "otherresearchproduct", "orderField" : "title", - "queueMaxSize" : "800", + "queueMaxSize" : "100", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "80", + "slidingWindowSize" : "100", "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,7 +17,8 @@ }, "pace" : { "clustering" : [ - { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, + { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, + { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree" : { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json index ef0b26af4..15ebc7a6a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType": "resulttype", "subEntityValue": "publication", "orderField": "title", - "queueMaxSize": "800", + "queueMaxSize": "100", "groupMaxSize": "100", "maxChildren": "100", - "slidingWindowSize": "80", + "slidingWindowSize": "100", "rootBuilder": [ "result", "resultProject_outcome_isProducedBy", @@ -29,7 +29,8 @@ }, "pace": { "clustering" : [ - { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, + { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, + { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree": { diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json index 623abbf9f..f53ff385f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json @@ -6,10 +6,10 @@ "subEntityType" : "resulttype", "subEntityValue" : "software", "orderField" : "title", - "queueMaxSize" : "800", + "queueMaxSize" : "100", "groupMaxSize" : "100", "maxChildren" : "100", - "slidingWindowSize" : "80", + "slidingWindowSize" : "100", "rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ], "includeChildren" : "true", "idPath" : "$.id", @@ -17,8 +17,9 @@ }, "pace" : { "clustering" : [ - { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } }, - { "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } } + { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} }, + { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } }, + { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } } ], "decisionTree": { "start": { From 7dd91edf4335e7f23ac87661b86008e835134aa6 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 15:40:41 +0200 Subject: [PATCH 09/14] parsing of optional parameter --- .../dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 0b44935d0..70457fb4a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -76,14 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final String workingPath = parser.get("workingPath"); final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); - int cut = 0; - try { - cut = Integer.parseInt(parser.get("cutConnectedComponent")); - - } catch (Throwable e) { - log.error("unable to parse " + parser.get(" cut-off threshold")); - } - + int cut = Optional + .ofNullable(parser.get("cutConnectedComponent")) + .map(Integer::valueOf) + .orElse(0); + log.info("connected component cut: '{}'", cut); log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); From 1d133b7fe6590dbd706ef77e40cfd657debbc00c Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 13 Jul 2020 15:52:41 +0200 Subject: [PATCH 10/14] update test --- .../test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index e10655126..294b19ecd 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -288,6 +288,12 @@ public class SparkDedupTest implements Serializable { assertEquals(0, sw_mergerel); assertEquals(0, ds_mergerel); assertEquals(0, orp_mergerel); + + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); + FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); } @Test From c8284bab06c13f47d135cbd7d8a8fe009f090b6b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 15:54:51 +0200 Subject: [PATCH 11/14] WIP SparkCreateMergeRels distinct relations --- .../main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java | 4 ++-- .../java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java | 4 ++-- .../src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java index 0a29aa51b..7935fe1ca 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java @@ -36,11 +36,11 @@ public class RelationAggregator extends Aggregator @Override public Encoder bufferEncoder() { - return Encoders.bean(Relation.class); + return Encoders.kryo(Relation.class); } @Override public Encoder outputEncoder() { - return Encoders.bean(Relation.class); + return Encoders.kryo(Relation.class); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 88fe5b26d..baba3bc87 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -98,10 +98,10 @@ public class SparkPropagateRelation extends AbstractSparkAction { getDeletedFn()); save( - newRels + distinctRelations(newRels .union(updated) .union(mergeRels) - .map((MapFunction) r -> r, Encoders.kryo(Relation.class)), + .map((MapFunction) r -> r, Encoders.kryo(Relation.class))), outputRelationPath, SaveMode.Overwrite); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index e10655126..82c2d82b7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -77,11 +77,13 @@ public class SparkDedupTest implements Serializable { FileUtils.deleteDirectory(new File(testOutputBasePath)); FileUtils.deleteDirectory(new File(testDedupGraphBasePath)); + final SparkConf conf = new SparkConf(); + conf.set("spark.sql.shuffle.partitions", "200"); spark = SparkSession .builder() .appName(SparkDedupTest.class.getSimpleName()) .master("local[*]") - .config(new SparkConf()) + .config(conf) .getOrCreate(); jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); From 8c67938ad0a9d7713530d2a1ede0cf59f161fbe5 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 16:07:07 +0200 Subject: [PATCH 12/14] configurable number of partitions used in the SparkCreateSimRels phase --- .../dhp/oa/dedup/SparkCreateSimRels.java | 14 +++++--- .../oa/dedup/createSimRels_parameters.json | 6 ++++ .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 36 +++++++++++-------- .../dhp/oa/dedup/updateEntity_parameters.json | 26 +++++++------- 4 files changed, 50 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 2e96b3563..a66ab431c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.oa.dedup; import java.io.IOException; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -34,7 +35,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class); - public static final int NUM_PARTITIONS = 10000; + public static final int NUM_PARTITIONS = 1000; public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) { super(parser, spark); @@ -63,7 +64,12 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String isLookUpUrl = parser.get("isLookUpUrl"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); + final int numPartitions = Optional + .ofNullable(parser.get("numPartitions")) + .map(Integer::valueOf) + .orElse(NUM_PARTITIONS); + log.info("numPartitions: '{}'", numPartitions); log.info("graphBasePath: '{}'", graphBasePath); log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("actionSetId: '{}'", actionSetId); @@ -83,7 +89,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { JavaPairRDD mapDocuments = sc .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .repartition(NUM_PARTITIONS) + .repartition(numPartitions) .mapToPair( (PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); @@ -93,13 +99,13 @@ public class SparkCreateSimRels extends AbstractSparkAction { // create blocks for deduplication JavaPairRDD blocks = Deduper .createSortedBlocks(mapDocuments, dedupConf) - .repartition(NUM_PARTITIONS); + .repartition(numPartitions); // create relations by comparing only elements in the same group Deduper .computeRelations(sc, blocks, dedupConf) .map(t -> createSimRel(t._1(), t._2(), entity)) - .repartition(NUM_PARTITIONS) + .repartition(numPartitions) .map(r -> OBJECT_MAPPER.writeValueAsString(r)) .saveAsTextFile(outputPath); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json index ce38dc6f0..09f4365d3 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json @@ -22,5 +22,11 @@ "paramLongName": "workingPath", "paramDescription": "path of the working directory", "paramRequired": true + }, + { + "paramName": "np", + "paramLongName": "numPartitions", + "paramDescription": "number of partitions for the similarity relations intermediate phases", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index 298a248e3..c42ce1263 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -20,6 +20,10 @@ dedupGraphPath path for the output graph + + cutConnectedComponent + max number of elements in a connected component + sparkDriverMemory memory for driver process @@ -106,10 +110,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --la${isLookUpUrl} - --asi${actionSetId} - --w${workingPath} + --graphBasePath${graphBasePath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --workingPath${workingPath} + --numPartitions8000 @@ -132,10 +137,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} + --cutConnectedComponent${cutConnectedComponent} @@ -158,10 +164,10 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --isLookUpUrl${isLookUpUrl} + --actionSetId${actionSetId} @@ -184,9 +190,9 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --i${graphBasePath} - --w${workingPath} - --o${dedupGraphPath} + --graphBasePath${graphBasePath} + --workingPath${workingPath} + --dedupGraphPath${dedupGraphPath} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json index c91f3c04b..6a2a48746 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json @@ -1,17 +1,17 @@ [ -{ - "paramName": "i", - "paramLongName": "graphBasePath", - "paramDescription": "the base path of raw graph", - "paramRequired": true -}, -{ - "paramName": "w", - "paramLongName": "workingPath", - "paramDescription": "the working directory path", - "paramRequired": true -}, -{ + { + "paramName": "i", + "paramLongName": "graphBasePath", + "paramDescription": "the base path of raw graph", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingPath", + "paramDescription": "the working directory path", + "paramRequired": true + }, + { "paramName": "o", "paramLongName": "dedupGraphPath", "paramDescription": "the path of the dedup graph", From 1143f426aaf2eadfa8b55dcb62fabf8b52bcb503 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 16:13:36 +0200 Subject: [PATCH 13/14] WIP SparkCreateMergeRels distinct relations --- .../dhp/oa/dedup/RelationAggregator.java | 17 ++++++++++++++--- .../dhp/oa/dedup/SparkPropagateRelation.java | 3 ++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java index 7935fe1ca..6fb7b844b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java @@ -20,13 +20,12 @@ public class RelationAggregator extends Aggregator @Override public Relation reduce(Relation b, Relation a) { - return Objects.equals(a, ZERO) ? b : a; + return mergeRel(b, a); } @Override public Relation merge(Relation b, Relation a) { - b.mergeFrom(a); - return b; + return mergeRel(b, a); } @Override @@ -34,6 +33,18 @@ public class RelationAggregator extends Aggregator return r; } + private Relation mergeRel(Relation b, Relation a) { + if (Objects.equals(b, ZERO)) { + return a; + } + if (Objects.equals(a, ZERO)) { + return b; + } + + b.mergeFrom(a); + return b; + } + @Override public Encoder bufferEncoder() { return Encoders.kryo(Relation.class); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index baba3bc87..1073adbea 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup; import static org.apache.spark.sql.functions.col; +import com.google.common.base.Joiner; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -108,7 +109,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { private Dataset distinctRelations(Dataset rels) { return rels .filter(getRelationFilterFunction()) - .groupByKey((MapFunction) r -> ModelSupport.idFn().apply(r), Encoders.STRING()) + .groupByKey((MapFunction) r -> String.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), Encoders.STRING()) .agg(new RelationAggregator().toColumn()) .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class)); } From 344a90c2e6ab1aecf2f48bd5e949bfb8bbb467ae Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 13 Jul 2020 16:32:04 +0200 Subject: [PATCH 14/14] updated assertions in propagateRelationTest --- .../eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 8ed05c18d..91e45447c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -160,14 +160,11 @@ public class SparkDedupTest implements Serializable { parser .parseArgument( new String[] { - "-i", - testGraphBasePath, - "-asi", - testActionSetId, - "-la", - "lookupurl", - "-w", - testOutputBasePath + "-i", testGraphBasePath, + "-asi", testActionSetId, + "-la", "lookupurl", + "-w", testOutputBasePath, + "-np", "50" }); new SparkCreateSimRels(parser, spark).run(isLookUpService); @@ -535,7 +532,7 @@ public class SparkDedupTest implements Serializable { long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count(); - assertEquals(4971, relations); + assertEquals(4866, relations); // check deletedbyinference final Dataset mergeRels = spark