diff --git a/dhp-workflows/dhp-impact-indicators/README.md b/dhp-workflows/dhp-impact-indicators/README.md
new file mode 100644
index 0000000000..14f489da33
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/README.md
@@ -0,0 +1,23 @@
+# Ranking Workflow for Openaire Publications
+
+This project contains the files for running a paper ranking workflow on the openaire graph using apache oozie.
+All scripts are written in python and the project setup follows the typical oozie workflow structure:
+
+- a workflow.xml file containing the workflow specification
+- a job.properties file specifying parameter values for the parameters used by the workflow
+- a set of python scripts used by the workflow
+
+**NOTE**: the workflow depends on the external library of ranking scripts called BiP! Ranker.
+You can check out a specific tag/release of BIP! Ranker using maven, as described in the following section.
+
+## Check out a specific tag/release of BIP-Ranker
+
+* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out.
+
+* Then, use maven to perform the checkout:
+
+```
+mvn scm:checkout
+```
+
+* The code should be visible under `src/main/bip-ranker` folder.
\ No newline at end of file
diff --git a/dhp-workflows/dhp-impact-indicators/README.txt b/dhp-workflows/dhp-impact-indicators/README.txt
deleted file mode 100644
index 788534c02f..0000000000
--- a/dhp-workflows/dhp-impact-indicators/README.txt
+++ /dev/null
@@ -1,13 +0,0 @@
-
-
-## Checkout a specific release of the BIP-Ranker git repository
-
-* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out.
-
-* Then perform the checkout with:
-
-```
-mvn scm:checkout
-```
-
-* The code should be visible under `src/main/bip-ranker` folder.
\ No newline at end of file
diff --git a/dhp-workflows/dhp-impact-indicators/pom.xml b/dhp-workflows/dhp-impact-indicators/pom.xml
index b827f42a48..b510635a6f 100644
--- a/dhp-workflows/dhp-impact-indicators/pom.xml
+++ b/dhp-workflows/dhp-impact-indicators/pom.xml
@@ -20,7 +20,7 @@
https://github.com/athenarc/Bip-Ranker
- https://github.com/athenarc/Bip-Ranker.git
+ scm:git:https://github.com/athenarc/Bip-Ranker.git
@@ -31,8 +31,8 @@
1.8.1
connection
- 2
- tag
+ tag
+ v1.0.0
${project.build.directory}/../src/main/bip-ranker
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
new file mode 100644
index 0000000000..4cffa86a3e
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
@@ -0,0 +1,234 @@
+#!/usr/bin/python3
+
+# Create openaire id - openaire id graph from openaire data
+
+#############################################################################################################
+# Program proceeds as follows:
+# 1. We read the input folder provided from hdfs.
+# This contains subfolders with openaire graph objects and openaire graph relations
+# 2. We select all openaire graph objects of interest. We filter out based on visibility
+# and inference criteria. We also filter out based on the availability of publication year
+# 3. Get reference type dataframes from openaire. Then filter each one of them based on the
+# existence of citing and cited in the above filtered dataset. Get only citations
+# produced by publication objects, or otherresearchproducts of types:
+# [TBD]
+# 4. Get objects that don't appear in the relations (from those gathered in step 1) and add
+# them to the graph
+# 5. Group relations by citing paper and do graph-specific formatting
+#############################################################################################################
+# ---------- Imports ------------- #
+import sys
+# import pyspark
+# from pyspark import SparkConf, SparkContext
+from pyspark.sql import SparkSession
+# Functions to effectively handle data
+# manipulation for DataFrames
+import pyspark.sql.functions as F
+# Diagnostics
+from timeit import default_timer as timer
+# from datetime import timedelta, datetime
+# -------------------------------- #
+
+if len(sys.argv) < 5:
+ print ("Usage: ./create_openaire_ranking_graph.py ")
+ sys.exit(0)
+
+# Inputs will be:
+
+# 1. Folder where openaire graph is stored
+graph_folder = sys.argv[1]
+# 2. Current year (this will be needed for filtering)
+current_year = int(sys.argv[2])
+# 3. Number of partitions
+num_partitions = int(sys.argv[3])
+# 4. where to write output
+output_folder = sys.argv[4]
+
+# Lists of results types we want to inclued in the citations
+# valid_result_types = ['publication', 'other']
+valid_result_types = ['publication']
+# list of types in otherresearchproduct which are considered valid for citations
+valid_other = ['']
+
+# Create the spark session
+spark = SparkSession.builder.appName('oa ranking graph creation').getOrCreate()
+# Set context level logging to WARN
+spark.sparkContext.setLogLevel("WARN")
+
+############################################################################################################################
+# 1. Get the research objects and filter based on conditions.
+# These will also be the unique identifiers we should find in the final graph
+
+# Initialize an empty dataframe
+oa_objects_df = None
+
+# There is a directory structure on hdfs under the provided path.
+# We need to parse data from the folders: ["publication", "dataset", "software", "otherresearchproduct"]
+# which are rankable oa result objects.
+
+# Loop subfolders
+for sub_folder in ["publication", "dataset", "software", "otherresearchproduct"]:
+ # Read the json data of the graph into a dataframe initially
+ if not oa_objects_df:
+ oa_objects_df = spark.read.json(graph_folder + "/" + sub_folder).select('id', 'resulttype.classname', 'datainfo.deletedbyinference', 'datainfo.invisible', F.year('dateofacceptance.value').alias('year'))
+ oa_objects_df = oa_objects_df.where( 'datainfo.deletedbyinference = false' ).where( 'datainfo.invisible = false' ).repartition(num_partitions, 'id').cache()
+ # If we already have data, simply add more to it
+ else:
+ sub_df = spark.read.json(graph_folder + "/" + sub_folder).select('id', 'resulttype.classname','datainfo.deletedbyinference', 'datainfo.invisible', F.year('dateofacceptance.value').alias('year'))
+ sub_df = sub_df.where( 'datainfo.deletedbyinference = false ' ).where( 'datainfo.invisible = false ').cache()
+ # Add the data to the openaire objects dataframe
+ oa_objects_df = oa_objects_df.union(sub_df).repartition(num_partitions, 'id').cache()
+ # Clear memory
+ sub_df.unpersist(True)
+
+# Remove those records without year
+oa_objects_df = oa_objects_df.where(F.col('year').isNotNull())
+
+
+# Now replace years where > (current_year+1) with 0
+oa_objects_df = oa_objects_df.withColumn('clean_year', F.when(F.col('year').cast('int') > (current_year+1), 0).otherwise(F.col('year')))\
+ .drop('year').withColumnRenamed('clean_year', 'year').repartition(num_partitions, 'id')
+
+# -------------------------------------------------------------------- #
+'''
+# Some diagnostics
+print ("Min and max years:" )
+oa_objects_df.select(F.max('year')).show()
+oa_objects_df.select(F.min('year')).show()
+
+# This should be slow due to not repartitioning by year
+print ("Distinct years:")
+oa_objects_df.select('year').distinct().sort(F.col('year')).show(5000, False)
+
+# Show distinct values of deletedbyinference and invisible to ensure we have the correct data
+print ("Distinct deleted by inference:")
+oa_objects_df.select('deletedbyinference').distinct().show()
+print ("Distinct invisible values:")
+oa_objects_df.select('invisible').distinct().show()
+
+# Output total count
+print ("Total num of research objects: " + str(oa_objects_df.count()))
+'''
+# -------------------------------------------------------------------- #
+
+# Keep only required fields - we still keep resulttype.classname to
+# filter the citation relationships we consider valid
+oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').distinct().cache()
+############################################################################################################################
+# 2. Get the relation objects and filter them based on their existence in the oa_objects_df
+# NOTE: we are only interested in citations of type "cites"
+# Further, we
+
+# Deprecated line
+# references_df = spark.read.json(graph_folder + "/relation").select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass')\
+# .where( 'relClass = "References"' ).repartition(num_partitions, 'citing').drop('relClass')
+# print ("References df has: " + str(references_df.count()) + " entries")
+
+# Collect only valid citations i.e., invisible = false & deletedbyinference=false
+cites_df = spark.read.json(graph_folder + "/relation")\
+ .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
+ .where( (F.col('relClass') == "Cites") \
+ & (F.col('dataInfo.deletedbyinference') == "false")\
+ & (F.col('dataInfo.invisible') == "false"))\
+ .drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\
+ .repartition(num_partitions, 'citing').drop('relClass')
+# print ("Cited df has: " + str(cites_df.count()) + " entries")
+
+# DEPRECATED
+# cited_by_df = spark.read.json(graph_folder + "/relation").select(F.col('target').alias('citing'), F.col('source').alias('cited'), 'relClass')\
+# .where( 'relClass = "IsCitedBy"' ).repartition(num_partitions, 'citing').drop('relClass')
+# print ("Cited by df has: " + str(cited_by_df.count()) + " entries")
+
+# DEPRECATED
+# Keep only relations where citing and cited are in the oa_objects_df
+# references_df = references_df.join(oa_objects_df.select('id'), references_df.citing == oa_objects_df.id).drop('id')
+# references_df = references_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), references_df.cited == oa_objects_df.id).drop('id').distinct().repartition(num_partitions, 'citing').cache()
+# print ("References df now has: " + str(references_df.count()) + " entries")
+
+cites_df = cites_df.join(oa_objects_df.select('id'), cites_df.citing == oa_objects_df.id).where( F.col('resulttype.classname').isin(valid_result_types) ).drop('id').drop('resulttype.classname')
+cites_df = cites_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cites_df.cited == oa_objects_df.id).drop('id').drop('resulttype.classname').distinct().repartition(num_partitions, 'citing').cache()
+# TODO: add here a clause filtering out the citations
+# originating from "other" types of research objects which we consider valid
+
+# print ("Cites df now has: " + str(cites_df.count()) + " entries")
+
+# DEPRECATED
+# cited_by_df = cited_by_df.join(oa_objects_df.select('id'), cited_by_df.citing == oa_objects_df.id).drop('id')
+# cited_by_df = cited_by_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cited_by_df.cited == oa_objects_df.id).drop('id').distinct().repartition(num_partitions, 'citing').cache()
+# print ("Cited BY df now has: " + str(cited_by_df.count()) + " entries")
+
+# DEPRECATED
+# Join all the above into a single set
+# citations_df = references_df.union(cites_df).distinct().repartition(num_partitions, 'citing').cache()
+# Free space
+# references_df.unpersist(True)
+# cites_df.unpersist(True)
+
+# citations_df = citations_df.union(cited_by_df).distinct().repartition(num_partitions, 'citing').cache()
+
+# ALL citations we keep are in the cited_df dataframe
+citations_df = cites_df
+
+'''
+# Show schema
+print ("Citation schema:")
+citations_df.printSchema()
+print ("Objects schema:")
+oa_objects_df.printSchema()
+'''
+
+# Free space
+# cited_by_df.unpersist(True)
+
+# Show total num of unique citations
+num_unique_citations = citations_df.count()
+print ("Total unique citations: " + str(num_unique_citations))
+############################################################################################################################
+# 3. Get any potentially missing 'citing' papers from references (these are dangling nodes w/o any outgoing references)
+dangling_nodes = oa_objects_df.join(citations_df.select('citing').distinct(), citations_df.citing == oa_objects_df.id, 'left_anti')\
+ .select(F.col('id').alias('citing')).withColumn('cited', F.array([F.lit("0")])).repartition(num_partitions, 'citing')
+# Count dangling nodes
+dangling_num = dangling_nodes.count()
+print ("Number of dangling nodes: " + str(dangling_num))
+# print ("Dangling nodes sample:")
+# dangling_nodes.show(10, False)
+############################################################################################################################
+# 4. Group the citation dataframe by citing doi, and create the cited dois list. Add dangling nodes to the result
+graph = citations_df.groupBy('citing').agg(F.collect_set('cited').alias('cited')).repartition(num_partitions, 'citing').cache()
+# Free space
+citations_df.unpersist(True)
+
+num_nodes = graph.count()
+print ("Entries in graph before dangling nodes:" + str(num_nodes))
+# print ("Sample in graph: ")
+# graph.show(10, False)
+
+# Add dangling nodes
+graph = graph.union(dangling_nodes).repartition(num_partitions, 'citing')
+# Count current number of results
+num_nodes = graph.count()
+print ("Num entries after adding dangling nodes: " + str(num_nodes))
+
+# Add publication year
+graph = graph.join(oa_objects_df, graph.citing == oa_objects_df.id).select('citing', 'cited', 'year').cache()
+num_nodes_final = graph.count()
+print ("After adding year: " + str(num_nodes_final))
+# print ("Graph sample:")
+# graph.show(20, False)
+# Calculate initial score of nodes (1/N)
+initial_score = float(1)/float(num_nodes_final)
+############################################################################################################################
+# 5. Write graph to output file!
+print("Writing output to: " + output_folder)
+
+graph.select('citing', F.concat_ws("|", F.concat_ws(",",'cited'), F.when(F.col('cited').getItem(1) != "0", F.size('cited')).otherwise(F.lit("0")), F.lit(str(initial_score)) ).alias('cited'), 'year').withColumn('prev_pr', F.lit("0")).select('citing', 'cited', 'prev_pr', 'year')\
+ .write.mode("overwrite").option("delimiter","\t").csv(output_folder, compression="gzip")
+
+if num_nodes_final != num_nodes:
+ print ("WARNING: the number of nodes after keeping only nodes where year is available went from: " + str(num_nodes) + " to " + str(num_nodes_final) + "\n")
+ print ("Check for any mistakes...")
+
+############################################################################################################################
+print ("\nDONE!\n\n")
+# Wrap up
+spark.stop()
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py
new file mode 100644
index 0000000000..60c71e52fe
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py
@@ -0,0 +1,770 @@
+# This program reads hdfs directories containing ranking results from openaire's cluster.
+# Based on the parameters provided by the user, it will create different types of output files.
+
+# Modes available are:
+# 1. bip
+# This will result in output of the form required for bip-finder's update.
+# Its lines conform to the following format:
+# \t \t \t \t \t \t \t <3y_cc> \t <3y_cc_normalized> \t \t
+
+# 2. zenodo
+# This is the format used in zenodo for Bip-DB. (6 way classes will be named C1, C2, ..., C6)
+# This should output two files per ranking method with each line having the following data:
+# a. <6-way-class>
+# NOTE: this should also run for openaire-id files, hence we should have a total of 4 files per ranking (2 for each type of identifier)
+# In 'zenodo' mode the user specifies only a single file, for which zenodo-based output will be created
+
+# 3. json
+# This if the format used to provide openAIRE / claudio with data containing 1 json per identifier
+# An example of such a json format follows:
+#{
+# "50|dedup_wf_001::08823c8f5c3ca2eae523817036cdda67": [
+# {
+# "id": "influence",
+# "unit": [
+# {
+# "key": "score",
+# "value": "5.06690394631e-09"
+# },
+# {
+# "key": "class",
+# "value": "C"
+# }
+# ]
+# },
+# {
+# "id": "popularity_alt",
+# "unit": [
+# {
+# "key": "score",
+# "value": "0.0"
+# },
+# {
+# "key": "class",
+# "value": "C"
+# }
+# ]
+# },
+# {
+# "id": "popularity",
+# "unit": [
+# {
+# "key": "score",
+# "value": "3.11855618382e-09"
+# },
+# {
+# "key": "class",
+# "value": "C"
+# }
+# ]
+# },
+# {
+# "id": "influence_alt",
+# "unit": [
+# {
+# "key": "score",
+# "value": "0.0"
+# },
+# {
+# "key": "class",
+# "value": "C"
+# }
+# ]
+# },
+# {
+# "id": "impulse",
+# "unit": [
+# {
+# "key": "score",
+# "value": "0.0"
+# },
+# {
+# "key": "class",
+# "value": "C"
+# }
+# ]
+# }
+# ]
+#}
+
+
+#################################################################################################
+# Imports
+import sys
+import time
+
+# Sparksession lib to communicate with cluster via session object
+from pyspark.sql import SparkSession
+
+# Import sql types to define the schema of score output files
+from pyspark.sql.types import *
+
+# Import sql functions with shorthand alias
+import pyspark.sql.functions as F
+from pyspark.sql.functions import udf
+
+# Json specific encoding
+import json
+#################################################################################################
+# Clean up directory name
+def clean_directory_name(dir_name):
+ # We have a name with the form *_bip_universe_* or *_graph_universe_*
+ # and we need to keep the parts in *
+ dir_name_parts = dir_name.split('_')
+ dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)]
+
+ clean_name = '_'.join(dir_name_parts)
+ clean_name = clean_name.replace('_id', '_ids')
+
+ clean_name = clean_name.replace('.txt', '')
+ clean_name = clean_name.replace('.gz', '')
+
+ if 'openaire_ids_' in clean_name:
+ clean_name = clean_name.replace('openaire_ids_', '')
+ clean_name = clean_name + '_openaire_ids.txt.gz'
+ else:
+ clean_name = clean_name + '.txt.gz/'
+
+ return clean_name
+# --------------------------------------------------------------------------------------------- #
+# User defined function to escape special characters in a string that will turn into a json key
+@udf(StringType())
+def json_encode_key(doi_string):
+ return json.dumps(doi_string)
+#################################################################################################
+# --------------------------------------------------------------------------------------------- #
+# Arguments from command line and initializations
+
+# Time initialization
+start_time = time.time()
+
+# Check whether input is correct, otherwise exit with appropriate message
+if len(sys.argv) < 2:
+ print ("Usage: ./format_ranking_results.py ")
+ sys.exit(0)
+
+# Define valid modes:
+valid_modes = ['json', 'zenodo', 'bip', 'json-5-way']
+# Read mode provided by user
+mode = sys.argv[1].strip()
+
+# If mode isn't valid, exit
+if mode not in valid_modes:
+ print ("Usage: ./format_ranking_results.py \n")
+ print ("Invalid mode provided. Valid modes: ['zenodo', 'bip', 'json', 'json-5-way']")
+ sys.exit(0)
+
+
+# Once here, we should be more or less okay to run.
+
+# Define the spark session object
+spark = SparkSession.builder.appName('Parse Scores - ' + str(mode) + ' mode').getOrCreate()
+# Set Log Level for spark session
+spark.sparkContext.setLogLevel('WARN')
+
+# Here we define the schema shared by all score output files
+# - citation count variants have a slightly different schema, due to their scores being integers
+float_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('score', FloatType(), False),
+ StructField('normalized_score', FloatType(), False),
+ StructField('3-way-class', StringType(), False),
+ StructField('5-way-class', StringType(), False)
+ ])
+
+int_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('score', IntegerType(), False),
+ StructField('normalized_score', FloatType(), False),
+ StructField('3-way-class', StringType(), False),
+ StructField('5-way-class', StringType(), False)
+ ])
+
+# This schema concerns the output of the file
+# containing the number of references of each doi
+refs_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('num_refs', IntegerType(), False),
+ ])
+
+print("--- Initialization time: %s seconds ---" % (time.time() - start_time))
+
+# --------------------------------------------------------------------------------------------- #
+
+# Time the main program execution
+start_time = time.time()
+
+# The following is executed when the user requests the bip-update specific file
+if mode == 'bip':
+
+ # Read the remaining input files
+ if len(sys.argv) < 8:
+ print ("\n\nInsufficient input for 'bip' mode.")
+ print ("File list required: <3-year citation count> \n")
+ sys.exit(0)
+
+
+ # Read number of partitions:
+ num_partitions = int(sys.argv[-1])
+
+
+ pagerank_dir = sys.argv[2]
+ attrank_dir = sys.argv[3]
+ cc_dir = sys.argv[4]
+ impulse_dir = sys.argv[5]
+ ram_dir = sys.argv[6]
+ refs_dir = sys.argv[7]
+
+ # Score-specific dataframe
+ pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
+ cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
+ impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
+ ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
+ refs_df = spark.read.schema(refs_schema).option('delimiter', '\t').option('header',True).csv(refs_dir).repartition(num_partitions, 'id')
+
+ # ----------- TESTING CODE --------------- #
+ # pagerank_entries = pagerank_df.count()
+ # attrank_entries = attrank_df.count()
+ # cc_entries = cc_df.count()
+ # impulse_entries = impulse_df.count()
+ # ram_entries = ram_df.count()
+ # refs_entries = refs_df.count()
+
+ # print ("Pagerank:" + str(pagerank_entries))
+ # print ("AttRank:" + str(attrank_entries))
+ # print ("CC entries: " + str(cc_entries))
+ # print ("Impulse entries: " + str(impulse_entries))
+ # print ("Refs: " + str(refs_entries))
+ # ---------------------------------------- #
+
+ # Create a new dataframe with the required data
+ results_df = pagerank_df.select('id', F.col('score').alias('pagerank'), F.col('normalized_score').alias('pagerank_normalized'))
+ # Add attrank dataframe
+ results_df = results_df.join(attrank_df.select('id', 'score', 'normalized_score'), ['id'])\
+ .select(results_df.id, 'pagerank', 'pagerank_normalized', F.col('score').alias('attrank'), F.col('normalized_score').alias('attrank_normalized'))
+
+ # Add citation count dataframe
+ results_df = results_df.join(cc_df.select('id', 'score', 'normalized_score'), ['id'])\
+ .select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', F.col('score').alias('cc'), F.col('normalized_score').alias('cc_normalized'))
+
+ # Add 3-year df
+ results_df = results_df.join(impulse_df.select('id', 'score', 'normalized_score'), ['id'])\
+ .select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized', \
+ F.col('score').alias('3-cc'), F.col('normalized_score').alias('3-cc_normalized'))
+
+ # Add ram df
+ results_df = results_df.join(ram_df.select('id', 'score'), ['id'])\
+ .select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized',\
+ '3-cc', '3-cc_normalized', F.col('score').alias('ram'))
+
+ # Add references
+ results_df = results_df.join(refs_df, ['id']).select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', \
+ 'cc', 'cc_normalized', '3-cc', '3-cc_normalized', 'ram', 'num_refs')
+
+ # Write resulting dataframe to file
+ output_dir = "/".join(pagerank_dir.split('/')[:-1])
+ output_dir = output_dir + '/bip_update_data.txt.gz'
+
+ print("Writing to:" + output_dir)
+ results_df.write.mode('overwrite').option('delimiter','\t').option('header',True).csv(output_dir, compression='gzip')
+
+# The following is executed when the user requests the zenodo-specific file
+elif mode == 'zenodo':
+
+ # Read the remaining input files
+ if len(sys.argv) < 9:
+ print ("\n\nInsufficient input for 'zenodo' mode.")
+ print ("File list required: <3-year citation count> \n")
+ sys.exit(0)
+
+ # Read number of partitions:
+ num_partitions = int(sys.argv[-2])
+ graph_type = sys.argv[-1]
+
+ if graph_type not in ['bip', 'openaire']:
+ graph_type = 'bip'
+
+ pagerank_dir = sys.argv[2]
+ attrank_dir = sys.argv[3]
+ cc_dir = sys.argv[4]
+ impulse_dir = sys.argv[5]
+ ram_dir = sys.argv[6]
+
+ # Output directory is common for all files
+ output_dir_prefix = "/".join(pagerank_dir.split('/')[:-1])
+ # Method-specific outputs
+ pagerank_output = clean_directory_name(pagerank_dir.split('/')[-1])
+ attrank_output = clean_directory_name(attrank_dir.split('/')[-1])
+ cc_output = clean_directory_name(cc_dir.split('/')[-1])
+ impulse_output = clean_directory_name(impulse_dir.split('/')[-1])
+ ram_output = clean_directory_name(ram_dir.split('/')[-1])
+
+ # --------- PageRank ----------- #
+ # Get per file the doi - score - 6-way classes and write it to output
+ print("Writing to: " + output_dir_prefix + '/' + pagerank_output)
+ pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
+ # Replace dataframe class names
+ pagerank_df = pagerank_df.withColumn('class', F.lit('C6'))
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.drop('5-way-class')
+
+ if graph_type == 'openaire':
+ pagerank_df = pagerank_df.where( ~F.col('id').like('10.%') )
+
+ # Write output
+ pagerank_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + pagerank_output, compression='gzip')
+ # --------- AttRank ----------- #
+ print("Writing to: " + output_dir_prefix + '/' + attrank_output)
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
+ # Replace dataframe class names
+ attrank_df = attrank_df.withColumn('class', F.lit('C6'))
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.drop('5-way-class')
+
+ if graph_type == 'openaire':
+ attrank_df = attrank_df.where( ~F.col('id').like('10.%') )
+
+ # Write output
+ attrank_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + attrank_output, compression='gzip')
+ # --------- Citation Count ----------- #
+ print("Writing to: " + output_dir_prefix + '/' + cc_output)
+ cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
+ # Replace dataframe class names
+ cc_df = cc_df.withColumn('class', F.lit('C5'))
+ # cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ cc_df = cc_df.drop('5-way-class')
+
+ if graph_type == 'openaire':
+ cc_df = cc_df.where( ~F.col('id').like('10.%') )
+
+ # Write output
+ cc_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + cc_output, compression='gzip')
+ # --------- Impulse ----------- #
+ print("Writing to: " + output_dir_prefix + '/' + impulse_output)
+ impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
+ # Replace dataframe class names
+ impulse_df = impulse_df.withColumn('class', F.lit('C5'))
+ # impulse_df = impulse_df.withColumn('class', F.when(F.col('6-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.drop('5-way-class')
+
+ if graph_type == 'openaire':
+ impulse_df = impulse_df.where( ~F.col('id').like('10.%') )
+
+ # Write output
+ impulse_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + impulse_output, compression='gzip')
+ # --------- RAM ----------- #
+ print("Writing to: " + output_dir_prefix + '/' + ram_output)
+ ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
+ # Replace dataframe class names
+ ram_df = ram_df.withColumn('class', F.lit('C5'))
+ # ram_df = ram_df.withColumn('class', F.when(F.col('6-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ ram_df = ram_df.drop('5-way-class')
+
+ if graph_type == 'openaire':
+ ram_df = ram_df.where( ~F.col('id').like('10.%') )
+
+ # Write output
+ ram_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + ram_output, compression='gzip')
+
+# The following produces the json file required by openaire
+elif mode == 'json':
+
+ # Read the remaining input files
+ if len(sys.argv) < 9:
+ print ("\n\nInsufficient input for 'json' mode.")
+ print ("File list required: <3-year citation count> \n")
+ sys.exit(0)
+
+ # Read number of partitions:
+ num_partitions = int(sys.argv[-2])
+ graph_type = sys.argv[-1]
+
+ if graph_type not in ['bip', 'openaire']:
+ graph_type = 'bip'
+
+ print ("Graph type: " + str(graph_type))
+
+ # File directories
+ pagerank_dir = sys.argv[2]
+ attrank_dir = sys.argv[3]
+ cc_dir = sys.argv[4]
+ impulse_dir = sys.argv[5]
+ ram_dir = sys.argv[6]
+
+ print ("Reading files:")
+ print (pagerank_dir)
+ print (attrank_dir)
+ print (cc_dir)
+ print (impulse_dir)
+ print (ram_dir)
+
+ # Score-specific dataframe - read inputs
+ pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+ cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
+ impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
+ ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
+ # --- Join the data of the various scores --- #
+
+ # Create json data for pagerank
+ pagerank_df = pagerank_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
+
+ pagerank_df = pagerank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_values') )
+ pagerank_df = pagerank_df.select('id', F.create_map(F.lit('id'), F.lit('influence')).alias('id_map'), F.col('influence_values'))
+ pagerank_df = pagerank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence'))).alias('influence_key'), F.to_json(F.col('influence_values')).alias('influence_values') )
+ pagerank_df = pagerank_df.select('id', F.expr('substring(influence_key, 0, length(influence_key)-1)').alias('influence_key'), 'influence_values')
+ pagerank_df = pagerank_df.select('id', 'influence_key', F.expr('substring(influence_values, 2, length(influence_values))').alias('influence_values'))
+ pagerank_df = pagerank_df.select('id', F.concat_ws(', ', F.col('influence_key'), F.col('influence_values')).alias('influence_json'))
+
+ # Create json data for attrank
+ attrank_df = attrank_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
+
+ attrank_df = attrank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_values') )
+ attrank_df = attrank_df.select('id', F.create_map(F.lit('id'), F.lit('popularity')).alias('id_map'), F.col('popularity_values'))
+ attrank_df = attrank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity'))).alias('popularity_key'), F.to_json(F.col('popularity_values')).alias('popularity_values') )
+ attrank_df = attrank_df.select('id', F.expr('substring(popularity_key, 0, length(popularity_key)-1)').alias('popularity_key'), 'popularity_values')
+ attrank_df = attrank_df.select('id', 'popularity_key', F.expr('substring(popularity_values, 2, length(popularity_values))').alias('popularity_values'))
+ attrank_df = attrank_df.select('id', F.concat_ws(', ', F.col('popularity_key'), F.col('popularity_values')).alias('popularity_json'))
+
+ # Create json data for CC
+ cc_df = cc_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
+
+ cc_df = cc_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_alt_values') )
+ cc_df = cc_df.select('id', F.create_map(F.lit('id'), F.lit('influence_alt')).alias('id_map'), F.col('influence_alt_values'))
+ cc_df = cc_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence_alt'))).alias('influence_alt_key'), F.to_json(F.col('influence_alt_values')).alias('influence_alt_values') )
+ cc_df = cc_df.select('id', F.expr('substring(influence_alt_key, 0, length(influence_alt_key)-1)').alias('influence_alt_key'), 'influence_alt_values')
+ cc_df = cc_df.select('id', 'influence_alt_key', F.expr('substring(influence_alt_values, 2, length(influence_alt_values))').alias('influence_alt_values'))
+ cc_df = cc_df.select('id', F.concat_ws(', ', F.col('influence_alt_key'), F.col('influence_alt_values')).alias('influence_alt_json'))
+
+
+ # Create json data for RAM
+ ram_df = ram_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
+
+ ram_df = ram_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_alt_values') )
+ ram_df = ram_df.select('id', F.create_map(F.lit('id'), F.lit('popularity_alt')).alias('id_map'), F.col('popularity_alt_values'))
+ ram_df = ram_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity_alt'))).alias('popularity_alt_key'), F.to_json(F.col('popularity_alt_values')).alias('popularity_alt_values') )
+ ram_df = ram_df.select('id', F.expr('substring(popularity_alt_key, 0, length(popularity_alt_key)-1)').alias('popularity_alt_key'), 'popularity_alt_values')
+ ram_df = ram_df.select('id', 'popularity_alt_key', F.expr('substring(popularity_alt_values, 2, length(popularity_alt_values))').alias('popularity_alt_values'))
+ ram_df = ram_df.select('id', F.concat_ws(', ', F.col('popularity_alt_key'), F.col('popularity_alt_values')).alias('popularity_alt_json'))
+
+ # Create json data for impulse
+ impulse_df = impulse_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
+
+ impulse_df = impulse_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('impulse_values') )
+ impulse_df = impulse_df.select('id', F.create_map(F.lit('id'), F.lit('impulse')).alias('id_map'), F.col('impulse_values'))
+ impulse_df = impulse_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('impulse'))).alias('impulse_key'), F.to_json(F.col('impulse_values')).alias('impulse_values') )
+ impulse_df = impulse_df.select('id', F.expr('substring(impulse_key, 0, length(impulse_key)-1)').alias('impulse_key'), 'impulse_values')
+ impulse_df = impulse_df.select('id', 'impulse_key', F.expr('substring(impulse_values, 2, length(impulse_values))').alias('impulse_values'))
+ impulse_df = impulse_df.select('id', F.concat_ws(', ', F.col('impulse_key'), F.col('impulse_values')).alias('impulse_json'))
+
+ #Join dataframes together
+ results_df = pagerank_df.join(attrank_df, ['id'])
+ results_df = results_df.join(cc_df, ['id'])
+ results_df = results_df.join(ram_df, ['id'])
+ results_df = results_df.join(impulse_df, ['id'])
+
+ print ("Json encoding DOI keys")
+ # Json encode doi strings
+ results_df = results_df.select(json_encode_key('id').alias('id'), 'influence_json', 'popularity_json', 'influence_alt_json', 'popularity_alt_json', 'impulse_json')
+
+ # Concatenate individual json columns
+ results_df = results_df.select('id', F.concat_ws(', ', F.col('influence_json'), F.col('popularity_json'), F.col('influence_alt_json'), F.col('popularity_alt_json'), F.col('impulse_json') ).alias('json_data'))
+ results_df = results_df.select('id', F.concat_ws('', F.lit('['), F.col('json_data'), F.lit(']')).alias('json_data') )
+
+ # Filter out non-openaire ids if need
+ if graph_type == 'openaire':
+ results_df = results_df.where( ~F.col('id').like('"10.%') )
+
+ # Concatenate paper id and add opening and ending brackets
+ results_df = results_df.select(F.concat_ws('', F.lit('{'), F.col('id'), F.lit(': '), F.col('json_data'), F.lit('}')).alias('json') )
+
+ # -------------------------------------------- #
+ # Write json output - set the directory here
+ output_dir = "/".join(pagerank_dir.split('/')[:-1])
+ if graph_type == 'bip':
+ output_dir = output_dir + '/bip_universe_doi_scores/'
+ else:
+ output_dir = output_dir + '/openaire_universe_scores/'
+
+ # Write the dataframe
+ print ("Writing output to: " + output_dir)
+ results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
+
+ # Rename the files to .json.gz now
+ sc = spark.sparkContext
+ URI = sc._gateway.jvm.java.net.URI
+ Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
+ FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
+ # Get master prefix from input file path
+ master_prefix = "/".join(pagerank_dir.split('/')[:5])
+ fs = FileSystem.get(URI(master_prefix), sc._jsc.hadoopConfiguration())
+ path = Path(output_dir)
+ print ("Path is:" + path.toString())
+ file_list = fs.listStatus(Path(output_dir))
+ print ("Renaming files:")
+ for f in file_list:
+ initial_filename = f.getPath().toString()
+ if "part" in initial_filename:
+ print (initial_filename + " => " + initial_filename.replace(".txt.gz", ".json.gz"))
+ fs.rename(Path(initial_filename), Path(initial_filename.replace(".txt.gz", ".json.gz")))
+
+
+ '''
+ DEPRECATED:
+ # -------------------------------------------- #
+ # Write json output
+ output_dir = "/".join(pagerank_dir.split('/')[:-1])
+ if graph_type == 'bip':
+ output_dir = output_dir + '/bip_universe_doi_scores_txt/'
+ else:
+ output_dir = output_dir + '/openaire_universe_scores_txt/'
+
+ print ("Writing output to: " + output_dir)
+ results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
+ print ("Done writing first results")
+ # Read results df as json and write it as json file
+ print ("Reading json input from: " + str(output_dir))
+ resulds_df_json = spark.read.json(output_dir).cache()
+ # Write json to different dir
+ print ("Writing json output to: " + output_dir.replace("_txt", ""))
+ resulds_df_json.write.mode('overwrite').json(output_dir.replace("_txt", ""), compression='gzip')
+ '''
+
+# The following produces the json file required by openaire
+elif mode == 'json-5-way':
+
+ # Read the remaining input files
+ if len(sys.argv) < 9:
+ print ("\n\nInsufficient input for 'json-5-way' mode.")
+ print ("File list required: <3-year citation count> \n")
+ sys.exit(0)
+
+ # Read number of partitions:
+ num_partitions = int(sys.argv[-2])
+ graph_type = sys.argv[-1]
+
+ if graph_type not in ['bip', 'openaire']:
+ graph_type = 'bip'
+
+ # File directories
+ pagerank_dir = sys.argv[2]
+ attrank_dir = sys.argv[3]
+ cc_dir = sys.argv[4]
+ impulse_dir = sys.argv[5]
+ ram_dir = sys.argv[6]
+
+ # Score-specific dataframe - read inputs
+ pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+ cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
+ impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
+ ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
+ # --- Join the data of the various scores --- #
+
+
+ # Replace 6-way classes with 5-way values
+ pagerank_df = pagerank_df.withColumn('class', F.lit('C5'))
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ pagerank_df = pagerank_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
+
+
+ # Create json data for pagerank
+ pagerank_df = pagerank_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
+
+
+
+ pagerank_df = pagerank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_values') )
+ pagerank_df = pagerank_df.select('id', F.create_map(F.lit('id'), F.lit('influence')).alias('id_map'), F.col('influence_values'))
+ pagerank_df = pagerank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence'))).alias('influence_key'), F.to_json(F.col('influence_values')).alias('influence_values') )
+ pagerank_df = pagerank_df.select('id', F.expr('substring(influence_key, 0, length(influence_key)-1)').alias('influence_key'), 'influence_values')
+ pagerank_df = pagerank_df.select('id', 'influence_key', F.expr('substring(influence_values, 2, length(influence_values))').alias('influence_values'))
+ pagerank_df = pagerank_df.select('id', F.concat_ws(', ', F.col('influence_key'), F.col('influence_values')).alias('influence_json'))
+
+ # Replace 6-way classes with 5 way classes for attrank
+ attrank_df = attrank_df.withColumn('class', F.lit('C5'))
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ attrank_df = attrank_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
+
+ # Create json data for attrank
+ attrank_df = attrank_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
+
+ attrank_df = attrank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_values') )
+ attrank_df = attrank_df.select('id', F.create_map(F.lit('id'), F.lit('popularity')).alias('id_map'), F.col('popularity_values'))
+ attrank_df = attrank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity'))).alias('popularity_key'), F.to_json(F.col('popularity_values')).alias('popularity_values') )
+ attrank_df = attrank_df.select('id', F.expr('substring(popularity_key, 0, length(popularity_key)-1)').alias('popularity_key'), 'popularity_values')
+ attrank_df = attrank_df.select('id', 'popularity_key', F.expr('substring(popularity_values, 2, length(popularity_values))').alias('popularity_values'))
+ attrank_df = attrank_df.select('id', F.concat_ws(', ', F.col('popularity_key'), F.col('popularity_values')).alias('popularity_json'))
+
+ # Replace 6-way classes with 5 way classes for attrank
+ cc_df = cc_df.withColumn('class', F.lit('C5'))
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ cc_df = cc_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
+
+ # Create json data for CC
+ cc_df = cc_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
+
+ cc_df = cc_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_alt_values') )
+ cc_df = cc_df.select('id', F.create_map(F.lit('id'), F.lit('influence_alt')).alias('id_map'), F.col('influence_alt_values'))
+ cc_df = cc_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence_alt'))).alias('influence_alt_key'), F.to_json(F.col('influence_alt_values')).alias('influence_alt_values') )
+ cc_df = cc_df.select('id', F.expr('substring(influence_alt_key, 0, length(influence_alt_key)-1)').alias('influence_alt_key'), 'influence_alt_values')
+ cc_df = cc_df.select('id', 'influence_alt_key', F.expr('substring(influence_alt_values, 2, length(influence_alt_values))').alias('influence_alt_values'))
+ cc_df = cc_df.select('id', F.concat_ws(', ', F.col('influence_alt_key'), F.col('influence_alt_values')).alias('influence_alt_json'))
+
+ # Replace 6-way classes with 5 way classes for attrank
+ ram_df = ram_df.withColumn('class', F.lit('C5'))
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ ram_df = ram_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
+
+ # Create json data for RAM
+ ram_df = ram_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
+
+ ram_df = ram_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_alt_values') )
+ ram_df = ram_df.select('id', F.create_map(F.lit('id'), F.lit('popularity_alt')).alias('id_map'), F.col('popularity_alt_values'))
+ ram_df = ram_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity_alt'))).alias('popularity_alt_key'), F.to_json(F.col('popularity_alt_values')).alias('popularity_alt_values') )
+ ram_df = ram_df.select('id', F.expr('substring(popularity_alt_key, 0, length(popularity_alt_key)-1)').alias('popularity_alt_key'), 'popularity_alt_values')
+ ram_df = ram_df.select('id', 'popularity_alt_key', F.expr('substring(popularity_alt_values, 2, length(popularity_alt_values))').alias('popularity_alt_values'))
+ ram_df = ram_df.select('id', F.concat_ws(', ', F.col('popularity_alt_key'), F.col('popularity_alt_values')).alias('popularity_alt_json'))
+
+ # Replace 6-way classes with 5 way classes for attrank
+ impulse_df = impulse_df.withColumn('class', F.lit('C5'))
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
+ impulse_df = impulse_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
+
+ # Create json data for impulse
+ impulse_df = impulse_df.select('id', F.map_concat(
+ F.create_map(F.lit('key'), F.lit('score')),
+ F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
+ F.map_concat(
+ F.create_map(F.lit('key'), F.lit('class')),
+ F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
+
+ impulse_df = impulse_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('impulse_values') )
+ impulse_df = impulse_df.select('id', F.create_map(F.lit('id'), F.lit('impulse')).alias('id_map'), F.col('impulse_values'))
+ impulse_df = impulse_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('impulse'))).alias('impulse_key'), F.to_json(F.col('impulse_values')).alias('impulse_values') )
+ impulse_df = impulse_df.select('id', F.expr('substring(impulse_key, 0, length(impulse_key)-1)').alias('impulse_key'), 'impulse_values')
+ impulse_df = impulse_df.select('id', 'impulse_key', F.expr('substring(impulse_values, 2, length(impulse_values))').alias('impulse_values'))
+ impulse_df = impulse_df.select('id', F.concat_ws(', ', F.col('impulse_key'), F.col('impulse_values')).alias('impulse_json'))
+
+ #Join dataframes together
+ results_df = pagerank_df.join(attrank_df, ['id'])
+ results_df = results_df.join(cc_df, ['id'])
+ results_df = results_df.join(ram_df, ['id'])
+ results_df = results_df.join(impulse_df, ['id'])
+
+ print ("Json encoding DOI keys")
+ # Json encode doi strings
+ results_df = results_df.select(json_encode_key('id').alias('id'), 'influence_json', 'popularity_json', 'influence_alt_json', 'popularity_alt_json', 'impulse_json')
+
+ # Concatenate individual json columns
+ results_df = results_df.select('id', F.concat_ws(', ', F.col('influence_json'), F.col('popularity_json'), F.col('influence_alt_json'), F.col('popularity_alt_json'), F.col('impulse_json') ).alias('json_data'))
+ results_df = results_df.select('id', F.concat_ws('', F.lit('['), F.col('json_data'), F.lit(']')).alias('json_data') )
+
+ # Filter out non-openaire ids if need
+ if graph_type == 'openaire':
+ results_df = results_df.where( ~F.col('id').like('10.%') )
+
+ # Concatenate paper id and add opening and ending brackets
+ results_df = results_df.select(F.concat_ws('', F.lit('{'), F.col('id'), F.lit(': '), F.col('json_data'), F.lit('}')).alias('json') )
+
+ # TEST output and count
+ # results_df.show(20, False)
+ # print ("Results #" + str(results_df.count()))
+
+ # -------------------------------------------- #
+ # Write json output
+ output_dir = "/".join(pagerank_dir.split('/')[:-1])
+ if graph_type == 'bip':
+ output_dir = output_dir + '/bip_universe_doi_scores_5_classes/'
+ else:
+ output_dir = output_dir + '/openaire_universe_scores_5_classes/'
+
+ print ("Writing output to: " + output_dir)
+ results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
+
+# Close spark session
+spark.stop()
+
+print("--- Main program execution time: %s seconds ---" % (time.time() - start_time))
+print("--- Finished --- \n\n")
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh
new file mode 100644
index 0000000000..4d0fedba92
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh
@@ -0,0 +1,14 @@
+ranking_results_folder=$1;
+
+pr_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/PR_.*" | grep -o "PR.*"`;
+attrank_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/AttRank.*" | grep -o "AttRank.*"`;
+cc_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/CC_.*" | grep -o "CC.*"`;
+impulse_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/3-year_.*" | grep -o "3-year.*"`;
+ram_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/RAM_.*" | grep -o "RAM.*"`;
+
+echo "pr_file=${pr_file}";
+echo "attrank_file=${attrank_file}";
+echo "cc_file=${cc_file}";
+echo "impulse_file=${impulse_file}";
+echo "ram_file=${ram_file}";
+# echo "TEST=`hdfs dfs -ls ${ranking_results_folder}/`";
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
new file mode 100644
index 0000000000..9ad9def218
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
@@ -0,0 +1,86 @@
+# The following set of properties are defined in https://support.openaire.eu/projects/openaire/wiki/Hadoop_clusters
+# and concern the parameterization required for running workflows on the @GARR cluster
+
+dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
+dhp.hadoop.frontend.user.name=ilias.kanellos
+dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
+dhp.hadoop.frontend.port.ssh=22
+oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
+jobTracker=yarnRM
+nameNode=hdfs://nameservice1
+oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
+maven.executable=mvn
+sparkDriverMemory=7G
+sparkExecutorMemory=7G
+sparkExecutorCores=4
+# The above is given differently in an example I found online
+oozie.action.sharelib.for.spark=spark2
+oozieActionShareLibForSpark2=spark2
+spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+spark2EventLogDir=/user/spark/spark2ApplicationHistory
+sparkSqlWarehouseDir=/user/hive/warehouse
+hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+# This MAY avoid the no library used error
+oozie.use.system.libpath=true
+# Some stuff copied from openaire's jobs
+spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
+spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
+
+
+# Some stuff copied from openaire's jobs
+spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
+spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
+
+# ------------------------------------------------------------------------------ #
+# The following set of properties are my own custom ones
+
+# Based on the page linked to at the start of the file, if we use yarn as a resource manager, its address is given as follows
+resourceManager=http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster
+
+# current year used when creating graph / by some ranking methods
+currentYear=2024
+
+# Alpha value for pagerank
+pageRankAlpha=0.5
+# AttRank values
+attrankAlpha=0.2
+attrankBeta=0.5
+attrankGamma=0.3
+attrankRho=-0.16
+# attrankCurrentYear=2023
+attrankStartYear=2021
+
+# Ram values
+ramGamma=0.6
+# ramCurrentYear=2023
+
+# Convergence error for pagerank
+convergenceError=0.000000000001
+
+# I think this should be the oozie workflow directory
+oozieWorkflowPath=user/ilias.kanellos/workflow_example/
+
+# The directory where the workflow data is/should be stored
+workflowDataDir=user/ilias.kanellos/ranking_workflow
+
+# Directory where dataframes are checkpointed
+checkpointDir=${nameNode}/${workflowDataDir}/check/
+
+# The directory for the doi-based bip graph
+bipGraphFilePath=${nameNode}/${workflowDataDir}/bipdbv8_graph
+
+# The folder from which synonyms of openaire-ids are read
+# openaireDataInput=${nameNode}/tmp/beta_provision/graph/21_graph_cleaned/
+openaireDataInput=${/tmp/prod_provision/graph/18_graph_blacklisted}
+
+# A folder where we will write the openaire to doi mapping
+synonymFolder=${nameNode}/${workflowDataDir}/openaireid_to_dois/
+
+# This will be where we store the openaire graph input. They told us on GARR to use a directory under /data
+openaireGraphInputPath=${nameNode}/${workflowDataDir}/openaire_id_graph
+
+# The workflow application path
+wfAppPath=${nameNode}/${oozieWorkflowPath}
+# The following is needed as a property of a workflow
+oozie.wf.application.path=${wfAppPath}
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py
new file mode 100644
index 0000000000..7997eec82c
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py
@@ -0,0 +1,60 @@
+import json
+import sys
+from pyspark.sql import SparkSession
+from pyspark import SparkConf, SparkContext
+
+if len(sys.argv) != 3:
+ print("Usage: map_openaire_ids_to_dois.py ")
+ sys.exit(-1)
+
+conf = SparkConf().setAppName('BIP!: Map OpenAIRE IDs to DOIs')
+sc = SparkContext(conf = conf)
+spark = SparkSession.builder.appName('BIP!: Map OpenAIRE IDs to DOIs').getOrCreate()
+sc.setLogLevel('OFF')
+
+src_dir = sys.argv[1]
+output = sys.argv[2]
+
+# src_dir = "/tmp/beta_provision/graph/21_graph_cleaned/"
+# output = '/tmp/openaireid_to_dois/'
+
+def transform(doc):
+
+ # get publication year from 'doc.dateofacceptance.value'
+ dateofacceptance = doc.get('dateofacceptance', {}).get('value')
+
+ year = 0
+
+ if (dateofacceptance is not None):
+ year = dateofacceptance.split('-')[0]
+
+ # for each pid get 'pid.value' if 'pid.qualifier.classid' equals to 'doi'
+ dois = [ pid['value'] for pid in doc.get('pid', []) if (pid.get('qualifier', {}).get('classid') == 'doi' and pid['value'] is not None)]
+
+ num_dois = len(dois)
+
+ # exlcude openaire ids that do not correspond to DOIs
+ if (num_dois == 0):
+ return None
+
+ fields = [ doc['id'], str(num_dois), chr(0x02).join(dois), str(year) ]
+
+ return '\t'.join([ v.encode('utf-8') for v in fields ])
+
+docs = None
+
+for result_type in ["publication", "dataset", "software", "otherresearchproduct"]:
+
+ tmp = sc.textFile(src_dir + result_type).map(json.loads)
+
+ if (docs is None):
+ docs = tmp
+ else:
+ # append all result types in one RDD
+ docs = docs.union(tmp)
+
+docs = docs.filter(lambda d: d.get('dataInfo', {}).get('deletedbyinference') == False and d.get('dataInfo', {}).get('invisible') == False)
+
+docs = docs.map(transform).filter(lambda d: d is not None)
+
+docs.saveAsTextFile(output)
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py
new file mode 100644
index 0000000000..0d294e0458
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py
@@ -0,0 +1,145 @@
+# This program reads the openaire to doi mapping from the ${synonymFolder} of the workflow
+# and uses this mapping to create doi-based score files in the format required by BiP! DB.
+# This is done by reading each openaire-id based ranking file and joining the openaire based
+# score and classes to all the corresponding dois.
+#################################################################################################
+# Imports
+import sys
+
+# Sparksession lib to communicate with cluster via session object
+from pyspark.sql import SparkSession
+
+# Import sql types to define schemas
+from pyspark.sql.types import *
+
+# Import sql functions with shorthand alias
+import pyspark.sql.functions as F
+# from pyspark.sql.functions import udf
+#################################################################################################
+#################################################################################################
+# Clean up directory name
+def clean_directory_name(dir_name):
+ # We have a name with the form *_bip_universe_* or *_graph_universe_*
+ # and we need to keep the parts in *
+ dir_name_parts = dir_name.split('_')
+ dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)]
+
+ clean_name = '_'.join(dir_name_parts)
+
+ if '_ids' not in clean_name:
+ clean_name = clean_name.replace('id_', 'ids_')
+
+ # clean_name = clean_name.replace('.txt', '')
+ # clean_name = clean_name.replace('.gz', '')
+
+ if 'openaire_ids_' in clean_name:
+ clean_name = clean_name.replace('openaire_ids_', '')
+ # clean_name = clean_name + '.txt.gz'
+ # else:
+ # clean_name = clean_name + '.txt.gz'
+
+ return clean_name
+#################################################################################################
+if len(sys.argv) < 3:
+ print ("Usage: ./map_scores_to_dois.py <...etc...>")
+ sys.exit(-1)
+
+# Read arguments
+synonyms_folder = sys.argv[1]
+num_partitions = int(sys.argv[2])
+input_file_list = [argument for argument in sys.argv[3:]]
+input_file_list = [clean_directory_name(item) for item in input_file_list]
+
+# Prepare output specific variables
+output_file_list = [item.replace("_openaire_ids", "") for item in input_file_list]
+output_file_list = [item + ".gz" if not item.endswith(".gz") else item for item in output_file_list]
+
+# --- INFO MESSAGES --- #
+print ("\n\n----------------------------")
+print ("Mpping openaire ids to DOIs")
+print ("Reading input from: " + synonyms_folder)
+print ("Num partitions: " + str(num_partitions))
+print ("Input files:" + " -- ".join(input_file_list))
+print ("Output files: " + " -- ".join(output_file_list))
+print ("----------------------------\n\n")
+#######################################################################################
+# We weill define the following schemas:
+# --> the schema of the openaire - doi mapping file [string - int - doi_list] (the separator of the doi-list is a non printable character)
+# --> a schema for floating point ranking scores [string - float - string] (the latter string is the class)
+# --> a schema for integer ranking scores [string - int - string] (the latter string is the class)
+
+float_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('score', FloatType(), False),
+ StructField('class', StringType(), False)
+ ])
+
+int_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('score', IntegerType(), False),
+ StructField('class', StringType(), False)
+ ])
+
+# This schema concerns the output of the file
+# containing the number of references of each doi
+synonyms_schema = StructType([
+ StructField('id', StringType(), False),
+ StructField('num_synonyms', IntegerType(), False),
+ StructField('doi_list', StringType(), False),
+ ])
+#######################################################################################
+# Start spark session
+spark = SparkSession.builder.appName('Map openaire scores to DOIs').getOrCreate()
+# Set Log Level for spark session
+spark.sparkContext.setLogLevel('WARN')
+#######################################################################################
+# MAIN Program
+
+# Read and repartition the synonym folder - also cache it since we will need to perform multiple joins
+synonym_df = spark.read.schema(synonyms_schema).option('delimiter', '\t').csv(synonyms_folder)
+synonym_df = synonym_df.select('id', F.split(F.col('doi_list'), chr(0x02)).alias('doi_list'))
+synonym_df = synonym_df.select('id', F.explode('doi_list').alias('doi')).repartition(num_partitions, 'id').cache()
+
+# TESTING
+# print ("Synonyms: " + str(synonym_df.count()))
+# print ("DF looks like this:" )
+# synonym_df.show(1000, False)
+
+print ("\n\n-----------------------------")
+# Now we need to join the score files on the openaire-id with the synonyms and then keep
+# only doi - score - class and write this to the output
+for offset, input_file in enumerate(input_file_list):
+
+ print ("Mapping scores from " + input_file)
+
+ # Select correct schema
+ schema = int_schema
+ if "attrank" in input_file.lower() or "pr" in input_file.lower() or "ram" in input_file.lower():
+ schema = float_schema
+
+ # Load file to dataframe
+ ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id')
+
+ # TESTING
+ # print ("Loaded df sample:")
+ # ranking_df.show(1000, False)
+
+ # Join scores to synonyms and keep required fields
+ doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'class').repartition(num_partitions, 'doi').cache()
+ # Write output
+ output_file = output_file_list[offset]
+ print ("Writing to: " + output_file)
+ doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
+ # Free memory?
+ ranking_df.unpersist(True)
+
+print ("-----------------------------")
+print ("\n\nFinished!\n\n")
+
+
+
+
+
+
+
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
new file mode 100644
index 0000000000..807c32063c
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
@@ -0,0 +1,600 @@
+
+
+
+
+
+
+
+
+
+
+ ${resume eq "rankings-start"}
+ ${resume eq "impulse"}
+ ${resume eq "rankings-iterative"}
+ ${resume eq "format-results"}
+ ${resume eq "map-ids"}
+ ${resume eq "map-scores"}
+ ${resume eq "start"}
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Openaire Ranking Graph Creation
+
+ create_openaire_ranking_graph.py
+
+ --executor-memory 20G --executor-cores 4 --driver-memory 20G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+
+ ${openaireDataInput}
+
+ ${currentYear}
+
+ 7680
+
+ ${openaireGraphInputPath}
+
+ ${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Spark CC
+
+ CC.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireGraphInputPath}
+
+ 7680
+
+ ${wfAppPath}/CC.py#CC.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Spark RAM
+
+ TAR.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireGraphInputPath}
+ ${ramGamma}
+ ${currentYear}
+ RAM
+
+ 7680
+ ${γιτ α}
+
+ ${wfAppPath}/TAR.py#TAR.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Spark Impulse
+
+ CC.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireGraphInputPath}
+
+ 7680
+ 3
+
+ ${wfAppPath}/CC.py#CC.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Spark Pagerank
+
+ PageRank.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireGraphInputPath}
+ ${pageRankAlpha}
+ ${convergenceError}
+ ${checkpointDir}
+
+ 7680
+ dfs
+
+ ${wfAppPath}/PageRank.py#PageRank.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+ yarn-cluster
+ cluster
+
+
+ Spark AttRank
+
+ AttRank.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireGraphInputPath}
+ ${attrankAlpha}
+ ${attrankBeta}
+ ${attrankGamma}
+ ${attrankRho}
+ ${currentYear}
+ ${attrankStartYear}
+ ${convergenceError}
+ ${checkpointDir}
+
+ 7680
+ dfs
+
+ ${wfAppPath}/AttRank.py#AttRank.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+ /usr/bin/bash
+
+ get_ranking_files.sh
+
+ /${workflowDataDir}
+
+
+ ${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+ yarn-cluster
+ cluster
+
+
+ Format Ranking Results JSON
+
+ format_ranking_results.py
+
+ --executor-memory 10G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ json
+
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}
+
+ 7680
+
+ openaire
+
+ ${wfAppPath}/format_ranking_results.py#format_ranking_results.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+ yarn-cluster
+ cluster
+
+
+ Format Ranking Results BiP! DB
+
+ format_ranking_results.py
+
+ --executor-memory 10G --executor-cores 4 --driver-memory 10G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ zenodo
+
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}
+
+ 7680
+
+ openaire
+
+ ${wfAppPath}/format_ranking_results.py#format_ranking_results.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Openaire-DOI synonym collection
+
+ map_openaire_ids_to_dois.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 15G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${openaireDataInput}
+
+ ${synonymFolder}
+
+ ${wfAppPath}/map_openaire_ids_to_dois.py#map_openaire_ids_to_dois.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ${jobTracker}
+
+ ${nameNode}
+
+
+
+ yarn-cluster
+ cluster
+
+
+ Mapping Openaire Scores to DOIs
+
+ map_scores_to_dois.py
+
+ --executor-memory 18G --executor-cores 4 --driver-memory 15G
+ --master yarn
+ --deploy-mode cluster
+ --conf spark.sql.shuffle.partitions=7680
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ ${synonymFolder}
+
+ 7680
+
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}
+ ${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}
+
+
+ ${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PageRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ AttRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ CC failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ Impulse failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ RAM failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ Creation of openaire-graph failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ Synonym collection failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+