Add script for aggregating impact indicators at the project level

This commit is contained in:
Serafeim Chatzopoulos 2023-04-07 16:30:12 +03:00
parent 102aa5ab81
commit 7256c8d3c7
3 changed files with 176 additions and 6 deletions

View File

@ -90,3 +90,6 @@ oozie.wf.application.path=${wfAppPath}
# Path where the final output should be?
actionSetOutputPath=${workflowDataDir}/bip_actionsets/
# The directory to store project impact indicators
projectImpactIndicatorsOutput=${workflowDataDir}/project_indicators

View File

@ -0,0 +1,109 @@
import sys
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
if len(sys.argv) < 8:
print("Usage: projects_impact.py <relations_folder> <influence_file> <popularity_file> <cc_file> <impulse_file> <num_partitions> <output_dir>")
sys.exit(-1)
appName = 'Project Impact Indicators'
conf = SparkConf().setAppName(appName)
sc = SparkContext(conf = conf)
spark = SparkSession.builder.appName(appName).getOrCreate()
sc.setLogLevel('OFF')
# input parameters
relations_fd = sys.argv[1]
influence_fd = sys.argv[2]
popularity_fd = sys.argv[3]
cc_fd = sys.argv[4]
impulse_fd = sys.argv[5]
num_partitions = int(sys.argv[6])
output_dir = sys.argv[7]
# schema for impact indicator files
impact_files_schema = StructType([
StructField('resultId', StringType(), False),
StructField('score', IntegerType(), False),
StructField('class', StringType(), False),
])
# list of impact indicators
impact_indicators = [
('influence', influence_fd, 'class'),
('popularity', popularity_fd, 'class'),
('impulse', impulse_fd, 'score'),
('citation_count', cc_fd, 'score')
]
'''
* Read impact indicator file and return a dataframe with the following schema:
* resultId: String
* indicator_name: Integer
'''
def read_df(fd, indicator_name, column_name):
return spark.read.schema(impact_files_schema)\
.option('delimiter', '\t')\
.option('header', False)\
.csv(fd)\
.select('resultId', F.col(column_name).alias(indicator_name))\
.repartition(num_partitions, 'resultId')
# Print dataframe schema, first 5 rows, and count
def print_df(df):
df.show(50)
df.printSchema()
print(df.count())
# Sets a null value to the column if the value is equal to the given value
def set_class_value_to_null(column, value):
return F.when(column != value, column).otherwise(F.lit(None))
# load and filter Project-to-Result relations
print("Reading relations")
relations = spark.read.json(relations_fd)\
.select(F.col('source').alias('projectId'), F.col('target').alias('resultId'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
.where( (F.col('relClass') == 'produces') \
& (F.col('deletedbyinference') == "false")\
& (F.col('invisible') == "false"))\
.drop('deletedbyinference')\
.drop('invisible')\
.drop('relClass')\
.repartition(num_partitions, 'resultId')
for indicator_name, fd, column_name in impact_indicators:
print("Reading {} '{}' field from file".format(indicator_name, column_name))
df = read_df(fd, indicator_name, column_name)
# sets a zero value to the indicator column if the value is C5
if (column_name == 'class'):
df = df.withColumn(indicator_name, F.when(F.col(indicator_name).isin("C5"), 0).otherwise(1))
# print_df(df)
print("Joining {} to relations".format(indicator_name))
# NOTE: we use inner join because we want to keep only the results that have an impact score
# also note that all impact scores have the same set of results
relations = relations.join(df, 'resultId', 'inner')\
.repartition(num_partitions, 'resultId')
# uncomment to print non-null values count for each indicator
# for indicator_name, fd, column_name in impact_indicators:
# print("Counting non null values for {}".format(indicator_name))
# print(relations.filter(F.col(indicator_name).isNotNull()).count())
sum the impact indicator values for each project
relations.groupBy('projectId')\
.agg(\
F.sum('influence').alias('influence'),\
F.sum('popularity').alias('popularity'),\
F.sum('impulse').alias('impulse'),\
F.sum('citation_count').alias('citation_count')\
)\
.write.mode("overwrite")\
.option("delimiter", "\t")\
.csv(output_dir, compression="gzip")

View File

@ -15,6 +15,8 @@
<case to="map-openaire-to-doi">${resume eq "map-ids"}</case>
<case to="map-scores-to-dois">${resume eq "map-scores"}</case>
<case to="create-openaire-ranking-graph">${resume eq "start"}</case>
<case to="project-impact-indicators">${resume eq "projects-impact"}</case>
<!-- TODO: add action set creation here -->
<default to="create-openaire-ranking-graph" />
</switch>
@ -334,7 +336,7 @@
<!-- This should give the machine/root of the hdfs -->
<name-node>${nameNode}</name-node>
<!-- Exec is needed foor shell comands - points to type of shell command -->
<!-- Exec is needed for shell commands - points to type of shell command -->
<exec>/usr/bin/bash</exec>
<!-- name of script to run -->
<argument>get_ranking_files.sh</argument>
@ -558,7 +560,7 @@
</action>
<action name="deleteOutputPathForActionSet">
<action name="deleteOutputPathForActionSet">
<fs>
<delete path="${actionSetOutputPath}"/>
<mkdir path="${actionSetOutputPath}"/>
@ -592,11 +594,63 @@
<arg>--inputPath</arg><arg>${bipScorePath}</arg>
<arg>--outputPath</arg><arg>${actionSetOutputPath}</arg>
</spark>
<ok to="end"/>
<ok to="project-impact-indicators"/>
<error to="actionset-creation-fail"/>
</action>
<!-- PAGERANK here -->
<action name="project-impact-indicators">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark Pagerank</name>
<!-- Script name goes here -->
<jar>PageRank.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<!-- graph data folder from which to read relations -->
<arg>${openaireDataInput}/relations</arg>
<!-- input files with impact indicators for results -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${projectImpactIndicatorsOutput}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/projects_impact.py#projects_impact.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="end" />
<!-- Go there if we have an error -->
<error to="project-impact-indicators-fail" />
</action>
<!-- TODO: end the workflow-->
@ -642,6 +696,10 @@
<kill name="actionset-creation-fail">
<message>ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<kill name="project-impact-indicators-fail">
<message>Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</workflow-app>