WIP: materialize graph as Hive DB, mergeAggregatorGraphs [added workflow node to drop the DB]

This commit is contained in:
Claudio Atzori 2020-08-04 12:29:42 +02:00
parent 771bf8bcc4
commit f3ce97ecf9
1 changed files with 32 additions and 1 deletions

View File

@ -64,12 +64,43 @@
</property> </property>
</parameters> </parameters>
<start to="fork_merge_graph"/> <start to="should_drop_db"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<decision name="should_drop_db">
<switch>
<case to="fork_merge_graph">${wf:conf('outputGraphFormat') eq 'JSON'}</case>
<case to="reset_DB">${wf:conf('outputGraphFormat') eq 'HIVE'}</case>
<default to="reset_DB"/>
</switch>
</decision>
<action name="reset_DB">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>reset_DB</name>
<class>eu.dnetlib.dhp.common.ResetHiveDbApplication</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--dbName</arg><arg>${outputGraph}</arg>
</spark>
<ok to="fork_merge_graph"/>
<error to="Kill"/>
</action>
<fork name="fork_merge_graph"> <fork name="fork_merge_graph">
<path start="merge_publication"/> <path start="merge_publication"/>
<path start="merge_dataset"/> <path start="merge_dataset"/>