1
0
Fork 0

Merge branch '8172_impact_indicators_workflow' of https://code-repo.d4science.org/D-Net/dnet-hadoop into 8172_impact_indicators_workflow

This commit is contained in:
Ilias Kanellos 2023-05-16 17:34:53 +03:00
commit 38020e242a
3 changed files with 82 additions and 91 deletions

View File

@ -24,3 +24,13 @@ mvn package -Poozie-package,deploy,run -Dworkflow.source.dir=eu/dnetlib/dhp/oa/g
```
Note: edit the property `bip.ranker.tag` of the `pom.xml` file to specify the tag of [BIP-Ranker](https://github.com/athenarc/Bip-Ranker) that you want to use.
Job info and logs:
```
export OOZIE_URL=http://iis-cdh5-test-m3:11000/oozie
oozie job -info <jobId>
oozie job -log <jobId>
```
where `jobId` is the id of the job returned by the `run_workflow.sh` script.

View File

@ -76,7 +76,7 @@ bipScorePath=${workingDir}/openaire_universe_scores/
checkpointDir=${nameNode}/${workingDir}/check/
# The directory for the doi-based bip graph
bipGraphFilePath=${nameNode}/${workingDir}/bipdbv8_graph
# bipGraphFilePath=${nameNode}/${workingDir}/bipdbv8_graph
# The folder from which synonyms of openaire-ids are read
# openaireDataInput=${nameNode}/tmp/beta_provision/graph/21_graph_cleaned/
@ -89,9 +89,12 @@ synonymFolder=${nameNode}/${workingDir}/openaireid_to_dois/
openaireGraphInputPath=${nameNode}/${workingDir}/openaire_id_graph
# The workflow application path
wfAppPath=${nameNode}/${oozieWorkflowPath}
wfAppPath=${oozieTopWfApplicationPath}
# The following is needed as a property of a workflow
oozie.wf.application.path=${wfAppPath}
#oozie.wf.application.path=${wfAppPath}
oozie.wf.application.path=${oozieTopWfApplicationPath}
# Path where the final output should be?
actionSetOutputPath=${workingDir}/bip_actionsets/
@ -99,3 +102,4 @@ actionSetOutputPath=${workingDir}/bip_actionsets/
# The directory to store project impact indicators
projectImpactIndicatorsOutput=${workingDir}/project_indicators
resume=create-openaire-ranking-graph

View File

@ -1,21 +1,33 @@
<workflow-app xmlns="uri:oozie:workflow:0.5" name="ranking-wf">
<!-- Global params -->
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<!-- start using a decision node, so as to determine from which point onwards a job will continue -->
<!-- <start to="get-doi-synonyms" /> -->
<start to="entry-point-decision" />
<decision name="entry-point-decision">
<switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${resume eq "rankings-start"}</case>
<case to="spark-impulse">${resume eq "impulse"}</case>
<case to="iterative-rankings">${resume eq "rankings-iterative"}</case>
<case to="get-file-names">${resume eq "format-results"}</case>
<case to="map-openaire-to-doi">${resume eq "map-ids"}</case>
<case to="map-scores-to-dois">${resume eq "map-scores"}</case>
<case to="create-openaire-ranking-graph">${resume eq "start"}</case>
<case to="project-impact-indicators">${resume eq "projects-impact"}</case>
<case to="non-iterative-rankings">${wf:conf('resume') eq "rankings-start"}</case>
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
<case to="iterative-rankings">${wf:conf('resume') eq "rankings-iterative"}</case>
<case to="get-file-names">${wf:conf('resume') eq "format-results"}</case>
<case to="map-openaire-to-doi">${wf:conf('resume') eq "map-ids"}</case>
<case to="map-scores-to-dois">${wf:conf('resume') eq "map-scores"}</case>
<case to="create-openaire-ranking-graph">${wf:conf('resume') eq "start"}</case>
<case to="project-impact-indicators">${wf:conf('resume') eq "projects-impact"}</case>
<!-- TODO: add action set creation here -->
<default to="create-openaire-ranking-graph" />
@ -26,10 +38,7 @@
<action name="create-openaire-ranking-graph">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- Delete previously created doi synonym folder -->
<!-- I think we don't need this given we don't have synonyms anymore
<prepare>
@ -90,10 +99,6 @@
<action name="spark-cc">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -121,7 +126,7 @@
<!-- number of partitions to be used on joins -->
<arg>${sparkShufflePartitions}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark>
<!-- Do this after finishing okay -->
@ -135,10 +140,6 @@
<action name="spark-ram">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -170,7 +171,7 @@
<arg>${sparkShufflePartitions}</arg>
<arg>${checkpointDir}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/TAR.py#TAR.py</file>
<file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file>
</spark>
<!-- Do this after finishing okay -->
@ -187,10 +188,6 @@
<action name="spark-impulse">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -219,7 +216,7 @@
<arg>${sparkShufflePartitions}</arg>
<arg>3</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark>
<!-- Do this after finishing okay -->
@ -238,10 +235,6 @@
<action name="spark-pagerank">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- we could add map-reduce configs here, but I don't know if we need them -->
<!-- This is the type of master-client configuration for running spark -->
@ -281,7 +274,7 @@
<arg>${sparkShufflePartitions}</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/PageRank.py#PageRank.py</file>
<file>${wfAppPath}/bip-ranker/PageRank.py#PageRank.py</file>
</spark>
<!-- Do this after finishing okay -->
@ -295,10 +288,6 @@
<action name="spark-attrank">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -335,7 +324,7 @@
<arg>${sparkShufflePartitions}</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/AttRank.py#AttRank.py</file>
<file>${wfAppPath}/bip-ranker/AttRank.py#AttRank.py</file>
</spark>
<!-- Do this after finishing okay -->
@ -353,10 +342,6 @@
<action name="get-file-names">
<!-- This is required as a tag for shell jobs -->
<shell xmlns="uri:oozie:shell-action:0.3">
<!-- Same for all -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs -->
<name-node>${nameNode}</name-node>
<!-- Exec is needed for shell commands - points to type of shell command -->
<exec>/usr/bin/bash</exec>
@ -378,7 +363,6 @@
</action>
<!-- Now we will run in parallel the formatting of ranking files for BiP! DB and openaire (json files) -->
<fork name="format-result-files">
<path start="format-bip-files"/>
@ -391,10 +375,6 @@
<action name="format-json-files">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -443,10 +423,6 @@
<action name="format-bip-files">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -498,10 +474,7 @@
<action name="map-openaire-to-doi">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- Delete previously created doi synonym folder -->
<prepare>
<delete path="${synonymFolder}"/>
@ -548,10 +521,6 @@
<action name="map-scores-to-dois">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
@ -636,10 +605,7 @@
<action name="project-impact-indicators">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
@ -714,47 +680,54 @@
<error to="actionset-project-creation-fail"/>
</action>
<!-- TODO: end the workflow-->
<!-- Define ending node -->
<end name="end" />
<!-- Definitions of failure messages -->
<kill name="pagerank-fail">
<message>PageRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
<!-- Definitions of failure messages -->
<kill name="openaire-graph-error">
<message>Creation of openaire-graph failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="attrank-fail">
<message>AttRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="cc-fail">
<message>CC failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="impulse-fail">
<message>Impulse failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="ram-fail">
<message>RAM failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<kill name="openaire-graph-error">
<message>Creation of openaire-graph failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="impulse-fail">
<message>Impulse failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="pagerank-fail">
<message>PageRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="attrank-fail">
<message>AttRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="filename-getting-error">
<message>Error getting key-value pairs for output files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="json-formatting-fail">
<message>Error formatting json files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="bip-formatting-fail">
<message>Error formatting BIP files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="synonym-collection-fail">
<message>Synonym collection failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<kill name="map-scores-fail">
<message>Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<kill name="actionset-delete-fail">
<message>Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<kill name="actionset-creation-fail">
<message>ActionSet creation for results failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -767,4 +740,8 @@
<kill name="actionset-project-creation-fail">
<message>ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!-- Define ending node -->
<end name="end" />
</workflow-app>