diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
index 3fa7be3f7..3892bc2b0 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
@@ -2,8 +2,13 @@ package eu.dnetlib.dedup;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.actionmanager.actions.AtomicAction;
+import eu.dnetlib.actionmanager.common.Agent;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -17,47 +22,38 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.dom4j.io.SAXReader;
 import scala.Tuple2;
-import eu.dnetlib.actionmanager.actions.AtomicAction;
-import eu.dnetlib.actionmanager.common.Agent;
 
 import java.io.Serializable;
-import java.util.Arrays;
+import java.io.StringReader;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class SparkCreateSimRels2 implements Serializable {
 
-    final static String CONF_SEPARATOR = "@@@";
-
     private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class);
 
-    public static List<DedupConfig> decompressConfs(String compressedConfs){
-
-        return Arrays.stream(compressedConfs.split(CONF_SEPARATOR))
-                .map(ArgumentApplicationParser::decompressValue)
-                .map(DedupConfig::load)
-                .collect(Collectors.toList());
-    }
-
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
         parser.parseArgument(args);
 
-        new SparkCreateSimRels2().run(parser, decompressConfs(parser.get("dedupConf")));
+        new SparkCreateSimRels2().run(parser);
     }
 
-    private void run(ArgumentApplicationParser parser, List<DedupConfig> dedupConfs) {
+    private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
 
         //read oozie parameters
-        final String sourcePath = parser.get("sourcePath");
-        final String targetPath = parser.get("targetPath");
-        final String rawSetName = parser.get("rawSet");
+        final String rawGraphBasePath = parser.get("rawGraphBasePath");
+        final String rawSet = parser.get("rawSet");
         final String agentId = parser.get("agentId");
         final String agentName = parser.get("agentName");
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        final String actionSetId = parser.get("actionSetId");
 
         try (SparkSession spark = getSparkSession(parser)) {
             final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -66,10 +62,11 @@ public class SparkCreateSimRels2 implements Serializable {
             JavaRDD<Tuple2<Text, Text>> simRel = sc.emptyRDD();
 
             //for each dedup configuration
-            for (DedupConfig dedupConf: dedupConfs) {
+            for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
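+                // one DedupConfig per SCAN element of the orchestrator profile, fetched at runtime from the IS lookup service (see getConfigurations below)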
                 final String entity = dedupConf.getWf().getEntityType();
+                final String subEntity = dedupConf.getWf().getSubEntityValue();
 
-                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(sourcePath + "/" + entity)
+                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(rawGraphBasePath + "/" + subEntity)
                         .mapToPair(s -> {
                             MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                             return new Tuple2<>(d.getIdentifier(), d);
@@ -88,25 +85,35 @@ public class SparkCreateSimRels2 implements Serializable {
                         .mapToPair(rel ->
                                 new Tuple2<>(
                                         createActionId(rel.getSource(), rel.getTarget(), entity), //TODO update the type, maybe take it from the configuration?
-                                        new AtomicAction(rawSetName, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
+                                        new AtomicAction(rawSet, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
                         .map(aa -> new Tuple2<>(aa._1(), transformAction(aa._2())));
 
                 simRel = simRel.union(newSimRels);
             }
 
-            String targetDirectory = targetPath + "/" + rawSetName;
-
-//            simRel.map(s -> s._1().toString()).saveAsTextFile(targetDirectory);
-
             simRel.mapToPair(r -> r)
-                    .saveAsHadoopFile(targetDirectory, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+                    .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
 
         }
     }
 
-    public Text createActionId(String source, String target, String type) {
+    public Text createActionId(String source, String target, String entity) {
+
+        String type = "";
+
+        switch(entity){
+            case "result":
+                type = "resultResult_dedupSimilarity_isSimilarTo";
+                break;
+            case "organization":
+                type = "organizationOrganization_dedupSimilarity_isSimilarTo";
+                break;
+            default:
+                break;
+        }
+
         String id = source + "@" + type + "@" + target;
 
         return new Text(id);
     }
 
@@ -135,8 +142,43 @@ public class SparkCreateSimRels2 implements Serializable {
                 .appName(SparkCreateSimRels2.class.getSimpleName())
                 .master(parser.get("master"))
                 .config(conf)
-//                .enableHiveSupport()
+                .enableHiveSupport()
                 .getOrCreate();
     }
 
+    public List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
+        final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
+
+        final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
+        log.info("loading dedup orchestration: " + xquery);
+
+        String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
+
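+        // the orchestrator profile names the action set and lists, in its SCAN_SEQUENCE, one SCAN element per dedup configuration profile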
+        final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
+
+        final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
+        final List<DedupConfig> configurations = new ArrayList<>();
+
+        for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
+            configurations.add(loadConfig(isLookUpService, actionSetId, o));
+        }
+
+        return configurations;
+
+    }
+
+    private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
+            throws ISLookUpException {
+        final Element s = (Element) o;
+        final String configProfileId = s.attributeValue("id");
+        final String conf =
+                isLookUpService.getResourceProfileByQuery(String.format(
+                        "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
+                        configProfileId));
+        log.debug("loaded dedup configuration from IS profile: " + conf);
+        final DedupConfig dedupConfig = DedupConfig.load(conf);
+        dedupConfig.getWf().setConfigurationId(actionSetId);
+        return dedupConfig;
+    }
+
 }
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json
index 9bdddef8a..1582739d4 100644
--- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json
@@ -6,33 +6,27 @@
     "paramRequired": true
   },
   {
-    "paramName": "s",
-    "paramLongName": "sourcePath",
+    "paramName": "la",
+    "paramLongName": "isLookUpUrl",
+    "paramDescription": "address for the LookUp",
+    "paramRequired": true
+  },
+  {
+    "paramName": "asi",
+    "paramLongName": "actionSetId",
+    "paramDescription": "action set identifier (name of the orchestrator)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "rawGraphBasePath",
     "paramDescription": "the base path of the raw graph",
     "paramRequired": true
   },
   {
-    "paramName": "e",
-    "paramLongName": "entity",
-    "paramDescription": "the type of entity to be deduped (directory in the sourcePath)",
-    "paramRequired": true
-  },
-  {
-    "paramName": "c",
-    "paramLongName": "dedupConf",
-    "paramDescription": "list of dedup configuration to be used",
-    "paramRequired": true
-  },
-  {
-    "paramName": "t",
-    "paramLongName": "targetPath",
-    "paramDescription": "target base path to save dedup result (actions)",
-    "paramRequired": true
-  },
-  {
-    "paramName": "rs",
+    "paramName": "o",
     "paramLongName": "rawSet",
-    "paramDescription": "the raw set to be saved (directory in the targetPath)",
+    "paramDescription": "the raw set to be saved (full path)",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
index 1dede2c70..5daa12ce5 100644
--- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
@@ -1,19 +1,11 @@
 <workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
-            <name>sourcePath</name>
+            <name>rawGraphBasePath</name>
            <description>the raw graph base path</description>
         </property>
         <property>
-            <name>entity</name>
-            <description>the entity that should be processed</description>
-        </property>
-        <property>
-            <name>dedupConf</name>
-            <description>the (list of) dedup Configuration(s)</description>
-        </property>
-        <property>
-            <name>targetPath</name>
+            <name>actionSetBasePath</name>
             <description>the output base path</description>
         </property>
         <property>
@@ -28,6 +20,14 @@
             <name>agentName</name>
             <description>the agent name</description>
         </property>
+        <property>
+            <name>isLookUpUrl</name>
+            <description>the address of the lookUp service</description>
+        </property>
+        <property>
+            <name>actionSetId</name>
+            <description>id of the actionSet</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -72,13 +72,12 @@
                 spark.sql.warehouse.dir="/user/hive/warehouse"
             </spark-opts>
             <arg>-mt</arg><arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-            <arg>--entity</arg><arg>${entity}</arg>
-            <arg>--dedupConf</arg><arg>${dedupConf}</arg>
-            <arg>--rawSet</arg><arg>${rawSet}</arg>
-            <arg>--agentId</arg><arg>${agentId}</arg>
-            <arg>--agentName</arg><arg>${agentName}</arg>
+            <arg>--i</arg><arg>${rawGraphBasePath}</arg>
+            <arg>--o</arg><arg>${rawSet}</arg>
+            <arg>--ai</arg><arg>${agentId}</arg>
+            <arg>--an</arg><arg>${agentName}</arg>
+            <arg>--la</arg><arg>${isLookUpUrl}</arg>
+            <arg>--asi</arg><arg>${actionSetId}</arg>
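+            <!-- the short option names above must match the paramName values declared in dedup_parameters.json -->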
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
index 12bba7c1e..abb00d27c 100644
--- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
+++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dedup/SparkCreateDedupTest.java
@@ -20,13 +20,11 @@ import java.util.Set;
 public class SparkCreateDedupTest {
 
     String configuration;
-    String configuration2;
-    String entity = "publication";
+    String entity = "organization";
 
     @Before
     public void setUp() throws IOException {
-        configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org1.curr.conf.json"));
-        configuration2 = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org2.curr.conf.json"));
+        configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
     }
 
     @Test
@@ -48,11 +46,12 @@ public class SparkCreateDedupTest {
                 "-mt", "local[*]",
                 "-s", "/Users/miconis/dumps",
                 "-e", entity,
-                "-c", ArgumentApplicationParser.compressArgument(configuration) + "@@@" + ArgumentApplicationParser.compressArgument(configuration2),
-                "-t", "/tmp/dedup",
-                "-rs", "rawset_test",
+                "-c", ArgumentApplicationParser.compressArgument(configuration),
+                "-rs", "/tmp/dedup/rawset_test",
                 "-ai", "agentId",
-                "-an", "agentName"
+                "-an", "agentName",
+                "-asi", "dedup-similarity-result-levenstein",
+                "-la", "lookupurl",
         });
     }
 
diff --git a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/org.curr.conf.json b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/org.curr.conf.json
index 31b200c72..726f2b899 100644
--- a/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/org.curr.conf.json
+++ b/dhp-workflows/dhp-dedup/src/test/resources/eu/dnetlib/dedup/conf/org.curr.conf.json
@@ -3,6 +3,7 @@
     "threshold" : "0.99",
     "dedupRun" : "001",
     "entityType" : "organization",
+    "subEntityValue": "organization",
     "orderField" : "legalname",
     "queueMaxSize" : "2000",
     "groupMaxSize" : "50",
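
Note: getConfigurations() depends only on the XPath-visible shape of the orchestrator profile: a DEDUPLICATION/ACTION_SET element whose id equals the actionSetId parameter, and a SCAN_SEQUENCE with one SCAN per dedup configuration. Each SCAN id is then resolved by loadConfig() against the RESOURCE_IDENTIFIER of a separate profile whose DEDUPLICATION text node holds the JSON configuration. A minimal sketch of an orchestrator profile satisfying those XPaths (the wrapper elements and the SCAN id are illustrative assumptions, not taken from this patch):

    <RESOURCE_PROFILE>
        <BODY>
            <CONFIGURATION>
                <DEDUPLICATION>
                    <ACTION_SET id="dedup-similarity-result-levenstein"/>
                    <SCAN_SEQUENCE>
                        <!-- hypothetical configuration profile id -->
                        <SCAN id="exampleDedupConfProfileId"/>
                    </SCAN_SEQUENCE>
                </DEDUPLICATION>
            </CONFIGURATION>
        </BODY>
    </RESOURCE_PROFILE>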