added global properties in wf definitions to avoid repeating name-node and job-tracker in the (many) distcp actions; reintroduced output directory removal at the beginning of each spark action

This commit is contained in:
Claudio Atzori 2020-05-14 10:25:41 +02:00
parent 12bfa6702e
commit ab37953332
17 changed files with 213 additions and 212 deletions

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.bulktag; package eu.dnetlib.dhp.bulktag;
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional; import java.util.Optional;
@ -84,6 +85,7 @@ public class SparkBulkTagJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
}); });
} }

View File

@ -69,13 +69,16 @@ public class SparkCountryPropagationJob {
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> execPropagation( spark -> {
removeOutputDir(spark, outputPath);
execPropagation(
spark, spark,
sourcePath, sourcePath,
preparedInfoPath, preparedInfoPath,
outputPath, outputPath,
resultClazz, resultClazz,
saveGraph)); saveGraph);
});
} }
private static <R extends Result> void execPropagation( private static <R extends Result> void execPropagation(

View File

@ -74,9 +74,7 @@ public class PrepareResultOrcidAssociationStep1 {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
}
prepareInfo( prepareInfo(
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel); spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
}); });

View File

@ -50,9 +50,7 @@ public class PrepareResultOrcidAssociationStep2 {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
}
mergeInfo(spark, inputPath, outputPath); mergeInfo(spark, inputPath, outputPath);
}); });
} }

View File

@ -70,11 +70,10 @@ public class SparkOrcidToResultFromSemRelJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} if (saveGraph) {
if (saveGraph)
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz); execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
}
}); });
} }

View File

@ -60,6 +60,8 @@ public class PrepareProjectResultsAssociation {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, potentialUpdatePath);
removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults( prepareResultProjProjectResults(
spark, spark,
inputPath, inputPath,

View File

@ -55,9 +55,7 @@ public class PrepareResultCommunitySet {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
}
prepareInfo(spark, inputPath, outputPath, organizationMap); prepareInfo(spark, inputPath, outputPath, organizationMap);
}); });
} }

View File

@ -68,11 +68,10 @@ public class SparkResultToCommunityFromOrganizationJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} if (saveGraph) {
if (saveGraph)
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
}
}); });
} }

View File

@ -58,30 +58,15 @@ public class PrepareResultInstRepoAssociation {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
readNeededResources(spark, inputPath); readNeededResources(spark, inputPath);
removeOutputDir(spark, datasourceOrganizationPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath); prepareDatasourceOrganization(spark, datasourceOrganizationPath);
removeOutputDir(spark, alreadyLinkedPath);
prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath);
}); });
} }
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
String query = "Select source resultId, collect_set(target) organizationSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and relClass = '"
+ RELATION_RESULT_ORGANIZATION_REL_CLASS
+ "' "
+ "group by source";
spark
.sql(query)
.as(Encoders.bean(ResultOrganizationSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
private static void readNeededResources(SparkSession spark, String inputPath) { private static void readNeededResources(SparkSession spark, String inputPath) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class); Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
datasource.createOrReplaceTempView("datasource"); datasource.createOrReplaceTempView("datasource");
@ -119,4 +104,24 @@ public class PrepareResultInstRepoAssociation {
.option("compression", "gzip") .option("compression", "gzip")
.json(datasourceOrganizationPath); .json(datasourceOrganizationPath);
} }
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
String query = "Select source resultId, collect_set(target) organizationSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and relClass = '"
+ RELATION_RESULT_ORGANIZATION_REL_CLASS
+ "' "
+ "group by source";
spark
.sql(query)
.as(Encoders.bean(ResultOrganizationSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
} }

View File

@ -83,10 +83,8 @@ public class SparkResultToOrganizationFromIstRepoJob {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} if (saveGraph) {
if (saveGraph)
execPropagation( execPropagation(
spark, spark,
datasourceorganization, datasourceorganization,
@ -94,6 +92,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
inputPath, inputPath,
outputPath, outputPath,
resultClazz); resultClazz);
}
}); });
} }

View File

