new workflow with added steps

Miriam Baglioni 2020-05-19 18:44:25 +02:00
parent 457293ccc0
commit 08218d2f3f
1 changed file with 82 additions and 32 deletions


@@ -24,6 +24,7 @@
<fs>
<delete path='${outputPath}'/>
<mkdir path='${outputPath}'/>
+<delete path="/tmp/h2020programme"/>
</fs>
<ok to="get_project_file"/>
<error to="Kill"/>
@@ -31,51 +32,100 @@
<action name="get_project_file">
<java>
-<main-class>eu.dnetlib.dhp.actionmanager.project.GetFile</main-class>
+<main-class>eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${projectFileURL}</arg>
-<arg>--hdfsPath</arg><arg>${workingDir}/projects.csv</arg>
+<arg>--hdfsPath</arg><arg>${workingDir}/projects</arg>
+<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject</arg>
</java>
-<ok to="shell_get_programme_file"/>
+<ok to="get_programme_file"/>
<error to="Kill"/>
</action>
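Both download steps now run the same generic ReadCSV main class in place of the old GetFile. For context, a minimal sketch of what such a class plausibly does, assuming commons-csv for parsing and Jackson for JSON serialization; argument handling, the delimiter, and all names below are guesses, not the actual dhp implementation:

// Hypothetical sketch of a ReadCSV action: fetch a CSV from --fileURL,
// map each record onto the bean named by --classForName, and write one
// JSON object per line to --hdfsPath.
package eu.dnetlib.dhp.actionmanager.project.csvutils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URL;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadCSV {
	public static void main(String[] args) throws Exception {
		// Simplified positional arguments; the real job reads them
		// from a parameters descriptor.
		String hdfsNameNode = args[0]; // ${nameNode}
		String fileURL = args[1];      // ${projectFileURL} / ${programmeFileURL}
		String hdfsPath = args[2];     // e.g. ${workingDir}/projects
		String classForName = args[3]; // bean to map each CSV row onto

		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", hdfsNameNode);
		FileSystem fs = FileSystem.get(conf);
		ObjectMapper mapper = new ObjectMapper();
		Class<?> clazz = Class.forName(classForName);

		try (BufferedReader in = new BufferedReader(
				new InputStreamReader(new URL(fileURL).openStream()));
			BufferedWriter out = new BufferedWriter(new OutputStreamWriter(
				fs.create(new Path(hdfsPath), true)))) {
			for (CSVRecord record : CSVFormat.DEFAULT
					.withFirstRecordAsHeader()
					.withDelimiter(';') // assumption: CORDIS CSVs are ';'-separated
					.parse(in)) {
				// convert the header->value map into the requested bean
				Object bean = mapper.convertValue(record.toMap(), clazz);
				out.write(mapper.writeValueAsString(bean));
				out.newLine();
			}
		}
	}
}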
-<action name="shell_get_programme_file">
+<action name="get_programme_file">
<java>
-<main-class>eu.dnetlib.dhp.actionmanager.project.GetFile</main-class>
+<main-class>eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileURL</arg><arg>${programmeFileURL}</arg>
-<arg>--hdfsPath</arg><arg>${workingDir}/programme.csv</arg>
+<arg>--hdfsPath</arg><arg>${workingDir}/programme</arg>
+<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme</arg>
</java>
<ok to="prepare_programme"/>
<error to="Kill"/>
</action>
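The new --classForName arguments point at plain beans mirroring the CSV columns. A hypothetical sketch of CSVProgramme, with field names guessed from typical CORDIS programme dumps; the real class may differ:

// Hypothetical bean for one row of the H2020 programme CSV.
// Field names are assumptions based on common CORDIS column headers.
package eu.dnetlib.dhp.actionmanager.project.csvutils;

import java.io.Serializable;

public class CSVProgramme implements Serializable {
	private String rcn;
	private String code;       // programme code, the join key used downstream
	private String title;
	private String shortTitle;
	private String language;

	public String getCode() { return code; }
	public void setCode(String code) { this.code = code; }
	// remaining getters/setters omitted for brevity
}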
<action name="prepare_programme">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProgramme</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProgramme</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--programmePath</arg><arg>${workingDir}/programme</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProgramme</arg>
</spark>
<ok to="prepare_project"/>
<error to="Kill"/>
</action>
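A minimal sketch of what a PrepareProgramme job taking these arguments could look like, assuming the ReadCSV output is one JSON object per line and that preparation amounts to deduplicating on the programme code; the actual dhp job may apply further normalization:

// Hypothetical sketch of PrepareProgramme: load the programme dump written
// by ReadCSV, keep one record per programme code, and save the result where
// the later steps expect it.
package eu.dnetlib.dhp.actionmanager.project;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PrepareProgramme {
	public static void main(String[] args) {
		String programmePath = args[0]; // --programmePath
		String outputPath = args[1];    // --outputPath

		SparkSession spark = SparkSession.builder()
			.appName("PrepareProgramme")
			.getOrCreate();

		Dataset<Row> programme = spark.read().json(programmePath);

		programme
			.dropDuplicates("code") // one entry per programme code
			.write()
			.mode(SaveMode.Overwrite)
			.json(outputPath);
	}
}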
<action name="prepare_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareProjects</name>
<class>eu.dnetlib.dhp.actionmanager.project.PrepareProjects</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/projects</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedProjects</arg>
</spark>
<ok to="create_updates"/>
<error to="Kill"/>
</action>
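Analogously, a hypothetical PrepareProjects sketch: in CORDIS project dumps the programme column can hold several semicolon-separated codes, so a plausible preparation step explodes it into one row per (project, programme code) pair. The column names are assumptions:

// Hypothetical sketch of PrepareProjects: split the semicolon-separated
// programme column so that the join in the next step is a simple equi-join.
package eu.dnetlib.dhp.actionmanager.project;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;
import static org.apache.spark.sql.functions.trim;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PrepareProjects {
	public static void main(String[] args) {
		String projectPath = args[0]; // --projectPath
		String outputPath = args[1];  // --outputPath

		SparkSession spark = SparkSession.builder()
			.appName("PrepareProjects")
			.getOrCreate();

		Dataset<Row> projects = spark.read().json(projectPath);

		projects
			// one output row per individual programme code
			.withColumn("programme", explode(split(col("programme"), ";")))
			.withColumn("programme", trim(col("programme")))
			.write()
			.mode(SaveMode.Overwrite)
			.json(outputPath);
	}
}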
<action name="create_updates">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectProgrammeAS</name>
<class>eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--projectPath</arg><arg>${workingDir}/preparedProjects</arg>
<arg>--programmePath</arg><arg>${workingDir}/preparedProgramme</arg>
<arg>--outputPath</arg><arg>/tmp/h2020programme</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
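A minimal sketch of the join this final step presumably performs, enriching each prepared project with its programme record by code. The real SparkAtomicActionJob additionally wraps the result into OAF Project entities / atomic actions, which is omitted here:

// Hypothetical sketch of the core of SparkAtomicActionJob: left-join the
// prepared projects with the prepared programmes on the programme code.
package eu.dnetlib.dhp.actionmanager.project;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkAtomicActionJob {
	public static void main(String[] args) {
		String projectPath = args[0];   // --projectPath   (prepared projects)
		String programmePath = args[1]; // --programmePath (prepared programme)
		String outputPath = args[2];    // --outputPath    (/tmp/h2020programme)

		SparkSession spark = SparkSession.builder()
			.appName("ProjectProgrammeAS")
			.getOrCreate();

		Dataset<Row> projects = spark.read().json(projectPath);
		Dataset<Row> programmes = spark.read().json(programmePath);

		projects
			.join(programmes,
				projects.col("programme").equalTo(programmes.col("code")),
				"left")
			.write()
			.mode(SaveMode.Overwrite)
			.json(outputPath);
	}
}

Note that the workflow's preparatory fs step deletes /tmp/h2020programme, consistent with this action rewriting that path on every run.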
<!-- <action name="create_actionset">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>ProjectProgrammeAS</name>-->
<!-- <class>eu.dnetlib.dhp.actionmanager.project</class>-->
<!-- <jar>dhp-aggregation-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- &#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${sourcePath}/relation</arg>-->
<!-- <arg>&#45;&#45;outputPath</arg><arg>${outputPath}/relation</arg>-->
<!-- <arg>&#45;&#45;hdfsPath</arg><arg>${workingDir}/blacklist</arg>-->
<!-- <arg>&#45;&#45;mergesPath</arg><arg>${workingDir}/mergesRelation</arg>-->
<!-- </spark>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<end name="End"/>
</workflow-app>