[affRo] added option to run on crossref

Miriam Baglioni 2024-10-24 11:49:13 +02:00
parent 595883fef0
commit 420f43fc2f
2 changed files with 43 additions and 3 deletions

View File

@@ -26,10 +26,15 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# The following is needed as a property of a workflow
wfAppPath=${oozieTopWfApplicationPath}
resumeFrom=Crossref
resultFolder=/tmp/affro-results/oalex
inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
#OpenAlex input/output
#resultFolder=/tmp/affro-results/oalex
#inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
#Crossref input/output
resultFolder=/tmp/affro-results/crossref
inputFolder=/data/doiboost/crossref/crossref_unpack
#
#crossrefInputPath=/data/bip-affiliations/crossref-data.json
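Switching back to the OpenAlex run should only require swapping which input/output pair is commented out and giving resumeFrom a value that the workflow decision below does not match explicitly (anything other than 'IIS' or 'Crossref' takes the default OpenAlex branch). A minimal sketch of the reverted block, with 'OALEX' used purely as a placeholder value:

resumeFrom=OALEX
#OpenAlex input/output
resultFolder=/tmp/affro-results/oalex
inputFolder=/user/zeppelin/affiliations/raw_aff_string/2024-08
#Crossref input/output
#resultFolder=/tmp/affro-results/crossref
#inputFolder=/data/doiboost/crossref/crossref_unpack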

View File

@@ -64,8 +64,8 @@
    <decision name="resumeFrom">
        <switch>
            <case to="run-affro-on-iisdata">${wf:conf('resumeFrom') eq 'IIS'}</case>
            <case to="run-affro-on-crossref">${wf:conf('resumeFrom') eq 'Crossref'}</case>
            <default to="run-affro-on-oalexstrings"/>
        </switch>
    </decision>
    <action name="run-affro-on-iisdata">
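Only 'IIS' and 'Crossref' get explicit cases in this switch; any other resumeFrom value still falls through to the default OpenAlex branch (run-affro-on-oalexstrings), so property files that predate this change should keep their previous behaviour.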
@@ -137,5 +137,40 @@
    </action>
    <action name="run-affro-on-crossref">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn-cluster</master>
            <mode>cluster</mode>
            <name>Affiliations inference (Affro)</name>
            <jar>crossref.py</jar>
            <spark-opts>
                --executor-cores=4
                --executor-memory=6G
                --driver-memory=15G
                --conf spark.executor.memoryOverhead=6G
                --conf spark.sql.shuffle.partitions=20000
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3
                --conf spark.executorEnv.PYSPARK_PYTHON=python3
                --py-files ${wfAppPath}/affRo/affro_cluster.py,${wfAppPath}/affRo/create_input_cluster.py,${wfAppPath}/affRo/functions_cluster.py,${wfAppPath}/affRo/matching_cluster.py
                --files ${wfAppPath}/affRo/dictionaries/dix_acad.json,${wfAppPath}/affRo/dictionaries/dix_categ.json,${wfAppPath}/affRo/dictionaries/dix_city.json,${wfAppPath}/affRo/dictionaries/dix_country.json,${wfAppPath}/affRo/dictionaries/dix_mult.json,${wfAppPath}/affRo/dictionaries/dix_status.json,${wfAppPath}/affRo/txt_files/city_names.txt,${wfAppPath}/affRo/txt_files/remove_list.txt,${wfAppPath}/affRo/txt_files/stop_words.txt,${wfAppPath}/affRo/txt_files/university_terms.txt
            </spark-opts>
            <arg>${inputFolder}</arg>
            <arg>${resultFolder}</arg>
            <file>${wfAppPath}/affRo/crossref.py#crossref.py</file>
        </spark>
        <ok to="End" />
        <error to="Kill" />
    </action>
    <end name="End"/>
</workflow-app>
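The new action wires ${inputFolder} and ${resultFolder} straight into crossref.py as its two positional arguments and ships the affRo modules and dictionaries alongside it via --py-files/--files. The real script is part of the affRo sources; purely as an illustration of the calling convention the workflow assumes, a minimal PySpark entry point consuming those two arguments could look like the sketch below (the actual matching logic in affro_cluster and friends is omitted):

    import sys
    from pyspark.sql import SparkSession

    # Hypothetical sketch only: the real crossref.py ships with affRo and uses
    # affro_cluster / create_input_cluster / matching_cluster. The two positional
    # arguments are the ones wired in the workflow action above.
    if __name__ == "__main__":
        input_folder = sys.argv[1]   # e.g. /data/doiboost/crossref/crossref_unpack
        result_folder = sys.argv[2]  # e.g. /tmp/affro-results/crossref

        spark = SparkSession.builder.appName("Affiliations inference (Affro)").getOrCreate()

        # Read the unpacked Crossref records (assumed here to be JSON lines on HDFS).
        records = spark.read.json(input_folder)

        # ... affiliation-string extraction and matching would happen here ...

        # Write whatever the matching step produced to the configured result folder.
        records.write.mode("overwrite").json(result_folder)

        spark.stop()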