@ -18,6 +18,17 @@
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
@ -42,8 +53,6 @@
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg> <arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg> <arg>${nameNode}/${outputPath}/relation</arg>
</distcp> </distcp>
@ -53,8 +62,6 @@
<action name="copy_organization"> <action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg> <arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg> <arg>${nameNode}/${outputPath}/organization</arg>
</distcp> </distcp>
@ -64,8 +71,6 @@
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg> <arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg> <arg>${nameNode}/${outputPath}/project</arg>
</distcp> </distcp>
@ -75,8 +80,6 @@
<action name="copy_datasources"> <action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg> <arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg> <arg>${nameNode}/${outputPath}/datasource</arg>
</distcp> </distcp>
@ -95,8 +98,6 @@
<action name="join_bulktag_publication"> <action name="join_bulktag_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>bulkTagging-publication</name> <name>bulkTagging-publication</name>
@ -124,8 +125,6 @@
<action name="join_bulktag_dataset"> <action name="join_bulktag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>bulkTagging-dataset</name> <name>bulkTagging-dataset</name>
@ -153,8 +152,6 @@
<action name="join_bulktag_otherresearchproduct"> <action name="join_bulktag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>bulkTagging-orp</name> <name>bulkTagging-orp</name>
@ -182,8 +179,6 @@
<action name="join_bulktag_software"> <action name="join_bulktag_software">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master> <master>yarn-cluster</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>bulkTagging-software</name> <name>bulkTagging-software</name>

View File

@ -19,6 +19,17 @@
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
@ -43,8 +54,6 @@
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg> <arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg> <arg>${nameNode}/${outputPath}/relation</arg>
</distcp> </distcp>
@ -54,18 +63,15 @@
<action name="copy_organization"> <action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg> <arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg> <arg>${nameNode}/${outputPath}/organization</arg>
</distcp> </distcp>
<ok to="copy_wait"/> <ok to="copy_wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg> <arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg> <arg>${nameNode}/${outputPath}/project</arg>
</distcp> </distcp>
@ -75,8 +81,6 @@
<action name="copy_datasources"> <action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg> <arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg> <arg>${nameNode}/${outputPath}/datasource</arg>
</distcp> </distcp>

View File

@ -57,6 +57,7 @@
<ok to="copy_wait"/> <ok to="copy_wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
@ -81,7 +82,6 @@
<join name="copy_wait" to="fork_prepare_assoc_step1"/> <join name="copy_wait" to="fork_prepare_assoc_step1"/>
<fork name="fork_prepare_assoc_step1"> <fork name="fork_prepare_assoc_step1">
<path start="join_prepare_publication"/> <path start="join_prepare_publication"/>
<path start="join_prepare_dataset"/> <path start="join_prepare_dataset"/>
@ -230,8 +230,8 @@
</spark> </spark>
<ok to="fork-join-exec-propagation"/> <ok to="fork-join-exec-propagation"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork-join-exec-propagation"> <fork name="fork-join-exec-propagation">
<path start="join_propagate_publication"/> <path start="join_propagate_publication"/>
<path start="join_propagate_dataset"/> <path start="join_propagate_dataset"/>
@ -271,6 +271,7 @@
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="join_propagate_dataset"> <action name="join_propagate_dataset">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -302,6 +303,7 @@
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="join_propagate_otherresearchproduct"> <action name="join_propagate_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -333,6 +335,7 @@
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="join_propagate_software"> <action name="join_propagate_software">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -14,6 +14,17 @@
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
@ -42,8 +53,6 @@
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg> <arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg> <arg>${nameNode}/${outputPath}/relation</arg>
</distcp> </distcp>
@ -53,8 +62,6 @@
<action name="copy_publication"> <action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/publication</arg> <arg>${nameNode}/${sourcePath}/publication</arg>
<arg>${nameNode}/${outputPath}/publication</arg> <arg>${nameNode}/${outputPath}/publication</arg>
</distcp> </distcp>
@ -64,8 +71,6 @@
<action name="copy_dataset"> <action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/dataset</arg> <arg>${nameNode}/${sourcePath}/dataset</arg>
<arg>${nameNode}/${outputPath}/dataset</arg> <arg>${nameNode}/${outputPath}/dataset</arg>
</distcp> </distcp>
@ -75,8 +80,6 @@
<action name="copy_orp"> <action name="copy_orp">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg> <arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
<arg>${nameNode}/${outputPath}/otherresearchproduct</arg> <arg>${nameNode}/${outputPath}/otherresearchproduct</arg>
</distcp> </distcp>
@ -86,28 +89,24 @@
<action name="copy_software"> <action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/software</arg> <arg>${nameNode}/${sourcePath}/software</arg>
<arg>${nameNode}/${outputPath}/software</arg> <arg>${nameNode}/${outputPath}/software</arg>
</distcp> </distcp>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="copy_organization"> <action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg> <arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg> <arg>${nameNode}/${outputPath}/organization</arg>
</distcp> </distcp>
<ok to="wait"/> <ok to="wait"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg> <arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg> <arg>${nameNode}/${outputPath}/project</arg>
</distcp> </distcp>
@ -117,8 +116,6 @@
<action name="copy_datasources"> <action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg> <arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg> <arg>${nameNode}/${outputPath}/datasource</arg>
</distcp> </distcp>

