diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
index a902c413f..f9f5519cc 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
@@ -90,3 +90,6 @@ oozie.wf.application.path=${wfAppPath}
 
 # Path where the final output should be?
 actionSetOutputPath=${workflowDataDir}/bip_actionsets/
+# The directory to store project impact indicators
+projectImpactIndicatorsOutput=${workflowDataDir}/project_indicators
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
new file mode 100644
index 000000000..f01c92a0d
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
@@ -0,0 +1,109 @@
+import sys
+from pyspark.sql import SparkSession
+from pyspark import SparkConf, SparkContext
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType, IntegerType, StructType, StructField
+
+if len(sys.argv) < 8:
+    print("Usage: projects_impact.py <relations_fd> <influence_fd> <popularity_fd> <cc_fd> <impulse_fd> <num_partitions> <output_dir>")
+    sys.exit(-1)
+
+appName = 'Project Impact Indicators'
+conf = SparkConf().setAppName(appName)
+sc = SparkContext(conf = conf)
+spark = SparkSession.builder.appName(appName).getOrCreate()
+sc.setLogLevel('OFF')
+
+# input parameters
+relations_fd = sys.argv[1]
+influence_fd = sys.argv[2]
+popularity_fd = sys.argv[3]
+cc_fd = sys.argv[4]
+impulse_fd = sys.argv[5]
+num_partitions = int(sys.argv[6])
+output_dir = sys.argv[7]
+
+# schema for impact indicator files
+impact_files_schema = StructType([
+    StructField('resultId', StringType(), False),
+    StructField('score', IntegerType(), False),
+    StructField('class', StringType(), False),
+])
+
+# list of impact indicators: (indicator name, input file, column to keep)
+impact_indicators = [
+    ('influence', influence_fd, 'class'),
+    ('popularity', popularity_fd, 'class'),
+    ('impulse', impulse_fd, 'score'),
+    ('citation_count', cc_fd, 'score')
+]
+
+'''
+ * Read an impact indicator file and return a dataframe with the following schema:
+ * resultId: String
+ * indicator_name: Integer
+'''
+def read_df(fd, indicator_name, column_name):
+    return spark.read.schema(impact_files_schema)\
+        .option('delimiter', '\t')\
+        .option('header', False)\
+        .csv(fd)\
+        .select('resultId', F.col(column_name).alias(indicator_name))\
+        .repartition(num_partitions, 'resultId')
+
+# Print the first 50 rows, the schema, and the count of a dataframe
+def print_df(df):
+    df.show(50)
+    df.printSchema()
+    print(df.count())
+
+# Set the column to null when its value equals the given value
+def set_class_value_to_null(column, value):
+    return F.when(column != value, column).otherwise(F.lit(None))
+
+# load and filter Project-to-Result relations
+print("Reading relations")
+relations = spark.read.json(relations_fd)\
+    .select(F.col('source').alias('projectId'), F.col('target').alias('resultId'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
+    .where( (F.col('relClass') == 'produces')\
+        & (F.col('deletedbyinference') == "false")\
+        & (F.col('invisible') == "false"))\
+    .drop('deletedbyinference')\
+    .drop('invisible')\
+    .drop('relClass')\
+    .repartition(num_partitions, 'resultId')
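+
+# For illustration only: a relation record, as consumed by the select/where above,
+# has this shape (hypothetical identifiers; only the listed fields are used):
+#   {"source": "project::p1", "target": "result::r1", "relClass": "produces",
+#    "dataInfo": {"deletedbyinference": "false", "invisible": "false"}}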
"false"))\ + .drop('deletedbyinference')\ + .drop('invisible')\ + .drop('relClass')\ + .repartition(num_partitions, 'resultId') + +for indicator_name, fd, column_name in impact_indicators: + + print("Reading {} '{}' field from file".format(indicator_name, column_name)) + df = read_df(fd, indicator_name, column_name) + + # sets a zero value to the indicator column if the value is C5 + if (column_name == 'class'): + df = df.withColumn(indicator_name, F.when(F.col(indicator_name).isin("C5"), 0).otherwise(1)) + + # print_df(df) + + print("Joining {} to relations".format(indicator_name)) + + # NOTE: we use inner join because we want to keep only the results that have an impact score + # also note that all impact scores have the same set of results + relations = relations.join(df, 'resultId', 'inner')\ + .repartition(num_partitions, 'resultId') + +# uncomment to print non-null values count for each indicator +# for indicator_name, fd, column_name in impact_indicators: +# print("Counting non null values for {}".format(indicator_name)) +# print(relations.filter(F.col(indicator_name).isNotNull()).count()) + +sum the impact indicator values for each project +relations.groupBy('projectId')\ + .agg(\ + F.sum('influence').alias('influence'),\ + F.sum('popularity').alias('popularity'),\ + F.sum('impulse').alias('impulse'),\ + F.sum('citation_count').alias('citation_count')\ + )\ + .write.mode("overwrite")\ + .option("delimiter", "\t")\ + .csv(output_dir, compression="gzip") \ No newline at end of file diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index d99dc16a2..8cd0b0d5d 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -15,6 +15,8 @@ ${resume eq "map-ids"} ${resume eq "map-scores"} ${resume eq "start"} + ${resume eq "projects-impact"} + @@ -334,7 +336,7 @@ ${nameNode} - + /usr/bin/bash get_ranking_files.sh @@ -558,7 +560,7 @@ - + @@ -592,11 +594,63 @@ --inputPath${bipScorePath} --outputPath${actionSetOutputPath} - + - - + + + + + + + ${jobTracker} + + ${nameNode} + + yarn-cluster + cluster + + + Spark Pagerank + + PageRank.py + + --executor-memory 18G --executor-cores 4 --driver-memory 10G + --master yarn + --deploy-mode cluster + --conf spark.sql.shuffle.partitions=7680 + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + + + + ${openaireDataInput}/relations + + + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']} + ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']} + + + 7680 + + ${projectImpactIndicatorsOutput} + + + ${wfAppPath}/projects_impact.py#projects_impact.py + + + + + + + + + @@ -642,6 +696,10 @@ ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + + + + Calculating project impact indicators failed, error 
@@ -642,6 +696,10 @@
 	<kill name="actionset-creation-fail">
 		<message>ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 	</kill>
-
+
+	<kill name="project-impact-indicators-fail">
+		<message>Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+	</kill>
+
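
For reference, a minimal sketch of how the per-project sums written by projects_impact.py could be read back; '/path/to/project_indicators' is a hypothetical stand-in for the projectImpactIndicatorsOutput property, and the column names follow the groupBy aggregation above:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('read project indicators').getOrCreate()

    # gzip-compressed, tab-separated, no header; Spark decompresses .gz parts transparently
    projects = spark.read.option('delimiter', '\t')\
        .csv('/path/to/project_indicators')\
        .toDF('projectId', 'influence', 'popularity', 'impulse', 'citation_count')

    projects.show(5)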