1
0
Fork 0

Hosted By Map - extention of the workflow to consider also the application of the map to publications and datasources

This commit is contained in:
Miriam Baglioni 2021-08-04 10:18:11 +02:00
parent 67ba4c40e0
commit e94ae0b1de
1 changed files with 99 additions and 7 deletions

View File

@ -9,6 +9,10 @@
<name>outputPath</name> <name>outputPath</name>
<description>the output path</description> <description>the output path</description>
</property> </property>
<property>
<name>hostedByMapPath</name>
<description>the output path</description>
</property>
<property> <property>
<name>sparkDriverMemory</name> <name>sparkDriverMemory</name>
<description>memory for driver process</description> <description>memory for driver process</description>
@ -65,23 +69,30 @@
</configuration> </configuration>
</global> </global>
<start to="reset_outputpath"/> <start to="resume_from"/>
<decision name="resume_from">
<switch>
<case to="remove_hbmpath">${wf:conf('resumeFrom') eq 'ProduceHBM'}</case>
<default to="prepareInfo"/>
</switch>
</decision>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="reset_outputpath"> <action name="remove_hbmpath">
<fs> <fs>
<delete path="${outputPath}"/> <delete path="${hostedByMapPath}"/>
<mkdir path="${outputPath}"/> <!-- <mkdir path="${hostedByMapPath}"/>-->
</fs> </fs>
<ok to="fork_downloads_csv"/> <ok to="fork_downloads_csv"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_downloads_csv"> <fork name="fork_downloads_csv">
<path start="download_gold"/> <path start="download_gold"/>
<path start="download_doaj"/> <path start="download_doaj"/>
@ -134,13 +145,94 @@
</spark-opts> </spark-opts>
<arg>--datasourcePath</arg><arg>${sourcePath}/datasource</arg> <arg>--datasourcePath</arg><arg>${sourcePath}/datasource</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg> <arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${hostedByMapPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg> <arg>--master</arg><arg>yarn-cluster</arg>
</spark> </spark>
<ok to="End"/> <ok to="prepareInfo"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepareInfo">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<name>Prepare info to apply the hbm</name>
<class>eu.dnetlib.dhp.oa.graph.hostedbymap.SparkPrepareHostedByInfoToApply</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="fork_apply"/>
<error to="Kill"/>
</action>
<fork name="fork_apply">
<path start="apply_result"/>
<path start="apply_datasource"/>
</fork>
<action name="apply_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<name>Apply hbm to result</name>
<class>eu.dnetlib.dhp.oa.graph.hostedbymap.SparkApplyHostedByMapToResult</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="join_apply"/>
<error to="Kill"/>
</action>
<action name="apply_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<name>Apply hbm to datasource</name>
<class>eu.dnetlib.dhp.oa.graph.hostedbymap.SparkApplyHostedByMapToDatasource</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="join_apply"/>
<error to="Kill"/>
</action>
<join name="join_apply" to="End"/>
<end name="End"/> <end name="End"/>