View File

@ -14,6 +14,17 @@
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
@ -38,8 +49,6 @@
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg> <arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg> <arg>${nameNode}/${outputPath}/relation</arg>
</distcp> </distcp>
@ -49,8 +58,6 @@
<action name="copy_organization"> <action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg> <arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg> <arg>${nameNode}/${outputPath}/organization</arg>
</distcp> </distcp>
@ -60,8 +67,6 @@
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg> <arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg> <arg>${nameNode}/${outputPath}/project</arg>
</distcp> </distcp>
@ -71,8 +76,6 @@
<action name="copy_datasources"> <action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg> <arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg> <arg>${nameNode}/${outputPath}/datasource</arg>
</distcp> </distcp>
@ -101,8 +104,8 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg> <arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--organizationtoresultcommunitymap</arg><arg>${organizationtoresultcommunitymap}</arg> <arg>--organizationtoresultcommunitymap</arg><arg>${organizationtoresultcommunitymap}</arg>
</spark> </spark>
<ok to="fork-join-exec-propagation"/> <ok to="fork-join-exec-propagation"/>
@ -136,9 +139,9 @@
</spark-opts> </spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg> <arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
@ -165,9 +168,9 @@
</spark-opts> </spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg> <arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
@ -194,9 +197,9 @@
</spark-opts> </spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg> <arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
@ -223,9 +226,9 @@
</spark-opts> </spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg> <arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg> <arg>--saveGraph</arg><arg>${saveGraph}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>

View File

@ -10,6 +10,17 @@
</property> </property>
</parameters> </parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/> <start to="reset_outputpath"/>
<kill name="Kill"> <kill name="Kill">
@ -38,8 +49,6 @@
<action name="copy_relation"> <action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg> <arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg> <arg>${nameNode}/${outputPath}/relation</arg>
</distcp> </distcp>
@ -49,8 +58,6 @@
<action name="copy_publication"> <action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/publication</arg> <arg>${nameNode}/${sourcePath}/publication</arg>
<arg>${nameNode}/${outputPath}/publication</arg> <arg>${nameNode}/${outputPath}/publication</arg>
</distcp> </distcp>
@ -60,8 +67,6 @@
<action name="copy_dataset"> <action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/dataset</arg> <arg>${nameNode}/${sourcePath}/dataset</arg>
<arg>${nameNode}/${outputPath}/dataset</arg> <arg>${nameNode}/${outputPath}/dataset</arg>
</distcp> </distcp>
@ -71,8 +76,6 @@
<action name="copy_orp"> <action name="copy_orp">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg> <arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
<arg>${nameNode}/${outputPath}/otherresearchproduct</arg> <arg>${nameNode}/${outputPath}/otherresearchproduct</arg>
</distcp> </distcp>
@ -82,8 +85,6 @@
<action name="copy_software"> <action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/software</arg> <arg>${nameNode}/${sourcePath}/software</arg>
<arg>${nameNode}/${outputPath}/software</arg> <arg>${nameNode}/${outputPath}/software</arg>
</distcp> </distcp>
@ -93,8 +94,6 @@
<action name="copy_organization"> <action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg> <arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg> <arg>${nameNode}/${outputPath}/organization</arg>
</distcp> </distcp>
@ -104,8 +103,6 @@
<action name="copy_projects"> <action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg> <arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg> <arg>${nameNode}/${outputPath}/project</arg>
</distcp> </distcp>
@ -115,8 +112,6 @@
<action name="copy_datasources"> <action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2"> <distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg> <arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg> <arg>${nameNode}/${outputPath}/datasource</arg>
</distcp> </distcp>
@ -125,6 +120,7 @@
</action> </action>
<join name="wait" to="prepare_result_organization_association"/> <join name="wait" to="prepare_result_organization_association"/>
<action name="prepare_result_organization_association"> <action name="prepare_result_organization_association">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -176,12 +172,12 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg> <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg> <arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg> <arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -206,12 +202,12 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg> <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg> <arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg> <arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -236,12 +232,12 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg> <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg> <arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg> <arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -266,12 +262,12 @@
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg> <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
<arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg> <arg>--datasourceOrganizationPath</arg><arg>${workingDir}/preparedInfo/datasourceOrganization</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg> <arg>--alreadyLinkedPath</arg><arg>${workingDir}/preparedInfo/alreadyLinked</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>