From b5c252865c15605e8b6ff154891d82a7544763d8 Mon Sep 17 00:00:00 2001
From: ikanellos
Date: Mon, 20 Mar 2023 15:38:36 +0200
Subject: [PATCH 1/2] Add filtering based on citation source

---
 .../main/resources/create_openaire_ranking_graph.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
index 4cffa86a3..cda12a77c 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
@@ -126,12 +126,19 @@ oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').distinct()
 
 # Collect only valid citations i.e., invisible = false & deletedbyinference=false
 cites_df = spark.read.json(graph_folder + "/relation")\
-    .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
+    .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'collectedfrom.value', 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
     .where( (F.col('relClass') == "Cites") \
             & (F.col('dataInfo.deletedbyinference') == "false")\
             & (F.col('dataInfo.invisible') == "false"))\
     .drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\
-    .repartition(num_partitions, 'citing').drop('relClass')
+    .repartition(num_partitions, 'citing').drop('relClass')\
+    .withColumn('collected_lower', F.expr('transform(collectedfrom.value, x -> lower(x))'))\
+    .drop('collectedfrom.value')\
+    .where(
+        (F.array_contains(F.col('collected_lower'), "opencitations"))
+        | (F.array_contains(F.col('collected_lower'), "crossref"))
+        | (F.array_contains(F.col('collected_lower'), "mag"))
+    ).drop('collected_lower')
 
 # print ("Cited df has: " + str(cites_df.count()) + " entries")
 
 # DEPRECATED
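Note on patch 1/2: it restricts the citation relations to those collected from OpenCitations, Crossref, or MAG by lower-casing the collectedfrom values and testing membership with array_contains. Below is a minimal, self-contained PySpark sketch of that filtering pattern on toy data; the column names and datasource strings in the sketch are illustrative only (the real relation records carry the nested collectedfrom.value and dataInfo fields shown in the patch), and array_contains matches whole array elements, so the comparison strings must equal the lower-cased datasource names exactly.

# Hedged sketch, not the workflow script: toy relation rows with a flat
# 'collectedfrom' array column stand in for the nested OpenAIRE schema.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").appName("citation-source-filter").getOrCreate()

rels = spark.createDataFrame(
    [
        ("p1", "p2", "Cites", ["OpenCitations", "Crossref"]),
        ("p3", "p4", "Cites", ["MAG"]),
        ("p5", "p6", "Cites", ["Some Other Datasource"]),
    ],
    ["citing", "cited", "relClass", "collectedfrom"],
)

filtered = (
    rels
    # lower-case every element of the collectedfrom array (Spark >= 2.4 higher-order function)
    .withColumn("collected_lower", F.expr("transform(collectedfrom, x -> lower(x))"))
    # keep rows whose sources include at least one of the three citation providers
    .where(
        F.array_contains("collected_lower", "opencitations")
        | F.array_contains("collected_lower", "crossref")
        | F.array_contains("collected_lower", "mag")
    )
    .drop("collected_lower")
)

filtered.show(truncate=False)  # p1->p2 and p3->p4 survive, p5->p6 is filtered out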
From 9dc8f0f05f2d527bccbde92680f864dbb635710f Mon Sep 17 00:00:00 2001
From: ikanellos
Date: Tue, 21 Mar 2023 16:14:15 +0200
Subject: [PATCH 2/2] Add ActionSet step

---
 .../src/main/resources/job.properties |  6 +++
 .../src/main/resources/workflow.xml   | 51 ++++++++++++++++++-
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
index 9ad9def21..a902c413f 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
@@ -63,6 +63,9 @@ oozieWorkflowPath=user/ilias.kanellos/workflow_example/
 # The directory where the workflow data is/should be stored
 workflowDataDir=user/ilias.kanellos/ranking_workflow
 
+# Directory where json data containing scores will be output
+bipScorePath=${workflowDataDir}/openaire_universe_scores/
+
 # Directory where dataframes are checkpointed
 checkpointDir=${nameNode}/${workflowDataDir}/check/
 
@@ -84,3 +87,6 @@ wfAppPath=${nameNode}/${oozieWorkflowPath}
 # The following is needed as a property of a workflow
 oozie.wf.application.path=${wfAppPath}
 
+# Path where the final output should be?
+actionSetOutputPath=${workflowDataDir}/bip_actionsets/
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
index 807c32063..d99dc16a2 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
@@ -552,11 +552,50 @@
+            yarn
+            cluster
+            Produces the atomic action with the bip finder scores for publications
+            eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob
+            dhp-aggregation-${projectVersion}.jar
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            --inputPath ${bipScorePath}
+            --outputPath ${actionSetOutputPath}
@@ -597,4 +636,12 @@
 Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
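Note on patch 2/2: only the text content of the new workflow.xml nodes survives in this excerpt; the XML elements themselves, the action and kill-node names, and the ok/error transitions are not visible here. As a rough sketch, assuming the surviving values plug into a standard Oozie spark action preceded by a step that clears ${actionSetOutputPath} (the first kill message points to such a step), the added nodes might look like the following. All element names outside the surviving values, and in particular the action names, kill-node names, and transition targets, are hypothetical.

<!-- Sketch only: the values come from the patch above, but the action/kill names and
     the ok/error targets (delete-actionset-output, create-actionset, end,
     actionset-delete-fail, actionset-creation-fail) are assumptions. -->
<action name="delete-actionset-output">
    <fs>
        <delete path="${actionSetOutputPath}"/>
    </fs>
    <ok to="create-actionset"/>
    <error to="actionset-delete-fail"/>
</action>

<action name="create-actionset">
    <spark xmlns="uri:oozie:spark-action:0.2">
        <master>yarn</master>
        <mode>cluster</mode>
        <name>Produces the atomic action with the bip finder scores for publications</name>
        <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
        <jar>dhp-aggregation-${projectVersion}.jar</jar>
        <spark-opts>
            --executor-memory=${sparkExecutorMemory}
            --executor-cores=${sparkExecutorCores}
            --driver-memory=${sparkDriverMemory}
            --conf spark.extraListeners=${spark2ExtraListeners}
            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
            --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
            --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
        </spark-opts>
        <arg>--inputPath</arg><arg>${bipScorePath}</arg>
        <arg>--outputPath</arg><arg>${actionSetOutputPath}</arg>
    </spark>
    <ok to="end"/>
    <error to="actionset-creation-fail"/>
</action>

<kill name="actionset-delete-fail">
    <message>Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<kill name="actionset-creation-fail">
    <message>ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>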