roots wf merged in scan wf

This commit is contained in:
miconis 2020-03-24 17:40:58 +01:00
parent 51ff68db66
commit 8e8b5e8f30
9 changed files with 93 additions and 172 deletions

View File

@ -42,14 +42,14 @@ public class SparkCreateSimRels implements Serializable {
//read oozie parameters //read oozie parameters
final String graphBasePath = parser.get("graphBasePath"); final String graphBasePath = parser.get("graphBasePath");
final String rawSet = parser.get("rawSet");
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String rawSet = parser.get("rawSet");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
System.out.println(String.format("rawSet: '%s'", rawSet));
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
System.out.println(String.format("rawSet: '%s'", rawSet));
System.out.println(String.format("actionSetId: '%s'", actionSetId)); System.out.println(String.format("actionSetId: '%s'", actionSetId));
System.out.println(String.format("workingPath: '%s'", workingPath)); System.out.println(String.format("workingPath: '%s'", workingPath));
@ -84,14 +84,17 @@ public class SparkCreateSimRels implements Serializable {
.mode("overwrite") .mode("overwrite")
.save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
//create atomic actions if (rawSet != null) {
JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD //create atomic actions
.map(this::createSequenceFileRow); JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
.map(this::createSequenceFileRow);
simRel = simRel.union(newSimRels); simRel = simRel.union(newSimRels);
}
} }
simRel.mapToPair(r -> r) if (rawSet != null)
simRel.mapToPair(r -> r)
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); .saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
} }

View File

@ -21,8 +21,9 @@ import org.apache.spark.sql.SparkSession;
import scala.Tuple2; import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
public class SparkUpdateEntity { public class SparkUpdateEntity implements Serializable {
final String IDJSONPATH = "$.id"; final String IDJSONPATH = "$.id";
@ -82,6 +83,7 @@ public class SparkUpdateEntity {
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1()); JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1());
sourceEntity = map.union(dedupEntity); sourceEntity = map.union(dedupEntity);
} }
sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class); sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class);

View File

@ -4,14 +4,6 @@
<name>graphBasePath</name> <name>graphBasePath</name>
<description>the raw graph base path</description> <description>the raw graph base path</description>
</property> </property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property> <property>
<name>workingPath</name> <name>workingPath</name>
<description>path of the working directory</description> <description>path of the working directory</description>
@ -34,6 +26,21 @@
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="UpdateEntity"/> <start to="UpdateEntity"/>
<kill name="Kill"> <kill name="Kill">
@ -45,11 +52,9 @@
<prepare> <prepare>
<delete path='${dedupGraphPath}'/> <delete path='${dedupGraphPath}'/>
</prepare> </prepare>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Create Dedup Record</name> <name>Update Entity</name>
<class>eu.dnetlib.dhp.dedup.SparkUpdateEntity</class> <class>eu.dnetlib.dhp.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-${projectVersion}.jar</jar> <jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -74,8 +79,6 @@
<prepare> <prepare>
<delete path='${dedupGraphPath}/relation'/> <delete path='${dedupGraphPath}/relation'/>
</prepare> </prepare>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Update Relations</name> <name>Update Relations</name>

View File

@ -17,12 +17,6 @@
"paramDescription": "the base path of the raw graph", "paramDescription": "the base path of the raw graph",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "o",
"paramLongName": "rawSet",
"paramDescription": "the raw set to be saved (full path)",
"paramRequired": true
},
{ {
"paramName": "la", "paramName": "la",
"paramLongName": "isLookUpUrl", "paramLongName": "isLookUpUrl",

View File

@ -27,7 +27,7 @@
"paramName": "o", "paramName": "o",
"paramLongName": "rawSet", "paramLongName": "rawSet",
"paramDescription": "the raw set to be saved (full path)", "paramDescription": "the raw set to be saved (full path)",
"paramRequired": true "paramRequired": false
}, },
{ {
"paramName": "w", "paramName": "w",

View File

@ -1,18 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -1,115 +0,0 @@
<workflow-app name="Build Root Records" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property>
<name>workingPath</name>
<description>path of the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path of the dedup graph</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
</configuration>
</global>
<start to="CreateMergeRel"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_mergerel'/>
</prepare>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateConnectedComponent</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="CreateDedupRecord"/>
<error to="Kill"/>
</action>
<action name="CreateDedupRecord">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_deduprecord'/>
</prepare>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,4 +1,4 @@
<workflow-app name="Create Similarity Relations" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Duplicate Scan" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>graphBasePath</name> <name>graphBasePath</name>
@ -49,13 +49,13 @@
</configuration> </configuration>
</global> </global>
<start to="DuplicateScan"/> <start to="CreateSimRel"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="DuplicateScan"> <action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<prepare> <prepare>
<delete path="${rawSet}"/> <delete path="${rawSet}"/>
@ -75,11 +75,66 @@
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--o</arg><arg>${rawSet}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--asi</arg><arg>${actionSetId}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>
</spark> </spark>
<ok to="CreateMergeRel"/>
<error to="Kill"/>
</action>
<action name="CreateMergeRel">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_mergerel'/>
</prepare>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Merge Relations</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateConnectedComponent</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="CreateDedupRecord"/>
<error to="Kill"/>
</action>
<action name="CreateDedupRecord">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${workingPath}/${actionSetId}/*_deduprecord'/>
</prepare>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -25,17 +25,14 @@ public class SparkCreateDedupTest {
@Test @Test
@Ignore @Ignore
public void createSimRelsTest2() throws Exception { public void createSimRelsTest() throws Exception {
SparkCreateSimRels.main(new String[]{ SparkCreateSimRels.main(new String[]{
"-mt", "local[*]", "-mt", "local[*]",
"-s", "/Users/miconis/dumps", "-i", "/Users/miconis/dumps",
"-e", entity, "-o", "/tmp/dedup/rawset_test",
"-c", ArgumentApplicationParser.compressArgument(configuration),
"-rs", "/tmp/dedup/rawset_test",
"-ai", "agentId",
"-an", "agentName",
"-asi", "dedup-similarity-result-levenstein", "-asi", "dedup-similarity-result-levenstein",
"-la", "lookupurl", "-la", "lookupurl",
"-w", "workingPath"
}); });
} }