new parametrized workflow with preparation steps and new parameter input files

This commit is contained in:
Miriam Baglioni 2020-04-27 10:48:31 +02:00
parent e2093644dc
commit f9ee343fc0
5 changed files with 338 additions and 80 deletions

View File

@ -35,6 +35,12 @@
"paramLongName": "preparedInfoPath", "paramLongName": "preparedInfoPath",
"paramDescription": "the path where prepared info have been stored", "paramDescription": "the path where prepared info have been stored",
"paramRequired": true "paramRequired": true
},
{
"paramName":"test",
"paramLongName":"isTest",
"paramDescription": "true if it is executing a test",
"paramRequired": false
} }
] ]

View File

@ -1,34 +1,10 @@
[ [
{
"paramName":"is",
"paramLongName":"isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{ {
"paramName":"s", "paramName":"s",
"paramLongName":"sourcePath", "paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read", "paramDescription": "the path of the sequencial file to read",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"as",
"paramLongName":"allowedsemrels",
"paramDescription": "the allowed semantic relations for propagation",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{ {
"paramName": "ssm", "paramName": "ssm",
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
@ -40,11 +16,5 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files", "paramDescription": "the path used to store temporary output files",
"paramRequired": true "paramRequired": true
}, }
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
}
] ]

View File

@ -5,12 +5,6 @@
"paramDescription": "URL of the isLookUp Service", "paramDescription": "URL of the isLookUp Service",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"mt",
"paramLongName":"master",
"paramDescription": "should be local or yarn",
"paramRequired": true
},
{ {
"paramName":"s", "paramName":"s",
"paramLongName":"sourcePath", "paramLongName":"sourcePath",
@ -30,16 +24,21 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName":"wu", "paramName": "ssm",
"paramLongName":"writeUpdate", "paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the update must be writte. No double check if information is already present", "paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName":"sg", "paramName":"tn",
"paramLongName":"saveGraph", "paramLongName":"resultTableName",
"paramDescription": "true if the new version of the graph must be saved", "paramDescription": "the name of the result table we are currently working on",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -19,4 +19,40 @@
<name>hive_metastore_uris</name> <name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value> <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property> </property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
</property>
<property>
<name>spark2MaxExecutors</name>
<value>50</value>
</property>
</configuration> </configuration>

View File

@ -8,54 +8,301 @@
<name>allowedsemrels</name> <name>allowedsemrels</name>
<description>the semantic relationships allowed for propagation</description> <description>the semantic relationships allowed for propagation</description>
</property> </property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>the isLookup service endpoint</description> <description>the isLookup service endpoint</description>
</property> </property>
</parameters> </parameters>
<start to="ResultToCommunityFromSemRelPropagation"/> <start to="reset-outputpath"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="reset-outputpath">
<fs>
<delete path='${workingDir}/resulttocommunitysemrel_propagation'/>
</fs>
<ok to="copy_relation"/>
<error to="Kill"/>
</action>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${workingDir}/projecttoresult_propagation/relation</arg>
</distcp>
<ok to="fork_prepare_assoc_step1"/>
<error to="Kill"/>
</action>
<action name="ResultToCommunityFromSemRelPropagation"> <fork name="fork_prepare_assoc_step1">
<spark xmlns="uri:oozie:spark-action:0.2"> <path start="join_prepare_publication"/>
<job-tracker>${jobTracker}</job-tracker> <path start="join_prepare_dataset"/>
<name-node>${nameNode}</name-node> <path start="join_prepare_otherresearchproduct"/>
<master>yarn-cluster</master> <path start="join_prepare_software"/>
</fork>
<action name="join_prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ResultToCommunitySemRel-PreparePhase1-Publications</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.PrepareResultCommunitySetStep1</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetCommunityAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="join_prepare_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ResultToCommunitySemRel-PreparePhase1-Dataset</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.PrepareResultCommunitySetStep1</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetCommunityAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="join_prepare_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ResultToCommunitySemRel-PreparePhase1-ORP</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.PrepareResultCommunitySetStep1</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetCommunityAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="join_prepare_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ResultToCommunitySemRel-PreparePhase1-Software</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.PrepareResultCommunitySetStep1</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetCommunityAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="prepare_assoc_step2"/>
<action name="prepare_assoc_step2">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>ResultToCommunitySemRelPropagation</name> <name>ResultToCommunityEmRelPropagation-PreparePhase2</name>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class> <class>eu.dnetlib.dhp.resulttocommunityfromsemrel.PrepareResultCommunitySetStep2</class>
<jar>dhp-propagation-${projectVersion}.jar</jar> <jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} <spark-opts>
--executor-cores ${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts> </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg> <arg>--sourcePath</arg><arg>${workingDir}/preparedInfo/targetCommunityAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--outputPath</arg><arg>${workingDir}/preparedInfo/mergedCommunityAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
</spark> </spark>
<ok to="End"/> <ok to="fork-join-exec-propagation"/>
<!-- <ok to="End"/>-->
<error to="Kill"/> <error to="Kill"/>
</action>
</action>
<fork name="fork-join-exec-propagation">
<path start="join_propagate_publication"/>
<path start="join_propagate_dataset"/>
<path start="join_propagate_otherresearchproduct"/>
<path start="join_propagate_software"/>
</fork>
<action name="join_propagate_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Publication</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/mergedCommunityAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Dataset</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/mergedCommunityAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-ORP</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/mergedCommunityAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<action name="join_propagate_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Software</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/mergedCommunityAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
</spark>
<ok to="wait2"/>
<error to="Kill"/>
</action>
<join name="wait2" to="End"/>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>