8172_impact_indicators_workflow #284

Merged
miriam.baglioni merged 49 commits from 8172_impact_indicators_workflow into beta 2023-08-14 15:50:48 +02:00
2 changed files with 163 additions and 143 deletions
Showing only changes of commit b8e8c959fe - Show all commits

View File

@ -1,18 +1,16 @@
# The following set of properties are defined in https://support.openaire.eu/projects/openaire/wiki/Hadoop_clusters
# and concern the parameterization required for running workflows on the @GARR cluster
dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
dhp.hadoop.frontend.user.name=ilias.kanellos
dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
dhp.hadoop.frontend.port.ssh=22
oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
jobTracker=yarnRM
nameNode=hdfs://nameservice1
oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
maven.executable=mvn
sparkDriverMemory=7G
sparkExecutorMemory=7G
sparkExecutorCores=4
# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
# dhp.hadoop.frontend.user.name=ilias.kanellos
# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
# dhp.hadoop.frontend.port.ssh=22
# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
# jobTracker=yarnRM
# nameNode=hdfs://nameservice1
# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
# maven.executable=mvn
# Some memory and driver settings for more demanding tasks
sparkHighDriverMemory=20G
@ -21,6 +19,9 @@ sparkNormalDriverMemory=10G
sparkHighExecutorMemory=20G
sparkNormalExecutorMemory=10G
sparkExecutorCores=4
sparkShufflePartitions=7680
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
@ -66,29 +67,26 @@ ramGamma=0.6
convergenceError=0.000000000001
# I think this should be the oozie workflow directory
oozieWorkflowPath=user/ilias.kanellos/workflow_example/
# The directory where the workflow data is/should be stored
workflowDataDir=user/ilias.kanellos/ranking_workflow
# oozieWorkflowPath=user/ilias.kanellos/workflow_example/
# Directory where json data containing scores will be output
bipScorePath=${workflowDataDir}/openaire_universe_scores/
bipScorePath=${workingDir}/openaire_universe_scores/
# Directory where dataframes are checkpointed
checkpointDir=${nameNode}/${workflowDataDir}/check/
checkpointDir=${nameNode}/${workingDir}/check/
# The directory for the doi-based bip graph
bipGraphFilePath=${nameNode}/${workflowDataDir}/bipdbv8_graph
bipGraphFilePath=${nameNode}/${workingDir}/bipdbv8_graph
# The folder from which synonyms of openaire-ids are read
# openaireDataInput=${nameNode}/tmp/beta_provision/graph/21_graph_cleaned/
openaireDataInput=${/tmp/prod_provision/graph/18_graph_blacklisted}
openaireDataInput=/tmp/prod_provision/graph/18_graph_blacklisted
# A folder where we will write the openaire to doi mapping
synonymFolder=${nameNode}/${workflowDataDir}/openaireid_to_dois/
synonymFolder=${nameNode}/${workingDir}/openaireid_to_dois/
# This will be where we store the openaire graph input. They told us on GARR to use a directory under /data
openaireGraphInputPath=${nameNode}/${workflowDataDir}/openaire_id_graph
openaireGraphInputPath=${nameNode}/${workingDir}/openaire_id_graph
# The workflow application path
wfAppPath=${nameNode}/${oozieWorkflowPath}
@ -96,8 +94,8 @@ wfAppPath=${nameNode}/${oozieWorkflowPath}
oozie.wf.application.path=${wfAppPath}
# Path where the final output should be?
actionSetOutputPath=${workflowDataDir}/bip_actionsets/
actionSetOutputPath=${workingDir}/bip_actionsets/
# The directory to store project impact indicators
projectImpactIndicatorsOutput=${workflowDataDir}/project_indicators
projectImpactIndicatorsOutput=${workingDir}/project_indicators

View File

@ -46,21 +46,23 @@
<!-- Script name goes here -->
<jar>create_openaire_ranking_graph.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 20G --executor-cores 4 --driver-memory 20G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<!-- The openaire graph data from which to read relations and objects -->
<arg>${openaireDataInput}</arg>
<!-- Year for filtering entries w/ larger values / empty -->
<arg>${currentYear}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<!-- The output of the graph should be the openaire input graph for ranking-->
<arg>${openaireGraphInputPath}</arg>
<!-- This needs to point to the file on the hdfs i think -->
@ -100,18 +102,20 @@
<!-- Script name goes here -->
<jar>CC.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
</spark>
@ -141,21 +145,23 @@
<!-- Script name goes here -->
<jar>TAR.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${ramGamma}</arg>
<arg>${currentYear}</arg>
<arg>RAM</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<arg>${checkpointDir}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/TAR.py#TAR.py</file>
@ -189,18 +195,20 @@
<!-- Script name goes here -->
<jar>CC.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<arg>3</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
@ -244,21 +252,23 @@
<!-- Script name goes here -->
<jar>PageRank.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${pageRankAlpha}</arg>
<arg>${convergenceError}</arg>
<arg>${checkpointDir}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/PageRank.py#PageRank.py</file>
@ -289,14 +299,16 @@
<!-- Script name goes here -->
<jar>AttRank.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${attrankAlpha}</arg>
@ -308,7 +320,7 @@
<arg>${convergenceError}</arg>
<arg>${checkpointDir}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/AttRank.py#AttRank.py</file>
@ -339,7 +351,7 @@
<!-- name of script to run -->
<argument>get_ranking_files.sh</argument>
<!-- We only pass the directory where we expect to find the rankings -->
<argument>/${workflowDataDir}</argument>
<argument>/${workingDir}</argument>
<!-- the name of the file run -->
<file>${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh</file>
@ -381,24 +393,26 @@
<!-- Script name goes here -->
<jar>format_ranking_results.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 10G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>json-5-way</arg>
<!-- Input files must be identified dynamically -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- Num partitions -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<!-- Type of data to be produced [bip (dois) / openaire (openaire-ids) ] -->
<arg>openaire</arg>
<!-- This needs to point to the file on the hdfs i think -->
@ -429,24 +443,26 @@
<!-- Script name goes here -->
<jar>format_ranking_results.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 10G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>zenodo</arg>
<!-- Input files must be identified dynamically -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- Num partitions -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<!-- Type of data to be produced [bip (dois) / openaire (openaire-ids) ] -->
<arg>openaire</arg>
<!-- This needs to point to the file on the hdfs i think -->
@ -484,14 +500,16 @@
<!-- Script name goes here -->
<jar>map_openaire_ids_to_dois.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 15G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${openaireDataInput}</arg>
<!-- number of partitions to be used on joins -->
@ -526,24 +544,26 @@
<!-- Script name goes here -->
<jar>map_scores_to_dois.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 15G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkHighDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
<arg>${synonymFolder}</arg>
<!-- Number of partitions -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<!-- The remaining input are the ranking files fproduced for bip db-->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py</file>
@ -576,9 +596,9 @@
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -609,14 +629,16 @@
<!-- Script name goes here -->
<jar>projects_impact.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
<spark-opts>
--executor-memory=${sparkHighExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<!-- Script arguments here -->
@ -624,13 +646,13 @@
<arg>${openaireDataInput}/relations</arg>
<!-- input files with impact indicators for results -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${sparkShufflePartitions}</arg>
<arg>${projectImpactIndicatorsOutput}</arg>
@ -654,9 +676,9 @@
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-memory=${sparkNormalExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--driver-memory=${sparkNormalDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}