From f992ecb6573b507351773096af78d65faef1baac Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 21 Mar 2023 18:03:55 +0200 Subject: [PATCH 1/8] Checkout BIP-Ranker during 'prepare-package' && add it in the oozie-package.tar.gz --- dhp-workflows/dhp-impact-indicators/README.md | 14 ++++++-------- dhp-workflows/dhp-impact-indicators/pom.xml | 19 +++++++++++++++---- .../create_openaire_ranking_graph.py | 0 .../format_ranking_results.py | 0 .../{ => eu.dnetlib}/get_ranking_files.sh | 0 .../resources/{ => eu.dnetlib}/job.properties | 0 .../map_openaire_ids_to_dois.py | 0 .../{ => eu.dnetlib}/map_scores_to_dois.py | 0 .../resources/{ => eu.dnetlib}/workflow.xml | 0 dhp-workflows/pom.xml | 1 + 10 files changed, 22 insertions(+), 12 deletions(-) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/create_openaire_ranking_graph.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/format_ranking_results.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/get_ranking_files.sh (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/job.properties (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/map_openaire_ids_to_dois.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/map_scores_to_dois.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{ => eu.dnetlib}/workflow.xml (100%) diff --git a/dhp-workflows/dhp-impact-indicators/README.md b/dhp-workflows/dhp-impact-indicators/README.md index 14f489da3..45a4701e7 100644 --- a/dhp-workflows/dhp-impact-indicators/README.md +++ b/dhp-workflows/dhp-impact-indicators/README.md @@ -1,4 +1,4 @@ -# Ranking Workflow for Openaire Publications +# Ranking Workflow for OpenAIRE Publications This project contains the files for running a paper ranking workflow on the openaire graph using apache oozie. 
All scripts are written in python and the project setup follows the typical oozie workflow structure: @@ -7,17 +7,15 @@ All scripts are written in python and the project setup follows the typical oozi - a job.properties file specifying parameter values for the parameters used by the workflow - a set of python scripts used by the workflow -**NOTE**: the workflow depends on the external library of ranking scripts called BiP! Ranker. +**NOTE**: the workflow depends on the external library of ranking scripts called [BiP! Ranker](https://github.com/athenarc/Bip-Ranker). You can check out a specific tag/release of BIP! Ranker using maven, as described in the following section. -## Check out a specific tag/release of BIP-Ranker +## Build and deploy -* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out. - -* Then, use maven to perform the checkout: +Use the following command for packaging: ``` -mvn scm:checkout +mvn package -Poozie-package -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/impact_indicators -DskipTests ``` -* The code should be visible under `src/main/bip-ranker` folder. \ No newline at end of file +Note: edit the property `bip.ranker.tag` of the `pom.xml` file to specify the tag of [BIP-Ranker](https://github.com/athenarc/Bip-Ranker) that you want to use. 
diff --git a/dhp-workflows/dhp-impact-indicators/pom.xml b/dhp-workflows/dhp-impact-indicators/pom.xml index b510635a6..644b82c7b 100644 --- a/dhp-workflows/dhp-impact-indicators/pom.xml +++ b/dhp-workflows/dhp-impact-indicators/pom.xml @@ -5,9 +5,8 @@ 4.0.0 eu.dnetlib.dhp - dhp + dhp-workflows 1.2.5-SNAPSHOT - ../pom.xml dhp-impact-indicators @@ -16,6 +15,9 @@ 8 8 UTF-8 + + + v1.0.0 @@ -32,9 +34,18 @@ connection tag - v1.0.0 - ${project.build.directory}/../src/main/bip-ranker + ${bip.ranker.tag} + ${project.build.directory}/${oozie.package.file.name}/${oozieAppDir}/bip-ranker + + + checkout-bip-ranker + prepare-package + + checkout + + + diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties 
b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index 541d59007..d054ba39b 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -38,6 +38,7 @@ dhp-usage-raw-data-update dhp-broker-events dhp-doiboost + dhp-impact-indicators From 3e8a4cf9521fdab068e47f48536e707d14f0ea18 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 21 Mar 2023 18:24:12 +0200 Subject: [PATCH 2/8] Rearrange resources folder structure --- .../create_openaire_ranking_graph.py | 11 +++- .../oozie_app}/format_ranking_results.py | 0 
.../oozie_app}/get_ranking_files.sh | 0 .../oozie_app}/job.properties | 6 +++ .../oozie_app}/map_openaire_ids_to_dois.py | 0 .../oozie_app}/map_scores_to_dois.py | 0 .../impact_indicators/oozie_app}/workflow.xml | 51 ++++++++++++++++++- 7 files changed, 64 insertions(+), 4 deletions(-) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/create_openaire_ranking_graph.py (95%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/format_ranking_results.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/get_ranking_files.sh (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/job.properties (93%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/map_openaire_ids_to_dois.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/map_scores_to_dois.py (100%) rename dhp-workflows/dhp-impact-indicators/src/main/resources/{eu.dnetlib => eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app}/workflow.xml (93%) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py similarity index 95% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py index 4cffa86a3..cda12a77c 100644 --- 
a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py @@ -126,12 +126,19 @@ oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').disti # Collect only valid citations i.e., invisible = false & deletedbyinference=false cites_df = spark.read.json(graph_folder + "/relation")\ - .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\ + .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'collectedfrom.value', 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\ .where( (F.col('relClass') == "Cites") \ & (F.col('dataInfo.deletedbyinference') == "false")\ & (F.col('dataInfo.invisible') == "false"))\ .drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\ - .repartition(num_partitions, 'citing').drop('relClass') + .repartition(num_partitions, 'citing').drop('relClass')\ + .withColumn('collected_lower', F.expr('transform(collectedfrom.value, x -> lower(x))'))\ + .drop('collectedfrom.value')\ + .where( + (F.array_contains(F.col('collected_lower'), "opencitations")) + | (F.array_contains(F.col('collected_lower'), "crossref")) + | (F.array_contains(F.col('collected_lower'), "mag")) + ).drop('collected_lower') # print ("Cited df has: " + str(cites_df.count()) + " entries") # DEPRECATED diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py rename to 
dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties similarity index 93% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties index 9ad9def21..a902c413f 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties @@ -63,6 +63,9 @@ oozieWorkflowPath=user/ilias.kanellos/workflow_example/ # The directory where the workflow data is/should be stored workflowDataDir=user/ilias.kanellos/ranking_workflow +# Directory where json data containing scores will be output +bipScorePath=${workflowDataDir}/openaire_universe_scores/ + # Directory where dataframes are checkpointed checkpointDir=${nameNode}/${workflowDataDir}/check/ @@ -84,3 +87,6 @@ wfAppPath=${nameNode}/${oozieWorkflowPath} # The following is needed as a property of a workflow oozie.wf.application.path=${wfAppPath} +# Path where the final output should be? 
+actionSetOutputPath=${workflowDataDir}/bip_actionsets/ + diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py similarity index 100% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml similarity index 93% rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 807c32063..d99dc16a2 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -552,11 +552,50 @@ - + - + + + + + + + + + + + + + + + + yarn + cluster + Produces the atomic action with the bip finder scores for publications + eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob + 
dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${bipScorePath} + --outputPath${actionSetOutputPath} + + + + + @@ -597,4 +636,12 @@ Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + From 102aa5ab81bf2acf6b758b0255d4383f050d31d6 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 21 Mar 2023 19:25:29 +0200 Subject: [PATCH 3/8] Add dependency to dhp-aggregation --- dhp-workflows/dhp-impact-indicators/pom.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/dhp-workflows/dhp-impact-indicators/pom.xml b/dhp-workflows/dhp-impact-indicators/pom.xml index 644b82c7b..a9eb0a4a1 100644 --- a/dhp-workflows/dhp-impact-indicators/pom.xml +++ b/dhp-workflows/dhp-impact-indicators/pom.xml @@ -49,4 +49,14 @@ + + + + eu.dnetlib.dhp + dhp-aggregation + ${projectVersion} + compile + + + \ No newline at end of file From 7256c8d3c71c632ae0537e2c5ce585da738662b5 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Fri, 7 Apr 2023 16:30:12 +0300 Subject: [PATCH 4/8] Add script for aggregating impact indicators at the project level --- .../oozie_app/job.properties | 3 + .../oozie_app/projects_impact.py | 109 ++++++++++++++++++ .../impact_indicators/oozie_app/workflow.xml | 70 ++++++++++- 3 files changed, 176 insertions(+), 6 deletions(-) create mode 100644 
dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties index a902c413f..f9f5519cc 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties @@ -90,3 +90,6 @@ oozie.wf.application.path=${wfAppPath} # Path where the final output should be? actionSetOutputPath=${workflowDataDir}/bip_actionsets/ +# The directory to store project impact indicators +projectImpactIndicatorsOutput=${workflowDataDir}/project_indicators + diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py new file mode 100644 index 000000000..f01c92a0d --- /dev/null +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py @@ -0,0 +1,109 @@ +import sys +from pyspark.sql import SparkSession +from pyspark import SparkConf, SparkContext +import pyspark.sql.functions as F +from pyspark.sql.types import StringType, IntegerType, StructType, StructField + +if len(sys.argv) < 8: + print("Usage: projects_impact.py <relations_folder> <influence_file> <popularity_file> <citation_count_file> <impulse_file> <num_partitions> <output_dir>") + sys.exit(-1) + +appName = 'Project Impact Indicators' +conf = SparkConf().setAppName(appName) +sc = SparkContext(conf = conf) +spark = SparkSession.builder.appName(appName).getOrCreate() +sc.setLogLevel('OFF') + +# input parameters +relations_fd = sys.argv[1] +influence_fd = sys.argv[2]
+popularity_fd = sys.argv[3] +cc_fd = sys.argv[4] +impulse_fd = sys.argv[5] +num_partitions = int(sys.argv[6]) +output_dir = sys.argv[7] + +# schema for impact indicator files +impact_files_schema = StructType([ + StructField('resultId', StringType(), False), + StructField('score', IntegerType(), False), + StructField('class', StringType(), False), +]) + +# list of impact indicators +impact_indicators = [ + ('influence', influence_fd, 'class'), + ('popularity', popularity_fd, 'class'), + ('impulse', impulse_fd, 'score'), + ('citation_count', cc_fd, 'score') +] + +''' + * Read impact indicator file and return a dataframe with the following schema: + * resultId: String + * indicator_name: Integer +''' +def read_df(fd, indicator_name, column_name): + return spark.read.schema(impact_files_schema)\ + .option('delimiter', '\t')\ + .option('header', False)\ + .csv(fd)\ + .select('resultId', F.col(column_name).alias(indicator_name))\ + .repartition(num_partitions, 'resultId') + +# Print dataframe schema, first 5 rows, and count +def print_df(df): + df.show(50) + df.printSchema() + print(df.count()) + +# Sets a null value to the column if the value is equal to the given value +def set_class_value_to_null(column, value): + return F.when(column != value, column).otherwise(F.lit(None)) + +# load and filter Project-to-Result relations +print("Reading relations") +relations = spark.read.json(relations_fd)\ + .select(F.col('source').alias('projectId'), F.col('target').alias('resultId'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\ + .where( (F.col('relClass') == 'produces') \ + & (F.col('deletedbyinference') == "false")\ + & (F.col('invisible') == "false"))\ + .drop('deletedbyinference')\ + .drop('invisible')\ + .drop('relClass')\ + .repartition(num_partitions, 'resultId') + +for indicator_name, fd, column_name in impact_indicators: + + print("Reading {} '{}' field from file".format(indicator_name, column_name)) + df = read_df(fd, indicator_name, column_name) 
+ + # sets a zero value to the indicator column if the value is C5 + if (column_name == 'class'): + df = df.withColumn(indicator_name, F.when(F.col(indicator_name).isin("C5"), 0).otherwise(1)) + + # print_df(df) + + print("Joining {} to relations".format(indicator_name)) + + # NOTE: we use inner join because we want to keep only the results that have an impact score + # also note that all impact scores have the same set of results + relations = relations.join(df, 'resultId', 'inner')\ + .repartition(num_partitions, 'resultId') + +# uncomment to print non-null values count for each indicator +# for indicator_name, fd, column_name in impact_indicators: +# print("Counting non null values for {}".format(indicator_name)) +# print(relations.filter(F.col(indicator_name).isNotNull()).count()) + +# sum the impact indicator values for each project +relations.groupBy('projectId')\ + .agg(\ + F.sum('influence').alias('influence'),\ + F.sum('popularity').alias('popularity'),\ + F.sum('impulse').alias('impulse'),\ + F.sum('citation_count').alias('citation_count')\ + )\ + .write.mode("overwrite")\ + .option("delimiter", "\t")\ + .csv(output_dir, compression="gzip") \ No newline at end of file diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index d99dc16a2..8cd0b0d5d 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -15,6 +15,8 @@ ${resume eq "map-ids"} ${resume eq "map-scores"} ${resume eq "start"} + ${resume eq "projects-impact"} + @@ -334,7 +336,7 @@ ${nameNode} - + /usr/bin/bash get_ranking_files.sh @@ -558,7 +560,7 @@ - + @@ -592,11 +594,63 @@ 
--inputPath${bipScorePath} --outputPath${actionSetOutputPath} - + - - + + + + + + + ${jobTracker} + + ${nameNode} + + yarn-cluster + cluster + + + Spark Pagerank + + PageRank.py + + --executor-memory 18G --executor-cores 4 --driver-memory 10G + --master yarn + --deploy-mode cluster + --conf spark.sql.shuffle.partitions=7680 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + + + + ${openaireDataInput}/relations + + + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']} + + + 7680 + + ${projectImpactIndicatorsOutput} + + + ${wfAppPath}/projects_impact.py#projects_impact.py + + + + + + + + + @@ -642,6 +696,10 @@ ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + + + Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + From 23f58a86f177ac7fcbef5b3d5bff28e654299f07 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Tue, 18 Apr 2023 12:26:01 +0300 Subject: [PATCH 5/8] Change jar param in project impact indicators action --- .../dhp/oa/graph/impact_indicators/oozie_app/workflow.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 8cd0b0d5d..ac44d5c05 100644 --- 
a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -611,9 +611,9 @@ cluster - Spark Pagerank + Project Impact Indicators - PageRank.py + projects_impact.py --executor-memory 18G --executor-cores 4 --driver-memory 10G --master yarn From ee04cf92bf4030f9be3b4a34703198c3dd5ce424 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Wed, 26 Apr 2023 20:23:46 +0300 Subject: [PATCH 6/8] Add actionsets for project impact indicators --- .../bipfinder/SparkAtomicActionScoreJob.java | 63 +++++++---- .../score/deserializers/BipProjectModel.java | 69 ++++++++++++ .../deserializers/BipResultModel.java} | 8 +- .../PrepareBipFinder.java | 6 +- .../bipfinder/input_actionset_parameter.json | 6 ++ .../SparkAtomicActionScoreJobTest.java | 102 ++++++++++++++---- .../bipfinder/project_bip_scores.json | 4 + ...scores_oid.json => result_bip_scores.json} | 0 .../oozie_app/projects_impact.py | 13 ++- 9 files changed, 218 insertions(+), 53 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/{BipDeserialize.java => score/deserializers/BipResultModel.java} (65%) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/{bip_scores_oid.json => result_bip_scores.json} (100%) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index ddf5f4adf..13ce1440a 
100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -56,18 +57,17 @@ public class SparkAtomicActionScoreJob implements Serializable { parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); + log.info("inputPath: {}", inputPath); final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); + log.info("outputPath: {}", outputPath); + + final String targetEntity = parser.get("targetEntity"); + log.info("targetEntity: {}", targetEntity); SparkConf conf = new SparkConf(); @@ -76,17 +76,48 @@ public class SparkAtomicActionScoreJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareResults(spark, inputPath, outputPath); - }); + + // follow different 
procedures for different target entities + switch (targetEntity) { + case "result": + prepareResults(spark, inputPath, outputPath); + break; + case "project": + prepareProjects(spark, inputPath, outputPath); + break; + default: + throw new RuntimeException("Unknown target entity: " + targetEntity); + } + } + ); + } + + private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) { + + // read input bip project scores + Dataset projectScores = readPath(spark, inputPath, BipProjectModel.class); + + projectScores.map( (MapFunction) bipProjectScores -> { + Project project = new Project(); + project.setId(bipProjectScores.getProjectId()); + project.setMeasures(bipProjectScores.toMeasures()); + return project; + }, Encoders.bean(Project.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Project.class, p)) + .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + } private static void prepareResults(SparkSession spark, String bipScorePath, String outputPath) { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD bipDeserializeJavaRDD = sc + JavaRDD bipDeserializeJavaRDD = sc .textFile(bipScorePath) - .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class)); + .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class)); Dataset bipScores = spark .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { @@ -159,12 +190,4 @@ public class SparkAtomicActionScoreJob implements Serializable { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - public static Dataset readPath( - SparkSession spark, String inputPath, Class clazz) { - return spark - .read() - .textFile(inputPath) - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); - } - } diff 
--git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java new file mode 100644 index 000000000..77c1567a8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java @@ -0,0 +1,69 @@ +package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; + +import com.opencsv.bean.CsvBindByPosition; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import eu.dnetlib.dhp.schema.oaf.Measure; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; + +@NoArgsConstructor +@AllArgsConstructor +@Getter +@Setter +public class BipProjectModel { + String projectId; + + String numOfInfluentialResults; + + String numOfPopularResults; + + String totalImpulse; + + String totalCitationCount; + + // each project bip measure has exactly one value, hence one key-value pair + private Measure createMeasure(String measureId, String measureValue) { + + KeyValue kv = new KeyValue(); + kv.setKey("score"); + kv.setValue(measureValue); + kv.setDataInfo( + OafMapperUtils.dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils.qualifier( + UPDATE_MEASURE_BIP_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "") + ); + + Measure measure = new Measure(); + measure.setId(measureId); + measure.setUnit(Collections.singletonList(kv)); + return measure; + } + public List toMeasures() { + return Arrays.asList( + 
createMeasure("numOfInfluentialResults", numOfInfluentialResults), + createMeasure("numOfPopularResults", numOfPopularResults), + createMeasure("totalImpulse", totalImpulse), + createMeasure("totalCitationCount", totalCitationCount) + ); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java similarity index 65% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java index a70bca618..06a173413 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java @@ -1,5 +1,7 @@ -package eu.dnetlib.dhp.actionmanager.bipmodel; +package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; + +import eu.dnetlib.dhp.actionmanager.bipmodel.Score; import java.io.Serializable; import java.util.ArrayList; @@ -11,9 +13,9 @@ import java.util.List; * Only needed for deserialization purposes */ -public class BipDeserialize extends HashMap> implements Serializable { +public class BipResultModel extends HashMap> implements Serializable { - public BipDeserialize() { + public BipResultModel() { super(); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index 80573c71a..efcb96a85 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -82,9 +82,9 @@ public class PrepareBipFinder implements Serializable { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD bipDeserializeJavaRDD = sc + JavaRDD bipDeserializeJavaRDD = sc .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class)); + .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class)); spark .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json index 7663a454b..d6b93c5af 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json @@ -16,5 +16,11 @@ "paramLongName": "outputPath", "paramDescription": "the path of the new ActionSet", "paramRequired": true + }, + { + "paramName": "te", + "paramLongName": "targetEntity", + "paramDescription": "the type of target entity to be enriched; currently supported one of { 'result', 'project' }", + "paramRequired": true } ] \ No newline at end of file diff --git 
a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java index be82b9fc3..aa5a19f11 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java @@ -6,8 +6,9 @@ import static org.junit.jupiter.api.Assertions.*; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -27,7 +28,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkAtomicActionScoreJobTest { @@ -37,8 +37,11 @@ public class SparkAtomicActionScoreJobTest { private static SparkSession spark; private static Path workingDir; - private static final Logger log = LoggerFactory - .getLogger(SparkAtomicActionScoreJobTest.class); + + private final static String RESULT = "result"; + private final static String PROJECT = "project"; + + private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class); @BeforeAll public static void beforeAll() throws IOException { @@ -69,29 +72,31 @@ public class SparkAtomicActionScoreJobTest { spark.stop(); } + private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception { + SparkAtomicActionScoreJob.main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputPath", inputPath, + 
"-outputPath", outputPath, + "-targetEntity", targetEntity, + } + ); + } @Test - void testMatch() throws Exception { - String bipScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json") + void testResultScores() throws Exception { + final String targetEntity = RESULT; + String inputResultScores = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json") .getPath(); + String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; - SparkAtomicActionScoreJob - .main( - new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - - bipScoresPath, - - "-outputPath", - workingDir.toString() + "/actionSet" - }); + // execute the job to generate the action sets for result scores + runJob(inputResultScores, outputPath, targetEntity); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .sequenceFile(outputPath, Text.class, Text.class) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Result) aa.getPayload())); @@ -140,4 +145,61 @@ public class SparkAtomicActionScoreJobTest { } + @Test + void testProjectScores() throws Exception { + String targetEntity = PROJECT; + String inputResultScores = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") + .getPath(); + String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; + + // execute the job to generate the action sets for project scores + runJob(inputResultScores, outputPath, PROJECT); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD projects = sc + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Project) aa.getPayload())); + + // test the number of 
projects + assertEquals(4, projects.count()); + + String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833"; + + // count that the project with id testProjectId is present + assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count()); + + projects.filter(row -> row.getId().equals(testProjectId)) + .flatMap(r -> r.getMeasures().iterator()) + .foreach(m -> { + log.info(m.getId() + " " + m.getUnit()); + + // ensure that only one score is present for each bip impact measure + assertEquals(1, m.getUnit().size()); + + KeyValue kv = m.getUnit().get(0); + + // ensure that the correct key is provided, i.e. score + assertEquals("score", kv.getKey()); + + switch(m.getId()) { + case "numOfInfluentialResults": + assertEquals("0", kv.getValue()); + break; + case "numOfPopularResults": + assertEquals("1", kv.getValue()); + break; + case "totalImpulse": + assertEquals("25", kv.getValue()); + break; + case "totalCitationCount": + assertEquals("43", kv.getValue()); + break; + default: + fail("Unknown measure id in the context of projects"); + } + }); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json new file mode 100644 index 000000000..096268287 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json @@ -0,0 +1,4 @@ +{"projectId":"40|nsf_________::d93e50d22374a1cf59f6a232413ea027","numOfInfluentialResults":0,"numOfPopularResults":10,"totalImpulse":181,"totalCitationCount":235} +{"projectId":"40|nih_________::1c93debc7085e440f245fbe70b2e8b21","numOfInfluentialResults":14,"numOfPopularResults":17,"totalImpulse":1558,"totalCitationCount":4226} 
+{"projectId":"40|nih_________::c02a8233e9b60f05bb418f0c9b714833","numOfInfluentialResults":0,"numOfPopularResults":1,"totalImpulse":25,"totalCitationCount":43} +{"projectId":"40|corda_______::d91dcf3a87dd7f72248fab0b8a4ba273","numOfInfluentialResults":2,"numOfPopularResults":3,"totalImpulse":78,"totalCitationCount":178} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py index f01c92a0d..d60f86e88 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py @@ -96,14 +96,13 @@ for indicator_name, fd, column_name in impact_indicators: # print("Counting non null values for {}".format(indicator_name)) # print(relations.filter(F.col(indicator_name).isNotNull()).count()) -sum the impact indicator values for each project +# sum the impact indicator values for each project relations.groupBy('projectId')\ .agg(\ - F.sum('influence').alias('influence'),\ - F.sum('popularity').alias('popularity'),\ - F.sum('impulse').alias('impulse'),\ - F.sum('citation_count').alias('citation_count')\ + F.sum('influence').alias('numOfInfluentialResults'),\ + 
F.sum('popularity').alias('numOfPopularResults'),\ + F.sum('impulse').alias('totalImpulse'),\ + F.sum('citation_count').alias('totalCitationCount')\ )\ .write.mode("overwrite")\ - .option("delimiter", "\t")\ - .csv(output_dir, compression="gzip") \ No newline at end of file + .json(output_dir, compression="gzip") \ No newline at end of file From 815a4ddbbaf6fa68a23d576189db2ee03f97f828 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Wed, 26 Apr 2023 20:40:06 +0300 Subject: [PATCH 7/8] Add actionset creation for project bip indicators in workflow --- .../bipfinder/SparkAtomicActionScoreJob.java | 7 +- .../impact_indicators/oozie_app/workflow.xml | 81 +++++++++++-------- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 13ce1440a..8b8e05723 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -41,7 +41,8 @@ import scala.Tuple2; */ public class SparkAtomicActionScoreJob implements Serializable { - private static final String DOI = "doi"; + private static final String RESULT = "result"; + private static final String PROJECT = "project"; private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -79,10 +80,10 @@ public class SparkAtomicActionScoreJob implements Serializable { // follow different procedures for different target entities switch (targetEntity) { - case "result": + case RESULT: prepareResults(spark, inputPath, outputPath); break; - case "project": + case PROJECT: prepareProjects(spark, inputPath, 
outputPath); break; default: diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index ac44d5c05..c77443bd9 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -34,7 +34,6 @@ - yarn-cluster @@ -90,9 +89,8 @@ ${nameNode} - - - yarn-cluster + + yarn-cluster cluster @@ -131,7 +129,6 @@ ${jobTracker} ${nameNode} - yarn-cluster @@ -181,9 +178,8 @@ ${nameNode} - - - yarn-cluster + + yarn-cluster cluster @@ -235,7 +231,7 @@ - + yarn-cluster @@ -336,12 +332,12 @@ ${nameNode} - - /usr/bin/bash - - get_ranking_files.sh - - /${workflowDataDir} + + /usr/bin/bash + + get_ranking_files.sh + + /${workflowDataDir} ${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh @@ -374,8 +370,8 @@ ${nameNode} - - yarn-cluster + + yarn-cluster cluster @@ -422,8 +418,8 @@ ${nameNode} - - yarn-cluster + + yarn-cluster cluster @@ -476,7 +472,6 @@ - yarn-cluster @@ -520,7 +515,6 @@ ${nameNode} - yarn-cluster cluster @@ -564,17 +558,12 @@ - - + - - + yarn cluster @@ -593,12 +582,12 @@ --inputPath${bipScorePath} --outputPath${actionSetOutputPath} - + --targetEntityresult + - @@ -645,13 +634,38 @@ - + + + + yarn + cluster + Produces the atomic action with the bip finder scores for projects + eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf 
spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${projectImpactIndicatorsOutput} + --outputPath${actionSetOutputPath} + --targetEntityproject + + + + + @@ -695,11 +709,14 @@ - ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + ActionSet creation for results failed, error message[${wf:errorMessage(wf:lastErrorNode())}] Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + From 614cc1089b975f8dc05df4f671029b5bdaa31d44 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 27 Apr 2023 12:37:15 +0300 Subject: [PATCH 8/8] Add separate forder for results && project actionsets --- .../graph/impact_indicators/oozie_app/workflow.xml | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index c77443bd9..5f67bb914 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -556,9 +556,12 @@ - - - + + + + + + @@ -581,7 +584,7 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --inputPath${bipScorePath} - --outputPath${actionSetOutputPath} + --outputPath${actionSetOutputPath}/results/ --targetEntityresult @@ -659,7 +662,7 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --inputPath${projectImpactIndicatorsOutput} - --outputPath${actionSetOutputPath} + 
--outputPath${actionSetOutputPath}/projects/ --targetEntityproject