[affMatchings] adding the option to run the algorithm on oalex data, and to check out a specific branch of affro instead of a release

Miriam Baglioni 2024-10-18 13:58:14 +02:00
parent 50401a872f
commit f8988af98d
3 changed files with 56 additions and 19 deletions

pom.xml

@@ -61,8 +61,12 @@
 <version>1.8.1</version>
 <configuration>
   <connectionType>connection</connectionType>
-  <scmVersionType>tag</scmVersionType><!-- 'branch' can also be provided here -->
-  <scmVersion>${affro.release.version}</scmVersion><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
+  <!--
+  <scmVersionType>tag</scmVersionType>--><!-- 'branch' can also be provided here -->
+  <!-- <scmVersion>${affro.release.version}</scmVersion>--><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
+  <scmVersionType>branch</scmVersionType><!-- 'branch' can also be provided here -->
+  <scmVersion>openaire-workflow-ready</scmVersion><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
   <checkoutDirectory>${project.build.directory}/${oozie.package.file.name}/${oozieAppDir}/affRo</checkoutDirectory>
 </configuration>
 <executions>
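Note: the hunk above switches maven-scm-plugin from checking out a tagged affro release to checking out the openaire-workflow-ready branch into the Oozie package directory. For context, a minimal sketch of the full plugin declaration such a configuration typically lives in; everything outside the <configuration> block (the execution id, the phase, and where the repository URL comes from) is an assumption, not part of the diff:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-scm-plugin</artifactId>
  <version>1.8.1</version>
  <configuration>
    <!-- connectionType=connection resolves the repository URL from the POM's <scm><connection> entry (assumed setup) -->
    <connectionType>connection</connectionType>
    <scmVersionType>branch</scmVersionType>
    <scmVersion>openaire-workflow-ready</scmVersion>
    <checkoutDirectory>${project.build.directory}/${oozie.package.file.name}/${oozieAppDir}/affRo</checkoutDirectory>
  </configuration>
  <executions>
    <execution>
      <id>checkout-affro</id><!-- hypothetical id -->
      <phase>prepare-package</phase><!-- assumed phase -->
      <goals>
        <goal>checkout</goal>
      </goals>
    </execution>
  </executions>
</plugin>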

job.properties

@@ -9,11 +9,6 @@
 # oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
 # maven.executable=mvn
-# Some memory and driver settings for more demanding tasks
-sparkDriverMemory=10G
-sparkExecutorMemory=10G
-sparkExecutorCores=4
-sparkShufflePartitions=7680
 # The above is given differently in an example I found online
 oozie.action.sharelib.for.spark=spark2
@@ -31,12 +26,9 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
 # The following is needed as a property of a workflow
 wfAppPath=${oozieTopWfApplicationPath}
-sparkExecutorMemory=10G
-sparkDriverMemory=20G
-sparkExecutorCores=4
-sparkShufflePartitions=7680
-resultFolder=${nameNode}/${workingDir}/affro-results
+resultFolder=/tmp/affro-results/oalex
+inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
 #
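Note: the two hunks above drop the parameterized Spark settings from the properties file (the resource values are now hardcoded per action in workflow.xml, below) and point the run at the raw oalex affiliation strings on HDFS. A minimal sketch of the submission-time properties under these changes; only the three property names are taken from the diff, the rest is assumed boilerplate:

# sketch; inputFolder, resultFolder and resumeFrom are the names used by the workflow
inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
resultFolder=/tmp/affro-results/oalex
# optional: selects the IIS branch of the workflow's decision node; omit to process the oalex strings
resumeFrom=IIS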

workflow.xml

@ -56,13 +56,19 @@
</configuration>
</global>
<start to="run-affro"/>
<start to="resumeFrom"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="resumeFrom">
<switch>
<case to="run-affro-on-iisdata">${wf:conf('resumeFrom') eq 'IIS'}</case>
<default to="run-affro-on-oalexstrings"/>
<action name="run-affro">
</switch>
</decision>
<action name="run-affro-on-iisdata">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
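Note: the new start/decision pair makes the entry point selectable per run. wf:conf('resumeFrom') returns the submitted resumeFrom property, so 'IIS' routes to the old action (renamed run-affro-on-iisdata), while any other value, including an unset property, falls through to the new oalex action. A sketch of the two submissions; the oozie CLI form and host are assumptions, the property name comes from the diff:

# run on IIS data
oozie job -oozie http://oozie-host:11000/oozie -config job.properties -DresumeFrom=IIS -run
# default path: run affro on the raw oalex affiliation strings
oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run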
@@ -71,11 +77,11 @@
     <jar>update_records.py</jar>
     <spark-opts>
-      --executor-memory=${sparkExecutorMemory}
-      --executor-cores=${sparkExecutorCores}
-      --driver-memory=${sparkDriverMemory}
-      --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
-      --conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
+      --executor-cores=4
+      --executor-memory=6G
+      --driver-memory=15G
+      --conf spark.executor.memoryOverhead=6G
+      --conf spark.sql.shuffle.partitions=20000
       --conf spark.extraListeners=${spark2ExtraListeners}
       --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
       --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
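Note: with the properties removed from job.properties, the IIS action now pins its own resources: 4 cores and 6G per executor with an equal 6G memoryOverhead, a 15G driver, and 20000 shuffle partitions. For reference, a rough standalone equivalent of the new opts; the spark-submit form and master are assumptions, the values come from the diff:

spark-submit --master yarn --deploy-mode cluster \
  --executor-cores 4 --executor-memory 6G --driver-memory 15G \
  --conf spark.executor.memoryOverhead=6G \
  --conf spark.sql.shuffle.partitions=20000 \
  update_records.py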
@@ -96,5 +102,40 @@
 </action>
+<action name="run-affro-on-oalexstrings">
+  <spark xmlns="uri:oozie:spark-action:0.2">
+    <master>yarn-cluster</master>
+    <mode>cluster</mode>
+    <name>Affiliations inference (Affro)</name>
+    <jar>strings.py</jar>
+    <spark-opts>
+      --executor-cores=4
+      --executor-memory=6G
+      --driver-memory=15G
+      --conf spark.executor.memoryOverhead=6G
+      --conf spark.sql.shuffle.partitions=20000
+      --conf spark.extraListeners=${spark2ExtraListeners}
+      --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+      --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+      --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+      --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
+      --conf spark.executorEnv.PYSPARK_PYTHON=python3
+      --py-files ${wfAppPath}/affRo/affro_cluster.py,${wfAppPath}/affRo/create_input_cluster.py,${wfAppPath}/affRo/functions_cluster.py,${wfAppPath}/affRo/matching_cluster.py
+      --files ${wfAppPath}/affRo/dictionaries/dix_acad.json,${wfAppPath}/affRo/dictionaries/dix_categ.json,${wfAppPath}/affRo/dictionaries/dix_city.json,${wfAppPath}/affRo/dictionaries/dix_country.json,${wfAppPath}/affRo/dictionaries/dix_mult.json,${wfAppPath}/affRo/dictionaries/dix_status.json,${wfAppPath}/affRo/txt_files/city_names.txt,${wfAppPath}/affRo/txt_files/remove_list.txt,${wfAppPath}/affRo/txt_files/stop_words.txt,${wfAppPath}/affRo/txt_files/university_terms.txt
+    </spark-opts>
+    <arg>${inputFolder}</arg>
+    <arg>${resultFolder}</arg>
+    <file>${wfAppPath}/affRo/strings.py#strings.py</file>
+  </spark>
+  <ok to="End" />
+  <error to="Kill" />
+</action>
 <end name="End"/>
 </workflow-app>
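Note: the new action ships the affro modules and dictionaries alongside strings.py and passes exactly two positional arguments, the input and result folders. strings.py itself is not part of this commit; a hypothetical sketch of the entry point the action implies, with the matching logic left as a placeholder:

# hypothetical sketch of strings.py; only the two positional arguments, the
# python3 runtime, and the --py-files modules are implied by the action above
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    input_folder, result_folder = sys.argv[1], sys.argv[2]

    spark = SparkSession.builder.appName("Affiliations inference (Affro)").getOrCreate()

    # assumed input format: one raw affiliation string per record
    raw = spark.read.text(input_folder)

    # placeholder for the real matching pipeline shipped via --py-files
    # (affro_cluster, create_input_cluster, functions_cluster, matching_cluster)
    matched = raw

    matched.write.mode("overwrite").json(result_folder)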