diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index e7f2a926f..6e3723ad8 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -90,6 +90,12 @@
com.fasterxml.jackson.core
jackson-core
+
+ junit
+ junit
+ 4.12
+ test
+
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index e1c1f581c..6025a54c1 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -5,7 +5,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -17,12 +19,18 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
+import org.dom4j.Document;
import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.dom4j.io.SAXReader;
import scala.Tuple2;
import java.io.Serializable;
+import java.io.StringReader;
+import java.util.ArrayList;
import java.util.List;
public class SparkCreateSimRels implements Serializable {
@@ -51,35 +59,38 @@ public class SparkCreateSimRels implements Serializable {
System.out.println(String.format("actionSetId: '%s'", actionSetId));
System.out.println(String.format("workingPath: '%s'", workingPath));
- try (SparkSession spark = getSparkSession(parser)) {
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ List configurations = getConfigurations(isLookUpUrl, actionSetId);
+ System.out.println("configurations = " + configurations.size());
- //for each dedup configuration
- for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
- final String entity = dedupConf.getWf().getEntityType();
- final String subEntity = dedupConf.getWf().getSubEntityValue();
-
- JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
- .mapToPair(s -> {
- MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
- return new Tuple2<>(d.getIdentifier(), d);
- });
-
- //create blocks for deduplication
- JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
-
- //create relations by comparing only elements in the same group
- final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
-
- JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
-
- //save the simrel in the workingdir
- spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
- .write()
- .mode("overwrite")
- .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
- }
- }
+// try (SparkSession spark = getSparkSession(parser)) {
+// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+//
+// //for each dedup configuration
+// for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
+// final String entity = dedupConf.getWf().getEntityType();
+// final String subEntity = dedupConf.getWf().getSubEntityValue();
+//
+// JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+// .mapToPair((PairFunction) s -> {
+// MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
+// return new Tuple2(d.getIdentifier(), d);
+// });
+//
+// //create blocks for deduplication
+// JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
+//
+// //create relations by comparing only elements in the same group
+// final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
+//
+// JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
+//
+// //save the simrel in the workingdir
+// spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
+// .write()
+// .mode("overwrite")
+// .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
+// }
+// }
}
/**
@@ -131,4 +142,37 @@ public class SparkCreateSimRels implements Serializable {
.getOrCreate();
}
+ public List getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
+ final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
+
+ final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
+
+ String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
+
+ final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
+
+ final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
+ final List configurations = new ArrayList<>();
+
+ for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
+ configurations.add(loadConfig(isLookUpService, actionSetId, o));
+ }
+
+ return configurations;
+
+ }
+
+ public DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
+ throws ISLookUpException {
+ final Element s = (Element) o;
+ final String configProfileId = s.attributeValue("id");
+ final String conf =
+ isLookUpService.getResourceProfileByQuery(String.format(
+ "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
+ configProfileId));
+ final DedupConfig dedupConfig = DedupConfig.load(conf);
+ dedupConfig.getWf().setConfigurationId(actionSetId);
+ return dedupConfig;
+ }
+
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java
deleted file mode 100644
index d7fc3f694..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package eu.dnetlib.dhp.oa.dedup.dedup;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateConnectedComponent;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-
-import java.io.IOException;
-
-public class SparkCreateDedupTest {
-
- String configuration;
- String entity = "organization";
-
- @BeforeEach
- public void setUp() throws IOException {
-// configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
- configuration = "";
- }
-
- @Disabled("must be parametrized to run locally")
- public void createSimRelsTest() throws Exception {
- SparkCreateSimRels.main(new String[]{
- "-mt", "local[*]",
- "-i", "/Users/miconis/dumps",
- "-o", "/tmp/dedup/rawset_test",
- "-asi", "dedup-similarity-result-levenstein",
- "-la", "lookupurl",
- "-w", "workingPath"
- });
- }
-
- @Disabled("must be parametrized to run locally")
- public void createCCTest() throws Exception {
-
- SparkCreateConnectedComponent.main(new String[]{
- "-mt", "local[*]",
- "-s", "/Users/miconis/dumps",
- "-e", entity,
- "-c", ArgumentApplicationParser.compressArgument(configuration),
- "-t", "/tmp/dedup",
- });
- }
-
- @Disabled("must be parametrized to run locally")
- public void dedupRecordTest() throws Exception {
- SparkCreateDedupRecord.main(new String[]{
- "-mt", "local[*]",
- "-s", "/Users/miconis/dumps",
- "-e", entity,
- "-c", ArgumentApplicationParser.compressArgument(configuration),
- "-d", "/tmp/dedup",
- });
- }
-
- @Disabled("must be parametrized to run locally")
- public void printConfiguration() throws Exception {
- System.out.println(ArgumentApplicationParser.compressArgument(configuration));
- }
-
- @Disabled("must be parametrized to run locally")
- public void testHashCode() {
- final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
- final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
-
- final HashFunction hashFunction = Hashing.murmur3_128();
-
- System.out.println(s1.hashCode());
- System.out.println(hashFunction.hashString(s1).asLong());
- System.out.println(s2.hashCode());
- System.out.println(hashFunction.hashString(s2).asLong());
- }
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java
new file mode 100644
index 000000000..c0ddaa842
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java
@@ -0,0 +1,101 @@
+package eu.dnetlib.dhp.oa.dedup.dedup;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateConnectedComponent;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.pace.config.DedupConfig;
+import org.apache.commons.io.IOUtils;
+import org.dom4j.DocumentException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class SparkDedupTest {
+
+ @Mock
+ SparkCreateSimRels sparkCreateSimRels;
+
+ public List prepareConfigurations() throws IOException {
+
+ return Lists.newArrayList(
+ DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))),
+ DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")))
+ );
+ }
+
+ @Before
+ public void setUp() throws IOException, ISLookUpException, DocumentException {
+
+ when(sparkCreateSimRels.getConfigurations(anyString(), anyString())).thenReturn(prepareConfigurations());
+
+ }
+
+ @Test
+ public void createSimRelsTest() throws Exception {
+ SparkCreateSimRels.main(new String[]{
+ "-mt", "local[*]",
+ "-i", "/Users/miconis/dumps",
+ "-asi", "dedup-similarity-result-levenstein",
+ "-la", "lookupurl",
+ "-w", "workingPath"
+ });
+ }
+
+// @Disabled("must be parametrized to run locally")
+// public void createCCTest() throws Exception {
+//
+// SparkCreateConnectedComponent.main(new String[]{
+// "-mt", "local[*]",
+// "-s", "/Users/miconis/dumps",
+// "-e", entity,
+// "-c", ArgumentApplicationParser.compressArgument(configuration),
+// "-t", "/tmp/dedup",
+// });
+// }
+//
+// @Disabled("must be parametrized to run locally")
+// public void dedupRecordTest() throws Exception {
+// SparkCreateDedupRecord.main(new String[]{
+// "-mt", "local[*]",
+// "-s", "/Users/miconis/dumps",
+// "-e", entity,
+// "-c", ArgumentApplicationParser.compressArgument(configuration),
+// "-d", "/tmp/dedup",
+// });
+// }
+//
+// @Disabled("must be parametrized to run locally")
+// public void printConfiguration() throws Exception {
+// System.out.println(ArgumentApplicationParser.compressArgument(configuration));
+// }
+
+ @Disabled("must be parametrized to run locally")
+ public void testHashCode() {
+ final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
+ final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
+
+ final HashFunction hashFunction = Hashing.murmur3_128();
+
+ System.out.println(s1.hashCode());
+ System.out.println(hashFunction.hashString(s1).asLong());
+ System.out.println(s2.hashCode());
+ System.out.println(hashFunction.hashString(s2).asLong());
+ }
+}