new workflow. It is composed of four steps. The first removes the directory where to store the results. The second copies the relation to the new locatio, the third id the preparation phase and then the actual propagation

This commit is contained in:
Miriam Baglioni 2020-04-14 16:49:24 +02:00
parent ca2b40952e
commit 3f4b579e7f
1 changed files with 100 additions and 8 deletions

View File

@ -20,14 +20,96 @@
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>writeUpdate</name>
<description>writes the information found for the update. No double check done if the information is already present</description>
</property>
<property>
<name>saveGraph</name>
<description>writes new version of the graph after the propagation step</description>
</property>
</parameters>
<start to="ProjectPropagation"/>
<start to="reset-outputpath"/>
<!--<start to="prepare_project_results_association"/>-->
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset-outputpath">
<fs>
<delete path='${workingDir}/projecttoresult_propagation/relation'/>
</fs>
<ok to="copy_relation"/>
<error to="Kill"/>
</action>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${workingDir}/projecttoresult_propagation/relation</arg>
</distcp>
<ok to="prepare_project_results_association"/>
<error to="Kill"/>
</action>
<action name="prepare_project_results_association">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjectResultsAssociation</name>
<class>eu.dnetlib.dhp.projecttoresult.PrepareProjectResultsAssociation</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--potentialUpdatePath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/potentialUpdates</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/alreadyLinked</arg>
</spark>
<ok to="apply_propagation"/>
<error to="Kill"/>
</action>
<action name="apply_propagation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob3</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--writeUpdate</arg><arg>${writeUpdate}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--outputPath</arg><arg>${workingDir}/projecttoresult_propagation/relation</arg>
<arg>--potentialUpdatePath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/potentialUpdates</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/alreadyLinked</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="ProjectPropagation">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
@ -35,17 +117,27 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob2</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--writeUpdate</arg><arg>${writeUpdate}</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--potentialUpdatePath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/potentialUpdates</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/projecttoresult_propagation/preparedInfo/alreadyLinked</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>