From 9802bcb9fea911fba27937decdb582fd733b9557 Mon Sep 17 00:00:00 2001
From: miconis
Date: Wed, 1 Apr 2020 18:48:31 +0200
Subject: [PATCH] dedup testing

---
 dhp-workflows/dhp-dedup-openaire/pom.xml      |   6 ++
 .../dhp/oa/dedup/SparkCreateSimRels.java      | 100 ++++++++++++-----
 .../oa/dedup/dedup/SparkCreateDedupTest.java  |  77 -------------
 .../dhp/oa/dedup/dedup/SparkDedupTest.java    | 101 ++++++++++++++++++
 4 files changed, 179 insertions(+), 105 deletions(-)
 delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java
 create mode 100644 dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java

diff --git a/dhp-workflows/dhp-dedup-openaire/pom.xml b/dhp-workflows/dhp-dedup-openaire/pom.xml
index e7f2a926f..6e3723ad8 100644
--- a/dhp-workflows/dhp-dedup-openaire/pom.xml
+++ b/dhp-workflows/dhp-dedup-openaire/pom.xml
@@ -90,6 +90,12 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
index e1c1f581c..6025a54c1 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java
@@ -5,7 +5,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -17,12 +19,18 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.dom4j.Document;
 import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.dom4j.io.SAXReader;
 import scala.Tuple2;
 
 import java.io.Serializable;
+import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.List;
 
 public class SparkCreateSimRels implements Serializable {
@@ -51,35 +59,38 @@ public class SparkCreateSimRels implements Serializable {
         System.out.println(String.format("actionSetId: '%s'", actionSetId));
         System.out.println(String.format("workingPath: '%s'", workingPath));
 
-        try (SparkSession spark = getSparkSession(parser)) {
-            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        List<DedupConfig> configurations = getConfigurations(isLookUpUrl, actionSetId);
+        System.out.println("configurations = " + configurations.size());
 
-            //for each dedup configuration
-            for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
-                final String entity = dedupConf.getWf().getEntityType();
-                final String subEntity = dedupConf.getWf().getSubEntityValue();
-
-                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
-                        .mapToPair(s -> {
-                            MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
-                            return new Tuple2<>(d.getIdentifier(), d);
-                        });
-
-                //create blocks for deduplication
-                JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
-
-                //create relations by comparing only elements in the same group
-                final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
-
-                JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
-
-                //save the simrel in the workingdir
-                spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
-                        .write()
-                        .mode("overwrite")
-                        .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
-            }
-        }
+//        try (SparkSession spark = getSparkSession(parser)) {
+//            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+//
+//            //for each dedup configuration
+//            for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
+//                final String entity = dedupConf.getWf().getEntityType();
+//                final String subEntity = dedupConf.getWf().getSubEntityValue();
+//
+//                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+//                        .mapToPair((PairFunction<String, String, MapDocument>) s -> {
+//                            MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
+//                            return new Tuple2<String, MapDocument>(d.getIdentifier(), d);
+//                        });
+//
+//                //create blocks for deduplication
+//                JavaPairRDD<String, List<MapDocument>> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
+//
+//                //create relations by comparing only elements in the same group
+//                final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
+//
+//                JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
+//
+//                //save the simrel in the workingdir
+//                spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
+//                        .write()
+//                        .mode("overwrite")
+//                        .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
+//            }
+//        }
     }
 
     /**
@@ -131,4 +142,37 @@
                 .getOrCreate();
     }
 
+    public List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
+        final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
+
+        final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
+
+        String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
+
+        final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
+
+        final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
+        final List<DedupConfig> configurations = new ArrayList<>();
+
+        for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
+            configurations.add(loadConfig(isLookUpService, actionSetId, o));
+        }
+
+        return configurations;
+
+    }
+
+    public DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
+            throws ISLookUpException {
+        final Element s = (Element) o;
+        final String configProfileId = s.attributeValue("id");
+        final String conf =
+                isLookUpService.getResourceProfileByQuery(String.format(
+                        "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
+                        configProfileId));
+        final DedupConfig dedupConfig = DedupConfig.load(conf);
+        dedupConfig.getWf().setConfigurationId(actionSetId);
+        return dedupConfig;
+    }
+
 }
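NOTE: the hunks above turn configuration loading into instance methods
(getConfigurations / loadConfig) precisely so tests can stub them out. A
minimal, hypothetical usage sketch follows; the lookup URL and action-set id
are placeholders borrowed from the test arguments further down, not real
endpoints:

    import java.util.List;
    import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
    import eu.dnetlib.pace.config.DedupConfig;

    public class GetConfigurationsExample {
        public static void main(String[] args) throws Exception {
            SparkCreateSimRels app = new SparkCreateSimRels();
            // One DedupConfig is loaded per SCAN element of the orchestrator profile.
            List<DedupConfig> configs = app.getConfigurations("http://lookupurl", "dedup-similarity-result-levenstein");
            for (DedupConfig conf : configs) {
                System.out.println(conf.getWf().getEntityType() + "/" + conf.getWf().getSubEntityValue());
            }
        }
    }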
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java
deleted file mode 100644
index d7fc3f694..000000000
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkCreateDedupTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package eu.dnetlib.dhp.oa.dedup.dedup;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateConnectedComponent;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
-import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-
-import java.io.IOException;
-
-public class SparkCreateDedupTest {
-
-    String configuration;
-    String entity = "organization";
-
-    @BeforeEach
-    public void setUp() throws IOException {
-//        configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
-        configuration = "";
-    }
-
-    @Disabled("must be parametrized to run locally")
-    public void createSimRelsTest() throws Exception {
-        SparkCreateSimRels.main(new String[]{
-                "-mt", "local[*]",
-                "-i", "/Users/miconis/dumps",
-                "-o", "/tmp/dedup/rawset_test",
-                "-asi", "dedup-similarity-result-levenstein",
-                "-la", "lookupurl",
-                "-w", "workingPath"
-        });
-    }
-
-    @Disabled("must be parametrized to run locally")
-    public void createCCTest() throws Exception {
-
-        SparkCreateConnectedComponent.main(new String[]{
-                "-mt", "local[*]",
-                "-s", "/Users/miconis/dumps",
-                "-e", entity,
-                "-c", ArgumentApplicationParser.compressArgument(configuration),
-                "-t", "/tmp/dedup",
-        });
-    }
-
-    @Disabled("must be parametrized to run locally")
-    public void dedupRecordTest() throws Exception {
-        SparkCreateDedupRecord.main(new String[]{
-                "-mt", "local[*]",
-                "-s", "/Users/miconis/dumps",
-                "-e", entity,
-                "-c", ArgumentApplicationParser.compressArgument(configuration),
-                "-d", "/tmp/dedup",
-        });
-    }
-
-    @Disabled("must be parametrized to run locally")
-    public void printConfiguration() throws Exception {
-        System.out.println(ArgumentApplicationParser.compressArgument(configuration));
-    }
-
-    @Disabled("must be parametrized to run locally")
-    public void testHashCode() {
-        final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
-        final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
-
-        final HashFunction hashFunction = Hashing.murmur3_128();
-
-        System.out.println(s1.hashCode());
-        System.out.println(hashFunction.hashString(s1).asLong());
-        System.out.println(s2.hashCode());
-        System.out.println(hashFunction.hashString(s2).asLong());
-    }
-}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java
new file mode 100644
index 000000000..c0ddaa842
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java
@@ -0,0 +1,101 @@
+package eu.dnetlib.dhp.oa.dedup.dedup;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateConnectedComponent;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
+import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.pace.config.DedupConfig;
+import org.apache.commons.io.IOUtils;
+import org.dom4j.DocumentException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class SparkDedupTest {
+
+    @Mock
+    SparkCreateSimRels sparkCreateSimRels;
+
+    public List<DedupConfig> prepareConfigurations() throws IOException {
+
+        return Lists.newArrayList(
+                DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))),
+                DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")))
+        );
+    }
+
+    @Before
+    public void setUp() throws IOException, ISLookUpException, DocumentException {
+
+        when(sparkCreateSimRels.getConfigurations(anyString(), anyString())).thenReturn(prepareConfigurations());
+
+    }
+
+    @Test
+    public void createSimRelsTest() throws Exception {
+        SparkCreateSimRels.main(new String[]{
+                "-mt", "local[*]",
+                "-i", "/Users/miconis/dumps",
+                "-asi", "dedup-similarity-result-levenstein",
+                "-la", "lookupurl",
+                "-w", "workingPath"
+        });
+    }
+
+//    @Disabled("must be parametrized to run locally")
+//    public void createCCTest() throws Exception {
+//
+//        SparkCreateConnectedComponent.main(new String[]{
+//                "-mt", "local[*]",
+//                "-s", "/Users/miconis/dumps",
+//                "-e", entity,
+//                "-c", ArgumentApplicationParser.compressArgument(configuration),
+//                "-t", "/tmp/dedup",
+//        });
+//    }
+//
+//    @Disabled("must be parametrized to run locally")
+//    public void dedupRecordTest() throws Exception {
+//        SparkCreateDedupRecord.main(new String[]{
+//                "-mt", "local[*]",
+//                "-s", "/Users/miconis/dumps",
+//                "-e", entity,
+//                "-c", ArgumentApplicationParser.compressArgument(configuration),
+//                "-d", "/tmp/dedup",
+//        });
+//    }
+//
+//    @Disabled("must be parametrized to run locally")
+//    public void printConfiguration() throws Exception {
+//        System.out.println(ArgumentApplicationParser.compressArgument(configuration));
+//    }
+
+    @Disabled("must be parametrized to run locally")
+    public void testHashCode() {
+        final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
+        final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
+
+        final HashFunction hashFunction = Hashing.murmur3_128();
+
+        System.out.println(s1.hashCode());
+        System.out.println(hashFunction.hashString(s1).asLong());
+        System.out.println(s2.hashCode());
+        System.out.println(hashFunction.hashString(s2).asLong());
+    }
+}
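NOTE: for context, a self-contained sketch of the Mockito stubbing pattern the
new SparkDedupTest relies on, assuming the getConfigurations(String, String)
signature introduced by this patch; the canned two-element list mirrors
prepareConfigurations() and the resource path is reused from the test:

    import static org.junit.Assert.assertEquals;
    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.List;
    import com.google.common.collect.Lists;
    import org.apache.commons.io.IOUtils;
    import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
    import eu.dnetlib.pace.config.DedupConfig;

    public class StubbingSketch {
        public static void main(String[] args) throws Exception {
            // Load the same test configuration twice, as prepareConfigurations() does.
            String json = IOUtils.toString(StubbingSketch.class
                    .getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"));
            List<DedupConfig> canned = Lists.newArrayList(DedupConfig.load(json), DedupConfig.load(json));

            // Stub the instance method: any lookup URL / action-set id now returns
            // the canned configurations, so no IS lookup service is needed.
            SparkCreateSimRels mocked = mock(SparkCreateSimRels.class);
            when(mocked.getConfigurations(anyString(), anyString())).thenReturn(canned);

            assertEquals(2, mocked.getConfigurations("fakeUrl", "fakeActionSetId").size());
        }
    }

Keep in mind that Mockito stubbing is per-instance: the static
SparkCreateSimRels.main(...) call in createSimRelsTest will not see the mocked
getConfigurations unless main is refactored to delegate to an injected instance.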