diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java new file mode 100644 index 0000000000..37ee1d8b3a --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -0,0 +1,83 @@ +package eu.dnetlib.dhp.oa.dedup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.config.DedupConfig; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; +import scala.xml.Elem; + +import java.io.IOException; +import java.io.Serializable; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; + +abstract class AbstractSparkAction implements Serializable { + + public ArgumentApplicationParser parser; //parameters for the spark action + public ISLookUpService isLookUpService; //lookup service to take dedupconfig + + public AbstractSparkAction(ArgumentApplicationParser parser, ISLookUpService isLookUpService) throws Exception { + + this.parser = parser; + this.isLookUpService = isLookUpService; + } + + public List getConfigurations(String orchestrator) throws ISLookUpException, DocumentException, IOException { + + final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator); + + String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery); + + final Document doc = new SAXReader().read(new StringReader(orchestratorProfile)); + + final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id"); + + final List configurations = new ArrayList<>(); + + for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) { + configurations.add(loadConfig(isLookUpService, actionSetId, o)); + } + + return configurations; + } + + public DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o) + throws ISLookUpException, IOException { + final Element s = (Element) o; + final String configProfileId = s.attributeValue("id"); + final String conf = + isLookUpService.getResourceProfileByQuery(String.format( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", + configProfileId)); + + DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); + dedupConfig.getPace().initModel(); + dedupConfig.getPace().initTranslationMap(); + dedupConfig.getWf().setConfigurationId(actionSetId); + + return dedupConfig; + } + + abstract void run() throws DocumentException, IOException, ISLookUpException; + + protected SparkSession getSparkSession(ArgumentApplicationParser parser) { + SparkConf conf = new SparkConf(); + + return SparkSession + .builder() + .appName(SparkCreateSimRels.class.getSimpleName()) + .master(parser.get("master")) + .config(conf) + .getOrCreate(); + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 6025a54c1b..000a96a56c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -28,25 +28,31 @@ import org.dom4j.Element; import org.dom4j.io.SAXReader; import scala.Tuple2; +import java.io.IOException; import java.io.Serializable; import java.io.StringReader; import java.util.ArrayList; import java.util.List; -public class SparkCreateSimRels implements Serializable { +public class SparkCreateSimRels extends AbstractSparkAction { private static final Log log = LogFactory.getLog(SparkCreateSimRels.class); + public SparkCreateSimRels(ArgumentApplicationParser parser, ISLookUpService isLookUpService) throws Exception { + super(parser, isLookUpService); + } + public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( + ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString( SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); parser.parseArgument(args); - new SparkCreateSimRels().run(parser); + new SparkCreateSimRels(parser, ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))).run(); } - private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { + @Override + public void run() throws DocumentException, IOException, ISLookUpException { //read oozie parameters final String graphBasePath = parser.get("graphBasePath"); @@ -59,38 +65,35 @@ public class SparkCreateSimRels implements Serializable { System.out.println(String.format("actionSetId: '%s'", actionSetId)); System.out.println(String.format("workingPath: '%s'", workingPath)); - List configurations = getConfigurations(isLookUpUrl, actionSetId); - System.out.println("configurations = " + configurations.size()); + try (SparkSession spark = getSparkSession(parser)) { + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); -// try (SparkSession spark = getSparkSession(parser)) { -// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); -// -// //for each dedup configuration -// for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) { -// final String entity = dedupConf.getWf().getEntityType(); -// final String subEntity = dedupConf.getWf().getSubEntityValue(); -// -// JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) -// .mapToPair((PairFunction) s -> { -// MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); -// return new Tuple2(d.getIdentifier(), d); -// }); -// -// //create blocks for deduplication -// JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); -// -// //create relations by comparing only elements in the same group -// final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); -// -// JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity)); -// -// //save the simrel in the workingdir -// spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)) -// .write() -// .mode("overwrite") -// .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); -// } -// } + //for each dedup configuration + for (DedupConfig dedupConf: getConfigurations(actionSetId)) { + final String entity = dedupConf.getWf().getEntityType(); + final String subEntity = dedupConf.getWf().getSubEntityValue(); + + JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .mapToPair((PairFunction) s -> { + MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); + return new Tuple2(d.getIdentifier(), d); + }); + + //create blocks for deduplication + JavaPairRDD> blocks = Deduper.createsortedBlocks(sc, mapDocument, dedupConf); + + //create relations by comparing only elements in the same group + final JavaPairRDD dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf); + + JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity)); + + //save the simrel in the workingdir + spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)) + .write() + .mode("overwrite") + .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); + } + } } /** @@ -130,49 +133,4 @@ public class SparkCreateSimRels implements Serializable { } return r; } - - private static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - - return SparkSession - .builder() - .appName(SparkCreateSimRels.class.getSimpleName()) - .master(parser.get("master")) - .config(conf) - .getOrCreate(); - } - - public List getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException { - final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl); - - final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator); - - String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery); - - final Document doc = new SAXReader().read(new StringReader(orchestratorProfile)); - - final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id"); - final List configurations = new ArrayList<>(); - - for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) { - configurations.add(loadConfig(isLookUpService, actionSetId, o)); - } - - return configurations; - - } - - public DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o) - throws ISLookUpException { - final Element s = (Element) o; - final String configProfileId = s.attributeValue("id"); - final String conf = - isLookUpService.getResourceProfileByQuery(String.format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - configProfileId)); - final DedupConfig dedupConfig = DedupConfig.load(conf); - dedupConfig.getWf().setConfigurationId(actionSetId); - return dedupConfig; - } - } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/MergeAuthorTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/MergeAuthorTest.java similarity index 97% rename from dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/MergeAuthorTest.java rename to dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/MergeAuthorTest.java index a729eaa9d9..69edf63438 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/MergeAuthorTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/MergeAuthorTest.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.oa.dedup.dedup; +package eu.dnetlib.dhp.oa.dedup; import eu.dnetlib.dhp.oa.dedup.DedupUtility; import eu.dnetlib.dhp.schema.oaf.Publication; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java similarity index 66% rename from dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java rename to dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index 2b5efbf4fe..421612265e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,56 +1,67 @@ -package eu.dnetlib.dhp.oa.dedup.dedup; +package eu.dnetlib.dhp.oa.dedup; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; -import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; import org.dom4j.DocumentException; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; import java.util.List; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; @ExtendWith(MockitoExtension.class) public class SparkDedupTest { - @Mock - SparkCreateSimRels sparkCreateSimRels; + ISLookUpService isLookUpService = mock(ISLookUpService.class, withSettings().serializable()); - public List prepareConfigurations() throws IOException { - - return Lists.newArrayList( - DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))), - DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))) - ); - } @BeforeEach public void setUp() throws IOException, ISLookUpException, DocumentException { - when(sparkCreateSimRels.getConfigurations(anyString(), anyString())).thenReturn(prepareConfigurations()); + withSettings().serializable(); + when(isLookUpService.getResourceProfileByQuery(Mockito.contains("test-orchestrator"))) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml"))); + + when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization"))) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))); + + when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication"))) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))); } @Test public void createSimRelsTest() throws Exception { - SparkCreateSimRels.main(new String[]{ + + ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils.toString( + SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); + parser.parseArgument(new String[]{ "-mt", "local[*]", "-i", "/Users/miconis/dumps", - "-asi", "dedup-similarity-result-levenstein", + "-asi", "test-orchestrator", "-la", "lookupurl", - "-w", "workingPath" - }); + "-w", "workingPath"}); + + new SparkCreateSimRels(parser, isLookUpService).run(); + } // @Disabled("must be parametrized to run locally") @@ -93,4 +104,12 @@ public class SparkDedupTest { System.out.println(s2.hashCode()); System.out.println(hashFunction.hashString(s2).asLong()); } + + public List prepareConfigurations() throws IOException { + + return Lists.newArrayList( + DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))), + DedupConfig.load(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"))) + ); + } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/jpath/JsonPathTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java similarity index 99% rename from dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/jpath/JsonPathTest.java rename to dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java index e1f92d867a..d72734ed5a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/dedup/jpath/JsonPathTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java @@ -1,4 +1,4 @@ -package eu.dnetlib.dhp.oa.dedup.dedup.jpath; +package eu.dnetlib.dhp.oa.dedup.jpath; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml new file mode 100644 index 0000000000..9a4871223c --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/test/resources/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml @@ -0,0 +1,25 @@ + +
+ + + + + +
+ + + + + + + + + + + + + + + SECURITY_PARAMETERS + +
\ No newline at end of file