replaced parameter 'reuseRecords' with 'resumeFrom', allowing the provision workflow execution to be restarted from any step; useful for manual submissions or debugging

This commit is contained in:
Claudio Atzori 2020-05-22 08:50:06 +02:00
parent dbfb9c19fe
commit b33dd58be4
1 changed file with 11 additions and 11 deletions

View File

@ -84,7 +84,6 @@
<name>sparkNetworkTimeout</name> <name>sparkNetworkTimeout</name>
<description>configures spark.network.timeout</description> <description>configures spark.network.timeout</description>
</property> </property>
</parameters> </parameters>
<global> <global>
@ -98,12 +97,16 @@
</configuration> </configuration>
</global> </global>
<start to="reuse_records"/> <start to="resume_from"/>
<decision name="reuse_records"> <decision name="resume_from">
<switch> <switch>
<case to="prepare_relations">${wf:conf('reuseRecords') eq false}</case> <case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="to_solr_index">${wf:conf('reuseRecords') eq true}</case> <case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="join_all_entities">${wf:conf('resumeFrom') eq 'join_all_entities'}</case>
<case to="adjancency_lists">${wf:conf('resumeFrom') eq 'adjancency_lists'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/> <default to="prepare_relations"/>
</switch> </switch>
</decision> </decision>
@ -131,9 +134,7 @@
</spark-opts> </spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>${relPartitions}</arg> <arg>--relPartitions</arg><arg>3000</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--maxRelations</arg><arg>${maxRelations}</arg>
</spark> </spark>
<ok to="fork_join_related_entities"/> <ok to="fork_join_related_entities"/>
<error to="Kill"/> <error to="Kill"/>
@ -340,7 +341,6 @@
<join name="wait_joins" to="join_all_entities"/> <join name="wait_joins" to="join_all_entities"/>
<action name="join_all_entities"> <action name="join_all_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -362,7 +362,7 @@
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg> <arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg> <arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg> <arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--numPartitions</arg><arg>24000</arg> <arg>--numPartitions</arg><arg>12000</arg>
</spark> </spark>
<ok to="adjancency_lists"/> <ok to="adjancency_lists"/>
<error to="Kill"/> <error to="Kill"/>
@ -386,7 +386,7 @@
--conf spark.sql.shuffle.partitions=7680 --conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout} --conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg> <arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg> <arg>--outputPath</arg><arg>${workingDir}/joined</arg>
</spark> </spark>
<ok to="convert_to_xml"/> <ok to="convert_to_xml"/>