diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml
index 6787226a8..f98708c64 100644
--- a/dhp-workflows/dhp-broker-events/pom.xml
+++ b/dhp-workflows/dhp-broker-events/pom.xml
@@ -59,7 +59,6 @@
eu.dnetlib.dhp
dnet-openaire-broker-common
- [3.0.0,)
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
new file mode 100644
index 000000000..6fb7b844b
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/RelationAggregator.java
@@ -0,0 +1,57 @@
+
+package eu.dnetlib.dhp.oa.dedup;
+
+import java.util.Objects;
+
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.expressions.Aggregator;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class RelationAggregator extends Aggregator {
+
+ private static Relation ZERO = new Relation();
+
+ @Override
+ public Relation zero() {
+ return ZERO;
+ }
+
+ @Override
+ public Relation reduce(Relation b, Relation a) {
+ return mergeRel(b, a);
+ }
+
+ @Override
+ public Relation merge(Relation b, Relation a) {
+ return mergeRel(b, a);
+ }
+
+ @Override
+ public Relation finish(Relation r) {
+ return r;
+ }
+
+ private Relation mergeRel(Relation b, Relation a) {
+ if (Objects.equals(b, ZERO)) {
+ return a;
+ }
+ if (Objects.equals(a, ZERO)) {
+ return b;
+ }
+
+ b.mergeFrom(a);
+ return b;
+ }
+
+ @Override
+ public Encoder bufferEncoder() {
+ return Encoders.kryo(Relation.class);
+ }
+
+ @Override
+ public Encoder outputEncoder() {
+ return Encoders.kryo(Relation.class);
+ }
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index c0503d991..6d625cd11 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -5,11 +5,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
@@ -75,7 +77,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String workingPath = parser.get("workingPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
-
+ int cut = Optional
+ .ofNullable(parser.get("cutConnectedComponent"))
+ .map(Integer::valueOf)
+ .orElse(0);
+ log.info("connected component cut: '{}'", cut);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
@@ -100,8 +106,10 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final RDD> edgeRdd = spark
.read()
- .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
- .as(Encoders.bean(Relation.class))
+ .textFile(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
+ .map(
+ (MapFunction) r -> OBJECT_MAPPER.readValue(r, Relation.class),
+ Encoders.bean(Relation.class))
.javaRDD()
.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
.rdd();
@@ -109,7 +117,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final Dataset mergeRels = spark
.createDataset(
GraphProcessor
- .findCCs(vertexes.rdd(), edgeRdd, maxIterations)
+ .findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
.toJavaRDD()
.filter(k -> k.getDocIds().size() > 1)
.flatMap(cc -> ccToMergeRel(cc, dedupConf))
@@ -117,6 +125,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
Encoders.bean(Relation.class));
mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
+
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index 2cfe2e080..3beb90e0b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
+import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@@ -34,6 +35,8 @@ public class SparkCreateSimRels extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkCreateSimRels.class);
+ public static final int NUM_PARTITIONS = 1000;
+
public SparkCreateSimRels(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
@@ -48,13 +51,6 @@ public class SparkCreateSimRels extends AbstractSparkAction {
parser.parseArgument(args);
SparkConf conf = new SparkConf();
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- conf
- .registerKryoClasses(
- new Class[] {
- MapDocument.class, FieldListImpl.class, FieldValueImpl.class, Block.class
- });
-
new SparkCreateSimRels(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@@ -68,7 +64,12 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
+ final int numPartitions = Optional
+ .ofNullable(parser.get("numPartitions"))
+ .map(Integer::valueOf)
+ .orElse(NUM_PARTITIONS);
+ log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl);
log.info("actionSetId: '{}'", actionSetId);
@@ -88,6 +89,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
JavaPairRDD mapDocuments = sc
.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+ .repartition(numPartitions)
.mapToPair(
(PairFunction) s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
@@ -95,19 +97,17 @@ public class SparkCreateSimRels extends AbstractSparkAction {
});
// create blocks for deduplication
- JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
+ JavaPairRDD blocks = Deduper
+ .createSortedBlocks(mapDocuments, dedupConf)
+ .repartition(numPartitions);
// create relations by comparing only elements in the same group
- JavaRDD relations = Deduper
+ Deduper
.computeRelations(sc, blocks, dedupConf)
- .map(t -> createSimRel(t._1(), t._2(), entity));
-
- // save the simrel in the workingdir
- spark
- .createDataset(relations.rdd(), Encoders.bean(Relation.class))
- .write()
- .mode(SaveMode.Append)
- .save(outputPath);
+ .map(t -> createSimRel(t._1(), t._2(), entity))
+ .repartition(numPartitions)
+ .map(r -> OBJECT_MAPPER.writeValueAsString(r))
+ .saveAsTextFile(outputPath);
}
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 516808511..03e6674e4 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
@@ -95,7 +99,24 @@ public class SparkPropagateRelation extends AbstractSparkAction {
FieldType.TARGET,
getDeletedFn());
- save(newRels.union(updated).union(mergeRels), outputRelationPath, SaveMode.Overwrite);
+ save(
+ distinctRelations(
+ newRels
+ .union(updated)
+ .union(mergeRels)
+ .map((MapFunction) r -> r, Encoders.kryo(Relation.class))),
+ outputRelationPath, SaveMode.Overwrite);
+ }
+
+ private Dataset distinctRelations(Dataset rels) {
+ return rels
+ .filter(getRelationFilterFunction())
+ .groupByKey(
+ (MapFunction) r -> String
+ .join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
+ Encoders.STRING())
+ .agg(new RelationAggregator().toColumn())
+ .map((MapFunction, Relation>) t -> t._2(), Encoders.bean(Relation.class));
}
private static Dataset processDataset(
@@ -112,6 +133,14 @@ public class SparkPropagateRelation extends AbstractSparkAction {
.map(mapFn, Encoders.bean(Relation.class));
}
+ private FilterFunction getRelationFilterFunction() {
+ return (FilterFunction) r -> StringUtils.isNotBlank(r.getSource()) ||
+ StringUtils.isNotBlank(r.getTarget()) ||
+ StringUtils.isNotBlank(r.getRelClass()) ||
+ StringUtils.isNotBlank(r.getSubRelType()) ||
+ StringUtils.isNotBlank(r.getRelClass());
+ }
+
private static MapFunction patchRelFn() {
return value -> {
final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index bfd2c25e2..cd4f99f63 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.dedup.graph;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.annotate.JsonIgnore;
@@ -18,12 +19,17 @@ public class ConnectedComponent implements Serializable {
private Set docIds;
private String ccId;
- public ConnectedComponent() {
- }
-
- public ConnectedComponent(Set docIds) {
+ public ConnectedComponent(Set docIds, final int cut) {
this.docIds = docIds;
createID();
+ if (cut > 0 && docIds.size() > cut) {
+ this.docIds = docIds
+ .stream()
+ .filter(s -> !ccId.equalsIgnoreCase(s))
+ .limit(cut - 1)
+ .collect(Collectors.toSet());
+ this.docIds.add(ccId);
+ }
}
public String createID() {
@@ -41,6 +47,7 @@ public class ConnectedComponent implements Serializable {
public String getMin() {
final StringBuilder min = new StringBuilder();
+
docIds
.forEach(
i -> {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
index e19bb7ff5..f4dd85d75 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/GraphProcessor.scala
@@ -7,7 +7,7 @@ import scala.collection.JavaConversions;
object GraphProcessor {
- def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int): RDD[ConnectedComponent] = {
+ def findCCs(vertexes: RDD[(VertexId, String)], edges: RDD[Edge[String]], maxIterations: Int, cut:Int): RDD[ConnectedComponent] = {
val graph: Graph[String, String] = Graph(vertexes, edges).partitionBy(PartitionStrategy.RandomVertexCut) //TODO remember to remove partitionby
val cc = graph.connectedComponents(maxIterations).vertices
@@ -22,15 +22,15 @@ object GraphProcessor {
}
}
val connectedComponents = joinResult.groupByKey()
- .map[ConnectedComponent](cc => asConnectedComponent(cc))
+ .map[ConnectedComponent](cc => asConnectedComponent(cc, cut))
connectedComponents
}
- def asConnectedComponent(group: (VertexId, Iterable[String])): ConnectedComponent = {
+ def asConnectedComponent(group: (VertexId, Iterable[String]), cut:Int): ConnectedComponent = {
val docs = group._2.toSet[String]
- val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs));
+ val connectedComponent = new ConnectedComponent(JavaConversions.setAsJavaSet[String](docs), cut);
connectedComponent
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
index 6eedd5432..b1df08535 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json
@@ -17,6 +17,12 @@
"paramDescription": "the url for the lookup service",
"paramRequired": true
},
+ {
+ "paramName": "cc",
+ "paramLongName": "cutConnectedComponent",
+ "paramDescription": "the number of maximum elements that belongs to a connected components",
+ "paramRequired": false
+ },
{
"paramName": "w",
"paramLongName": "workingPath",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
index ce38dc6f0..09f4365d3 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json
@@ -22,5 +22,11 @@
"paramLongName": "workingPath",
"paramDescription": "path of the working directory",
"paramRequired": true
+ },
+ {
+ "paramName": "np",
+ "paramLongName": "numPartitions",
+ "paramDescription": "number of partitions for the similarity relations intermediate phases",
+ "paramRequired": false
}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
index 298a248e3..c42ce1263 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml
@@ -20,6 +20,10 @@
dedupGraphPath
path for the output graph
+
+ cutConnectedComponent
+ max number of elements in a connected component
+
sparkDriverMemory
memory for driver process
@@ -106,10 +110,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --la${isLookUpUrl}
- --asi${actionSetId}
- --w${workingPath}
+ --graphBasePath${graphBasePath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
+ --workingPath${workingPath}
+ --numPartitions8000
@@ -132,10 +137,11 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --la${isLookUpUrl}
- --asi${actionSetId}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
+ --cutConnectedComponent${cutConnectedComponent}
@@ -158,10 +164,10 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --la${isLookUpUrl}
- --asi${actionSetId}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --isLookUpUrl${isLookUpUrl}
+ --actionSetId${actionSetId}
@@ -184,9 +190,9 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
- --i${graphBasePath}
- --w${workingPath}
- --o${dedupGraphPath}
+ --graphBasePath${graphBasePath}
+ --workingPath${workingPath}
+ --dedupGraphPath${dedupGraphPath}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
index c91f3c04b..6a2a48746 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json
@@ -1,17 +1,17 @@
[
-{
- "paramName": "i",
- "paramLongName": "graphBasePath",
- "paramDescription": "the base path of raw graph",
- "paramRequired": true
-},
-{
- "paramName": "w",
- "paramLongName": "workingPath",
- "paramDescription": "the working directory path",
- "paramRequired": true
-},
-{
+ {
+ "paramName": "i",
+ "paramLongName": "graphBasePath",
+ "paramDescription": "the base path of raw graph",
+ "paramRequired": true
+ },
+ {
+ "paramName": "w",
+ "paramLongName": "workingPath",
+ "paramDescription": "the working directory path",
+ "paramRequired": true
+ },
+ {
"paramName": "o",
"paramLongName": "dedupGraphPath",
"paramDescription": "the path of the dedup graph",
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 88d5f24f9..fb5ebc099 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@@ -11,6 +13,9 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -18,6 +23,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
@@ -71,11 +77,13 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testOutputBasePath));
FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
+ final SparkConf conf = new SparkConf();
+ conf.set("spark.sql.shuffle.partitions", "200");
spark = SparkSession
.builder()
.appName(SparkDedupTest.class.getSimpleName())
.master("local[*]")
- .config(new SparkConf())
+ .config(conf)
.getOrCreate();
jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -152,37 +160,42 @@ public class SparkDedupTest implements Serializable {
parser
.parseArgument(
new String[] {
- "-i",
- testGraphBasePath,
- "-asi",
- testActionSetId,
- "-la",
- "lookupurl",
- "-w",
- testOutputBasePath
+ "-i", testGraphBasePath,
+ "-asi", testActionSetId,
+ "-la", "lookupurl",
+ "-w", testOutputBasePath,
+ "-np", "50"
});
new SparkCreateSimRels(parser, spark).run(isLookUpService);
long orgs_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
.count();
+
long pubs_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
.count();
- long sw_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/software_simrel").count();
- long ds_simrel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel").count();
+ long sw_simrel = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+ .count();
+
+ long ds_simrel = spark
+ .read()
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+ .count();
long orp_simrel = spark
.read()
- .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+ .textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
.count();
assertEquals(3432, orgs_simrel);
- assertEquals(7054, pubs_simrel);
+ assertEquals(7152, pubs_simrel);
assertEquals(344, sw_simrel);
assertEquals(458, ds_simrel);
assertEquals(6750, orp_simrel);
@@ -190,6 +203,101 @@ public class SparkDedupTest implements Serializable {
@Test
@Order(2)
+ public void cutMergeRelsTest() throws Exception {
+
+ ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ SparkCreateMergeRels.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
+ parser
+ .parseArgument(
+ new String[] {
+ "-i",
+ testGraphBasePath,
+ "-asi",
+ testActionSetId,
+ "-la",
+ "lookupurl",
+ "-w",
+ testOutputBasePath,
+ "-cc",
+ "3"
+ });
+
+ new SparkCreateMergeRels(parser, spark).run(isLookUpService);
+
+ long orgs_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long pubs_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+ long sw_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long ds_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ long orp_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")
+ .as(Encoders.bean(Relation.class))
+ .filter((FilterFunction) r -> r.getRelClass().equalsIgnoreCase("merges"))
+ .groupBy("source")
+ .agg(count("target").alias("cnt"))
+ .select("source", "cnt")
+ .where("cnt > 3")
+ .count();
+
+ assertEquals(0, orgs_mergerel);
+ assertEquals(0, pubs_mergerel);
+ assertEquals(0, sw_mergerel);
+ assertEquals(0, ds_mergerel);
+ assertEquals(0, orp_mergerel);
+
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
+ FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
+ FileUtils
+ .deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
+ }
+
+ @Test
+ @Order(3)
public void createMergeRelsTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -225,8 +333,10 @@ public class SparkDedupTest implements Serializable {
.read()
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
.count();
-
- long ds_mergerel = spark.read().load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel").count();
+ long ds_mergerel = spark
+ .read()
+ .load(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")
+ .count();
long orp_mergerel = spark
.read()
@@ -234,14 +344,14 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(1276, orgs_mergerel);
- assertEquals(1440, pubs_mergerel);
+ assertEquals(1442, pubs_mergerel);
assertEquals(288, sw_mergerel);
assertEquals(472, ds_mergerel);
assertEquals(718, orp_mergerel);
}
@Test
- @Order(3)
+ @Order(4)
public void createDedupRecordTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -288,7 +398,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(4)
+ @Order(5)
public void updateEntityTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -404,7 +514,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(5)
+ @Order(6)
public void propagateRelationTest() throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -423,7 +533,7 @@ public class SparkDedupTest implements Serializable {
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
- assertEquals(4971, relations);
+ assertEquals(4866, relations);
// check deletedbyinference
final Dataset mergeRels = spark
@@ -454,7 +564,7 @@ public class SparkDedupTest implements Serializable {
}
@Test
- @Order(6)
+ @Order(7)
public void testRelations() throws Exception {
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
index 2469b2cc0..fa889d63b 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "dataset",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,7 +17,8 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
index 4adcc0439..b45b6ae83 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "otherresearchproduct",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,7 +17,8 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
index ef0b26af4..15ebc7a6a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType": "resulttype",
"subEntityValue": "publication",
"orderField": "title",
- "queueMaxSize": "800",
+ "queueMaxSize": "100",
"groupMaxSize": "100",
"maxChildren": "100",
- "slidingWindowSize": "80",
+ "slidingWindowSize": "100",
"rootBuilder": [
"result",
"resultProject_outcome_isProducedBy",
@@ -29,7 +29,8 @@
},
"pace": {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
index 623abbf9f..f53ff385f 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json
@@ -6,10 +6,10 @@
"subEntityType" : "resulttype",
"subEntityValue" : "software",
"orderField" : "title",
- "queueMaxSize" : "800",
+ "queueMaxSize" : "100",
"groupMaxSize" : "100",
"maxChildren" : "100",
- "slidingWindowSize" : "80",
+ "slidingWindowSize" : "100",
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
"includeChildren" : "true",
"idPath" : "$.id",
@@ -17,8 +17,9 @@
},
"pace" : {
"clustering" : [
- { "name" : "wordssuffixprefix", "fields" : [ "title" ], "params" : { "max" : "2", "len" : "3" } },
- { "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
+ { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
+ { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {
"start": {
diff --git a/pom.xml b/pom.xml
index 4619f3174..411ef9521 100644
--- a/pom.xml
+++ b/pom.xml
@@ -323,6 +323,12 @@
[2.0.0,3.0.0)
+
+ eu.dnetlib.dhp
+ dnet-openaire-broker-common
+ ${dnet.openaire.broker.common}
+
+
org.apache.cxf
cxf-rt-transports-http
@@ -618,5 +624,6 @@
3.3.3
3.4.2
[2.12,3.0)
+ 3.0.0