1
0
Fork 0

[cleaning] the datasource master-duplicate fixup should not be brought to production yet

This commit is contained in:
Claudio Atzori 2023-01-20 09:20:26 +01:00
parent 3800361033
commit c1e2460293
1 changed files with 1 additions and 195 deletions

View File

@ -582,201 +582,7 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait_clean_country" to="should_patch_datasource_ids"/> <join name="wait_clean_country" to="End"/>
<decision name="should_patch_datasource_ids">
<switch>
<case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="get_ds_master_duplicate">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
</java>
<ok to="fork_patch_cfhb"/>
<error to="Kill"/>
</action>
<fork name="fork_patch_cfhb">
<path start="patch_publication_cfhb"/>
<path start="patch_dataset_cfhb"/>
<path start="patch_otherresearchproduct_cfhb"/>
<path start="patch_software_cfhb"/>
</fork>
<action name="patch_publication_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch publication cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_dataset_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch dataset cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_otherresearchproduct_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch otherresearchproduct cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<action name="patch_software_cfhb">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>patch software cfhb</name>
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="wait_clean_cfhb"/>
<error to="Kill"/>
</action>
<join name="wait_clean_cfhb" to="fork_copy_cfhb_patched_results"/>
<fork name="fork_copy_cfhb_patched_results">
<path start="copy_cfhb_patched_publication"/>
<path start="copy_cfhb_patched_dataset"/>
<path start="copy_cfhb_patched_otherresearchproduct"/>
<path start="copy_cfhb_patched_software"/>
</fork>
<action name="copy_cfhb_patched_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/publication"/>
</prepare>
<arg>${workingDir}/cfHbPatched/publication</arg>
<arg>${graphOutputPath}/publication</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/dataset"/>
</prepare>
<arg>${workingDir}/cfHbPatched/dataset</arg>
<arg>${graphOutputPath}/dataset</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/otherresearchproduct"/>
</prepare>
<arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
<arg>${graphOutputPath}/otherresearchproduct</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_cfhb_patched_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${graphOutputPath}/software"/>
</prepare>
<arg>${workingDir}/cfHbPatched/software</arg>
<arg>${graphOutputPath}/software</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="End"/>
<end name="End"/> <end name="End"/>