From 4e82a24af29bec443f30c37cbc0ed56d3051c874 Mon Sep 17 00:00:00 2001
From: miconis
Date: Thu, 19 Mar 2020 15:01:07 +0100
Subject: [PATCH] minor changes and implementation of the create connected
 components action

---
 .../java/eu/dnetlib/dedup/DedupUtility.java   |  81 +++++++------
 .../dedup/SparkCreateConnectedComponent.java  |   9 +-
 .../dedup/SparkCreateConnectedComponent2.java | 100 +++++++++++++++++
 .../eu/dnetlib/dedup/SparkCreateSimRels.java  |   4 +-
 .../eu/dnetlib/dedup/SparkCreateSimRels2.java | 106 ++++++------------
 .../dhp/dedup/createCC_parameters.json        |  38 +++++++
 ...ers.json => createSimRels_parameters.json} |  12 +-
 .../dhp/dedup/oozie_app/DuplicateScanWf.xml   |  26 ++---
 8 files changed, 231 insertions(+), 145 deletions(-)
 create mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java
 create mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json
 rename dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/{dedup_parameters.json => createSimRels_parameters.json} (77%)

diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
index 3bed74f86..94a328533 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
@@ -4,6 +4,9 @@ import com.google.common.collect.Sets;
 import com.wcohen.ss.JaroWinkler;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.DedupConfig;
@@ -20,9 +23,14 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.util.LongAccumulator;
+import org.dom4j.Document;
+import org.dom4j.DocumentException;
+import org.dom4j.Element;
+import org.dom4j.io.SAXReader;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.io.StringReader;
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
@@ -54,38 +62,6 @@ public class DedupUtility {
         return accumulators;
     }
 
-    public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
-        return context.textFile(path);
-    }
-
-    public static void deleteIfExists(String path) throws IOException {
-        Configuration conf = new Configuration();
-        FileSystem fileSystem = FileSystem.get(conf);
-        if (fileSystem.exists(new Path(path))) {
-            fileSystem.delete(new Path(path), true);
-        }
-    }
-
-    public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
-
-        Configuration conf = new Configuration();
-        FileSystem fileSystem = FileSystem.get(conf);
-        FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path)));
-
-        return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
-
-    }
-
-    static String readFromClasspath(final String filename, final Class clazz) {
-        final StringWriter sw = new StringWriter();
-        try {
-            IOUtils.copy(clazz.getResourceAsStream(filename), sw);
-            return sw.toString();
-        } catch (final IOException e) {
-            throw new RuntimeException("cannot load resource from classpath: " + filename);
-        }
-    }
-
     static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
         return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
     }
@@ -150,12 +126,12 @@ public class DedupUtility {
         return String.format("%s/%s", basePath, entityType);
     }
 
-    public static String createSimRelPath(final String basePath, final String entityType) {
-        return String.format("%s/%s_simRel", basePath, entityType);
+    public static String createSimRelPath(final String basePath, final String actionSetId, final String entityType) {
+        return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
     }
 
-    public static String createMergeRelPath(final String basePath, final String entityType) {
-        return String.format("%s/%s_mergeRel", basePath, entityType);
+    public static String createMergeRelPath(final String basePath, final String actionSetId, final String entityType) {
+        return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
     }
 
     private static Double sim(Author a, Author b) {
@@ -216,4 +192,37 @@ public class DedupUtility {
             return false;
         return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
     }
+
+    public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
+        final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
+
+        final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
+
+        String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
+
+        final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
+
+        final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
+        final List<DedupConfig> configurations = new ArrayList<>();
+
+        for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
+            configurations.add(loadConfig(isLookUpService, actionSetId, o));
+        }
+
+        return configurations;
+
+    }
+
+    private static DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
+            throws ISLookUpException {
+        final Element s = (Element) o;
+        final String configProfileId = s.attributeValue("id");
+        final String conf =
+                isLookUpService.getResourceProfileByQuery(String.format(
+                        "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
+                        configProfileId));
+        final DedupConfig dedupConfig = DedupConfig.load(conf);
+        dedupConfig.getWf().setConfigurationId(actionSetId);
+        return dedupConfig;
+    }
 }
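
A note on the reshaped path helpers: the working directory is now partitioned per
action set, so several dedup orchestrations can share one base path without
colliding. A minimal, self-contained sketch (plain Java; the base path, action set
id and entity below are hypothetical) of the layout the two format strings produce:

    public class PathLayoutDemo {
        // mirrors the patched DedupUtility.createSimRelPath
        static String simRelPath(String basePath, String actionSetId, String entityType) {
            return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
        }
        // mirrors the patched DedupUtility.createMergeRelPath
        static String mergeRelPath(String basePath, String actionSetId, String entityType) {
            return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
        }
        public static void main(String[] args) {
            System.out.println(simRelPath("/working", "dedup-result", "publication"));   // /working/dedup-result/publication_simrel
            System.out.println(mergeRelPath("/working", "dedup-result", "publication")); // /working/dedup-result/publication_mergerel
        }
    }
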
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
index 16e112c25..bdfd2c572 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
@@ -1,8 +1,5 @@
 package eu.dnetlib.dedup;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import eu.dnetlib.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dedup.graph.GraphProcessor;
@@ -29,7 +26,7 @@ import java.util.List;
 public class SparkCreateConnectedComponent {
 
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
@@ -50,7 +47,7 @@ public class SparkCreateConnectedComponent {
                         s -> new Tuple2<Object, String>(getHashcode(s), s)
                 );
 
-        final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath, entity)).as(Encoders.bean(Relation.class));
+        final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(targetPath, "", entity)).as(Encoders.bean(Relation.class));
         final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
         final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
         final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
@@ -70,7 +67,7 @@ public class SparkCreateConnectedComponent {
                     tmp.add(r);
                     return tmp.stream();
                 }).iterator()).rdd(), Encoders.bean(Relation.class));
-        mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath, entity));
+        mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(targetPath, "", entity));
     }
 
     public static long getHashcode(final String id) {
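
The vertex-id scheme shared by both connected-component drivers deserves a remark:
GraphX needs numeric vertex ids, so string record identifiers are mapped to longs
with murmur3. A runnable sketch (plain Java, Guava on the classpath; the two record
ids are made up):

    import com.google.common.hash.Hashing;

    public class HashcodeDemo {
        // same hashing as SparkCreateConnectedComponent.getHashcode
        public static long getHashcode(final String id) {
            return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
        }
        public static void main(String[] args) {
            // equal strings always map to the same long, so vertexes and edges agree
            System.out.println(getHashcode("50|doi_________::abc"));
            System.out.println(getHashcode("50|doi_________::def"));
        }
    }
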
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java
new file mode 100644
index 000000000..ad3f6efc0
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent2.java
@@ -0,0 +1,100 @@
+package eu.dnetlib.dedup;
+
+import com.google.common.hash.Hashing;
+import eu.dnetlib.dedup.graph.ConnectedComponent;
+import eu.dnetlib.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.dom4j.DocumentException;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SparkCreateConnectedComponent2 {
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createCC_parameters.json")));
+        parser.parseArgument(args);
+
+        new SparkCreateConnectedComponent2().run(parser);
+    }
+
+    private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
+
+        final String graphBasePath = parser.get("graphBasePath");
+        final String workingPath = parser.get("workingPath");
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        final String actionSetId = parser.get("actionSetId");
+
+        try (SparkSession spark = getSparkSession(parser)) {
+
+            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+            for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
+                final String entity = dedupConf.getWf().getEntityType();
+                final String subEntity = dedupConf.getWf().getSubEntityValue();
+
+                final JavaPairRDD<Object, String> vertexes = sc.textFile(graphBasePath + "/" + subEntity)
+                        .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
+                        .mapToPair((PairFunction<String, Object, String>)
+                                s -> new Tuple2<Object, String>(getHashcode(s), s)
+                        );
+
+                final Dataset<Relation> similarityRelations = spark.read().load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class));
+                final RDD<Edge<String>> edgeRdd = similarityRelations.javaRDD().map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())).rdd();
+                final JavaRDD<ConnectedComponent> cc = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations()).toJavaRDD();
+                final Dataset<Relation> mergeRelation = spark.createDataset(cc.filter(k -> k.getDocIds().size() > 1).flatMap((FlatMapFunction<ConnectedComponent, Relation>) c ->
+                        c.getDocIds()
+                                .stream()
+                                .flatMap(id -> {
+                                    List<Relation> tmp = new ArrayList<>();
+                                    Relation r = new Relation();
+                                    r.setSource(c.getCcId());
+                                    r.setTarget(id);
+                                    r.setRelClass("merges");
+                                    tmp.add(r);
+                                    r = new Relation();
+                                    r.setTarget(c.getCcId());
+                                    r.setSource(id);
+                                    r.setRelClass("isMergedIn");
+                                    tmp.add(r);
+                                    return tmp.stream();
+                                }).iterator()).rdd(), Encoders.bean(Relation.class));
+                mergeRelation.write().mode("overwrite").save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
+            }
+        }
+    }
+
+    public static long getHashcode(final String id) {
+        return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
+    }
+
+    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
+        SparkConf conf = new SparkConf();
+
+        return SparkSession
+                .builder()
+                .appName(SparkCreateSimRels2.class.getSimpleName())
+                .master(parser.get("master"))
+                .config(conf)
+                .enableHiveSupport()
+                .getOrCreate();
+    }
+}
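
The flatMap body above is the heart of the new action: every connected component
with more than one member is fanned out into a pair of relations per member. A
Spark-free sketch of just that fan-out (the Relation class is simplified here, and
the component and member ids are hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MergeRelDemo {
        static final class Rel {
            final String source, target, relClass;
            Rel(String source, String target, String relClass) {
                this.source = source; this.target = target; this.relClass = relClass;
            }
            public String toString() { return source + " --" + relClass + "--> " + target; }
        }
        // one "merges" relation from the component id to each member, plus the
        // inverse "isMergedIn" relation, as in the flatMap above
        static List<Rel> toMergeRels(String ccId, List<String> docIds) {
            List<Rel> rels = new ArrayList<>();
            for (String id : docIds) {
                rels.add(new Rel(ccId, id, "merges"));
                rels.add(new Rel(id, ccId, "isMergedIn"));
            }
            return rels;
        }
        public static void main(String[] args) {
            toMergeRels("cc_42", Arrays.asList("idA", "idB")).forEach(System.out::println);
        }
    }
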
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
index 831e45daf..543dae8e9 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels.java
@@ -1,6 +1,5 @@
 package eu.dnetlib.dedup;
 
-import com.google.common.hash.Hashing;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.pace.config.DedupConfig;
@@ -10,7 +9,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import scala.Tuple2;
@@ -29,7 +27,7 @@ import java.util.List;
 public class SparkCreateSimRels {
 
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
         parser.parseArgument(args);
         final SparkSession spark = SparkSession
                 .builder()
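
All the drivers share the same resource-loading idiom: the parameter spec is read
from the classpath before the arguments are parsed, which is why the rename of the
JSON file has to be mirrored in every class that references it. A sketch of the
idiom in isolation (commons-io assumed on the classpath):

    import org.apache.commons.io.IOUtils;
    import java.nio.charset.StandardCharsets;

    public class ResourceDemo {
        public static void main(String[] args) throws Exception {
            // resolves against the jar's resources, same as the drivers above
            String spec = IOUtils.toString(
                    ResourceDemo.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json"),
                    StandardCharsets.UTF_8);
            System.out.println(spec);
        }
    }
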
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
index 3892bc2b0..4f5458a24 100644
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateSimRels2.java
@@ -22,6 +22,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.dom4j.Document;
 import org.dom4j.DocumentException;
@@ -39,7 +40,7 @@ public class SparkCreateSimRels2 implements Serializable {
     private static final Log log = LogFactory.getLog(SparkCreateSimRels2.class);
 
     public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/createSimRels_parameters.json")));
         parser.parseArgument(args);
 
         new SparkCreateSimRels2().run(parser);
@@ -48,12 +49,11 @@ public class SparkCreateSimRels2 implements Serializable {
     private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
 
         //read oozie parameters
-        final String rawGraphBasePath = parser.get("rawGraphBasePath");
+        final String graphBasePath = parser.get("graphBasePath");
         final String rawSet = parser.get("rawSet");
-        final String agentId = parser.get("agentId");
-        final String agentName = parser.get("agentName");
         final String isLookUpUrl = parser.get("isLookUpUrl");
         final String actionSetId = parser.get("actionSetId");
+        final String workingPath = parser.get("workingPath");
 
         try (SparkSession spark = getSparkSession(parser)) {
             final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -62,11 +62,11 @@ public class SparkCreateSimRels2 implements Serializable {
             JavaRDD<Tuple2<Text, Text>> simRel = sc.emptyRDD();
 
             //for each dedup configuration
-            for (DedupConfig dedupConf: getConfigurations(isLookUpUrl, actionSetId)) {
+            for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
                 final String entity = dedupConf.getWf().getEntityType();
                 final String subEntity = dedupConf.getWf().getSubEntityValue();
 
-                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(rawGraphBasePath + "/" + subEntity)
+                JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(graphBasePath + "/" + subEntity)
                     .mapToPair(s -> {
                         MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                         return new Tuple2<>(d.getIdentifier(), d);
                     });
@@ -78,59 +78,54 @@ public class SparkCreateSimRels2 implements Serializable {
 
                 //create relations by comparing only elements in the same group
                 final JavaPairRDD<String, String> dedupRels = Deduper.computeRelations2(sc, blocks, dedupConf);
-                JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2()));
+                JavaRDD<Relation> relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));
+
+                //save the simrel in the workingdir
+                spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)).write().mode("overwrite").save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
 
                 //create atomic actions
                 JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
-                    .mapToPair(rel ->
-                        new Tuple2<>(
-                            createActionId(rel.getSource(), rel.getTarget(), entity), //TODO update the type, maybe take it from the configuration?
-                            new AtomicAction(rawSet, new Agent(agentId, agentName, Agent.AGENT_TYPE.service), rel.getSource(), "isSimilarTo", rel.getTarget(), new ObjectMapper().writeValueAsString(rel).getBytes())))
-                    .map(aa -> new Tuple2<>(aa._1(), transformAction(aa._2())));
+                    .map(this::createSequenceFileRow);
 
                 simRel = simRel.union(newSimRels);
-
             }
 
             simRel.mapToPair(r -> r)
                 .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
-
         }
     }
 
-    public Text createActionId(String source, String target, String entity) {
-
-        String type = "";
-
-        switch(entity){
-            case "result":
-                type = "resultResult_dedupSimilarity_isSimilarTo";
-                break;
-            case "organization":
-                type = "organizationOrganization_dedupSimilarity_isSimilarTo";
-                break;
-            default:
-                break;
-        }
-
-        String id = source + "@" + type + "@" + target;
-
-        return new Text(id);
-    }
-
-    public Text transformAction(AtomicAction aa) throws JsonProcessingException {
+    public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
 
         ObjectMapper mapper = new ObjectMapper();
 
-        return new Text(mapper.writeValueAsString(aa));
+        String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
+
+        //TODO to be replaced by the new implementation of AtomicAction
+        AtomicAction aa = new AtomicAction("rawSet", new Agent("agentId", "agentName", Agent.AGENT_TYPE.service), relation.getSource(), relation.getRelClass(), relation.getTarget(), new ObjectMapper().writeValueAsString(relation).getBytes());
+
+        return new Tuple2<>(
+                new Text(id),
+                new Text(mapper.writeValueAsString(aa))
+        );
     }
 
-    public Relation createSimRel(String source, String target){
+    public Relation createSimRel(String source, String target, String entity){
         final Relation r = new Relation();
         r.setSource(source);
         r.setTarget(target);
-        r.setRelClass("isSimilarTo");
+
+        switch(entity){
+            case "result":
+                r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
+                break;
+            case "organization":
+                r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
+                break;
+            default:
+                r.setRelClass("isSimilarTo");
+                break;
+        }
+
         return r;
     }
 
@@ -146,39 +141,4 @@ public class SparkCreateSimRels2 implements Serializable {
                 .getOrCreate();
     }
 
-    public List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) throws ISLookUpException, DocumentException {
-        final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);
-
-        final String xquery = String.format("/RESOURCE_PROFILE[.//DEDUPLICATION/ACTION_SET/@id = '%s']", orchestrator);
-        log.info("loading dedup orchestration: " + xquery);
-
-        String orchestratorProfile = isLookUpService.getResourceProfileByQuery(xquery);
-
-        final Document doc = new SAXReader().read(new StringReader(orchestratorProfile));
-
-        final String actionSetId = doc.valueOf("//DEDUPLICATION/ACTION_SET/@id");
-        final List<DedupConfig> configurations = new ArrayList<>();
-
-        for (final Object o : doc.selectNodes("//SCAN_SEQUENCE/SCAN")) {
-            configurations.add(loadConfig(isLookUpService, actionSetId, o));
-        }
-
-        return configurations;
-
-    }
-
-    private DedupConfig loadConfig(final ISLookUpService isLookUpService, final String actionSetId, final Object o)
-            throws ISLookUpException {
-        final Element s = (Element) o;
-        final String configProfileId = s.attributeValue("id");
-        final String conf =
-                isLookUpService.getResourceProfileByQuery(String.format(
-                        "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
-                        configProfileId));
-        log.debug("loaded dedup configuration from IS profile: " + conf);
-        final DedupConfig dedupConfig = DedupConfig.load(conf);
-        dedupConfig.getWf().setConfigurationId(actionSetId);
-        return dedupConfig;
-    }
-
 }
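
The switch moved from createActionId into createSimRel, so the entity-specific
semantics now travel with the relation itself instead of being re-derived when the
action id is built. A standalone sketch of the mapping (the "person" value is made
up, purely to show the fallback branch):

    public class RelClassDemo {
        // same mapping as the patched SparkCreateSimRels2.createSimRel
        static String relClassFor(String entity) {
            switch (entity) {
                case "result":
                    return "resultResult_dedupSimilarity_isSimilarTo";
                case "organization":
                    return "organizationOrganization_dedupSimilarity_isSimilarTo";
                default:
                    return "isSimilarTo";
            }
        }
        public static void main(String[] args) {
            System.out.println(relClassFor("result"));
            System.out.println(relClassFor("person")); // falls back to plain isSimilarTo
        }
    }
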
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json
new file mode 100644
index 000000000..bcd2ff974
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json
@@ -0,0 +1,38 @@
+[
+  {
+    "paramName": "mt",
+    "paramLongName": "master",
+    "paramDescription": "should be local or yarn",
+    "paramRequired": true
+  },
+  {
+    "paramName": "asi",
+    "paramLongName": "actionSetId",
+    "paramDescription": "action set identifier (name of the orchestrator)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "i",
+    "paramLongName": "graphBasePath",
+    "paramDescription": "the base path of the raw graph",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "rawSet",
+    "paramDescription": "the raw set to be saved (full path)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "la",
+    "paramLongName": "isLookUpUrl",
+    "paramDescription": "the url for the lookup service",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path for the working directory",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json
similarity index 77%
rename from dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json
rename to dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json
index 1582739d4..83a030159 100644
--- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/dedup_parameters.json
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json
@@ -30,15 +30,9 @@
     "paramRequired": true
   },
   {
-    "paramName": "ai",
-    "paramLongName": "agentId",
-    "paramDescription": "the agent identifier",
-    "paramRequired": true
-  },
-  {
-    "paramName": "an",
-    "paramLongName": "agentName",
-    "paramDescription": "the agent name",
+    "paramName": "w",
+    "paramLongName": "workingPath",
+    "paramDescription": "path of the working directory",
     "paramRequired": true
   }
 ]
\ No newline at end of file
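
For reference, these parameter specs map one-to-one onto the argument lists the
workflow hands to the Spark actions. A sketch of an args array satisfying
createCC_parameters.json (every value below is hypothetical, and the short-name
form is an assumption modeled on the "-mt"/"--i" style the workflow itself uses):

    public class ArgsDemo {
        public static void main(String[] args) {
            String[] ccArgs = {
                    "-mt",  "yarn-cluster",
                    "-asi", "dedup-result-test",               // actionSetId
                    "-i",   "/data/raw_graph",                 // graphBasePath
                    "-o",   "/data/rawset",                    // rawSet
                    "-la",  "http://is.example.org/isLookUp",  // isLookUpUrl
                    "-w",   "/data/dedup_working"              // workingPath
            };
            System.out.println(String.join(" ", ccArgs));
            // SparkCreateConnectedComponent2.main(ccArgs) would consume exactly these
        }
    }
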
diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
index 5daa12ce5..5ab6c9e47 100644
--- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
+++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/DuplicateScanWf.xml
@@ -1,25 +1,13 @@
     <parameters>
         <property>
-            <name>rawGraphBasePath</name>
+            <name>graphBasePath</name>
             <description>the raw graph base path</description>
         </property>
-        <property>
-            <name>actionSetBasePath</name>
-            <description>the output base path</description>
-        </property>
         <property>
             <name>rawSet</name>
            <description>the output directory in the targetPath</description>
         </property>
-        <property>
-            <name>agentId</name>
-            <description>the agent identifier</description>
-        </property>
-        <property>
-            <name>agentName</name>
-            <description>the agent name</description>
-        </property>
         <property>
             <name>isLookUpUrl</name>
             <description>the address of the lookUp service</description>
@@ -28,6 +16,10 @@
             <name>actionSetId</name>
             <description>id of the actionSet</description>
         </property>
+        <property>
+            <name>workingPath</name>
+            <description>path for the working directory</description>
+        </property>
         <property>
             <name>sparkDriverMemory</name>
             <description>memory for driver process</description>
@@ -42,15 +34,15 @@
-
+
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-
+
-
+
@@ -74,8 +66,6 @@
             <arg>-mt</arg><arg>yarn-cluster</arg>
             <arg>--i</arg><arg>${rawGraphBasePath}</arg>
             <arg>--o</arg><arg>${rawSet}</arg>
-            <arg>--ai</arg><arg>${agentId}</arg>
-            <arg>--an</arg><arg>${agentName}</arg>
             <arg>--la</arg><arg>${isLookUpUrl}</arg>
             <arg>--asi</arg><arg>${actionSetId}</arg>