From 815a4ddbbaf6fa68a23d576189db2ee03f97f828 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Wed, 26 Apr 2023 20:40:06 +0300 Subject: [PATCH] Add actionset creation for project bip indicators in workflow --- .../bipfinder/SparkAtomicActionScoreJob.java | 7 +- .../impact_indicators/oozie_app/workflow.xml | 81 +++++++++++-------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 13ce1440a8..8b8e057238 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -41,7 +41,8 @@ import scala.Tuple2; */ public class SparkAtomicActionScoreJob implements Serializable { - private static final String DOI = "doi"; + private static final String RESULT = "result"; + private static final String PROJECT = "project"; private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -79,10 +80,10 @@ public class SparkAtomicActionScoreJob implements Serializable { // follow different procedures for different target entities switch (targetEntity) { - case "result": + case RESULT: prepareResults(spark, inputPath, outputPath); break; - case "project": + case PROJECT: prepareProjects(spark, inputPath, outputPath); break; default: diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index ac44d5c05c..c77443bd9d 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -34,7 +34,6 @@ - yarn-cluster @@ -90,9 +89,8 @@ ${nameNode} - - - yarn-cluster + + yarn-cluster cluster @@ -131,7 +129,6 @@ ${jobTracker} ${nameNode} - yarn-cluster @@ -181,9 +178,8 @@ ${nameNode} - - - yarn-cluster + + yarn-cluster cluster @@ -235,7 +231,7 @@ - + yarn-cluster @@ -336,12 +332,12 @@ ${nameNode} - - /usr/bin/bash - - get_ranking_files.sh - - /${workflowDataDir} + + /usr/bin/bash + + get_ranking_files.sh + + /${workflowDataDir} ${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh @@ -374,8 +370,8 @@ ${nameNode} - - yarn-cluster + + yarn-cluster cluster @@ -422,8 +418,8 @@ ${nameNode} - - yarn-cluster + + yarn-cluster cluster @@ -476,7 +472,6 @@ - yarn-cluster @@ -520,7 +515,6 @@ ${nameNode} - yarn-cluster cluster @@ -564,17 +558,12 @@ - - + - - + yarn cluster @@ -593,12 +582,12 @@ --inputPath${bipScorePath} --outputPath${actionSetOutputPath} - + --targetEntityresult + - @@ -645,13 +634,38 @@ - + + + + yarn + cluster + Produces the atomic action with the bip finder scores for projects + eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${projectImpactIndicatorsOutput} + --outputPath${actionSetOutputPath} + --targetEntityproject + + + + + @@ -695,11 +709,14 @@ - ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + ActionSet creation for results failed, error message[${wf:errorMessage(wf:lastErrorNode())}] Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}] +