From 8e8b5e8f30a86b16f90209af94e0dff770fe0fac Mon Sep 17 00:00:00 2001 From: miconis Date: Tue, 24 Mar 2020 17:40:58 +0100 Subject: [PATCH] roots wf merged in scan wf --- .../dnetlib/dhp/dedup/SparkCreateSimRels.java | 17 +-- .../dnetlib/dhp/dedup/SparkUpdateEntity.java | 4 +- .../dedup/consistency/oozie_app/workflow.xml | 29 +++-- .../dhp/dedup/createCC_parameters.json | 6 - .../dhp/dedup/createSimRels_parameters.json | 2 +- .../dedup/roots/oozie_app/config-default.xml | 18 --- .../dhp/dedup/roots/oozie_app/workflow.xml | 115 ------------------ .../dhp/dedup/scan/oozie_app/workflow.xml | 63 +++++++++- .../dhp/dedup/SparkCreateDedupTest.java | 11 +- 9 files changed, 93 insertions(+), 172 deletions(-) delete mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/config-default.xml delete mode 100644 dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java index 18d0d4ee6..8c0efdcad 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkCreateSimRels.java @@ -42,14 +42,14 @@ public class SparkCreateSimRels implements Serializable { //read oozie parameters final String graphBasePath = parser.get("graphBasePath"); - final String rawSet = parser.get("rawSet"); final String isLookUpUrl = parser.get("isLookUpUrl"); + final String rawSet = parser.get("rawSet"); final String actionSetId = parser.get("actionSetId"); final String workingPath = parser.get("workingPath"); System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); - System.out.println(String.format("rawSet: '%s'", rawSet)); System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); + System.out.println(String.format("rawSet: '%s'", rawSet)); System.out.println(String.format("actionSetId: '%s'", actionSetId)); System.out.println(String.format("workingPath: '%s'", workingPath)); @@ -84,14 +84,17 @@ public class SparkCreateSimRels implements Serializable { .mode("overwrite") .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); - //create atomic actions - JavaRDD> newSimRels = relationsRDD - .map(this::createSequenceFileRow); + if (rawSet != null) { + //create atomic actions + JavaRDD> newSimRels = relationsRDD + .map(this::createSequenceFileRow); - simRel = simRel.union(newSimRels); + simRel = simRel.union(newSimRels); + } } - simRel.mapToPair(r -> r) + if (rawSet != null) + simRel.mapToPair(r -> r) .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java index dd079e4cd..b8b41d217 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dhp/dedup/SparkUpdateEntity.java @@ -21,8 +21,9 @@ import org.apache.spark.sql.SparkSession; import scala.Tuple2; import java.io.IOException; +import java.io.Serializable; -public class SparkUpdateEntity { +public class SparkUpdateEntity implements Serializable { final String IDJSONPATH = "$.id"; @@ -82,6 +83,7 @@ public class SparkUpdateEntity { JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1()); sourceEntity = map.union(dedupEntity); + } sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class); diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml index e14fa7c55..4386b2ea1 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/consistency/oozie_app/workflow.xml @@ -4,14 +4,6 @@ graphBasePath the raw graph base path - - isLookUpUrl - the address of the lookUp service - - - actionSetId - id of the actionSet - workingPath path of the working directory @@ -34,6 +26,21 @@ + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + @@ -45,11 +52,9 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster - Create Dedup Record + Update Entity eu.dnetlib.dhp.dedup.SparkUpdateEntity dhp-dedup-${projectVersion}.jar @@ -74,8 +79,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster Update Relations diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json index bcd2ff974..42ef2b78e 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createCC_parameters.json @@ -17,12 +17,6 @@ "paramDescription": "the base path of the raw graph", "paramRequired": true }, - { - "paramName": "o", - "paramLongName": "rawSet", - "paramDescription": "the raw set to be saved (full path)", - "paramRequired": true - }, { "paramName": "la", "paramLongName": "isLookUpUrl", diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json index b8c8af699..9eb08a29b 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/createSimRels_parameters.json @@ -27,7 +27,7 @@ "paramName": "o", "paramLongName": "rawSet", "paramDescription": "the raw set to be saved (full path)", - "paramRequired": true + "paramRequired": false }, { "paramName": "w", diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/config-default.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/config-default.xml deleted file mode 100644 index 2e0ed9aee..000000000 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/config-default.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - jobTracker - yarnRM - - - nameNode - hdfs://nameservice1 - - - oozie.use.system.libpath - true - - - oozie.action.sharelib.for.spark - spark2 - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml deleted file mode 100644 index 49b396995..000000000 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/roots/oozie_app/workflow.xml +++ /dev/null @@ -1,115 +0,0 @@ - - - - graphBasePath - the raw graph base path - - - isLookUpUrl - the address of the lookUp service - - - actionSetId - id of the actionSet - - - workingPath - path of the working directory - - - dedupGraphPath - path of the dedup graph - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - yarn-cluster - cluster - Create Merge Relations - eu.dnetlib.dhp.dedup.SparkCreateConnectedComponent - dhp-dedup-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" - --conf spark.sql.warehouse.dir="/user/hive/warehouse" - - -mtyarn-cluster - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} - - - - - - - - - - - yarn-cluster - cluster - Create Dedup Record - eu.dnetlib.dhp.dedup.SparkCreateDedupRecord - dhp-dedup-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" - --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" - --conf spark.sql.warehouse.dir="/user/hive/warehouse" - - -mtyarn-cluster - --i${graphBasePath} - --w${workingPath} - --la${isLookUpUrl} - --asi${actionSetId} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml index c4198a5c5..dc2263263 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/scan/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + graphBasePath @@ -49,13 +49,13 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + @@ -75,11 +75,66 @@ -mtyarn --i${graphBasePath} - --o${rawSet} --la${isLookUpUrl} --asi${actionSetId} --w${workingPath} + + + + + + + + + + yarn-cluster + cluster + Create Merge Relations + eu.dnetlib.dhp.dedup.SparkCreateConnectedComponent + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --la${isLookUpUrl} + --asi${actionSetId} + + + + + + + + + + + yarn-cluster + cluster + Create Dedup Record + eu.dnetlib.dhp.dedup.SparkCreateDedupRecord + dhp-dedup-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" + --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --conf spark.sql.warehouse.dir="/user/hive/warehouse" + + -mtyarn-cluster + --i${graphBasePath} + --w${workingPath} + --la${isLookUpUrl} + --asi${actionSetId} + diff --git a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java index ebc139867..b1be5795e 100644 --- a/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java +++ b/dhp-workflows/dhp-dedup/src/test/java/eu/dnetlib/dhp/dedup/SparkCreateDedupTest.java @@ -25,17 +25,14 @@ public class SparkCreateDedupTest { @Test @Ignore - public void createSimRelsTest2() throws Exception { + public void createSimRelsTest() throws Exception { SparkCreateSimRels.main(new String[]{ "-mt", "local[*]", - "-s", "/Users/miconis/dumps", - "-e", entity, - "-c", ArgumentApplicationParser.compressArgument(configuration), - "-rs", "/tmp/dedup/rawset_test", - "-ai", "agentId", - "-an", "agentName", + "-i", "/Users/miconis/dumps", + "-o", "/tmp/dedup/rawset_test", "-asi", "dedup-similarity-result-levenstein", "-la", "lookupurl", + "-w", "workingPath" }); }