Add dhp-impact-indicators workflow files

This commit is contained in:
Serafeim Chatzopoulos 2023-03-14 19:28:27 +02:00
parent c6e39b7f33
commit 720fd19b39
10 changed files with 1935 additions and 16 deletions

View File

@ -0,0 +1,23 @@
# Ranking Workflow for Openaire Publications
This project contains the files for running a paper ranking workflow on the openaire graph using apache oozie.
All scripts are written in python and the project setup follows the typical oozie workflow structure:
- a workflow.xml file containing the workflow specification
- a job.properties file specifying parameter values for the parameters used by the workflow
- a set of python scripts used by the workflow
**NOTE**: the workflow depends on the external library of ranking scripts called BiP! Ranker.
You can check out a specific tag/release of BIP! Ranker using maven, as described in the following section.
## Check out a specific tag/release of BIP-Ranker
* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out.
* Then, use maven to perform the checkout:
```
mvn scm:checkout
```
* The code should be visible under `src/main/bip-ranker` folder.

View File

@ -1,13 +0,0 @@
## Checkout a specific release of the BIP-Ranker git repository
* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out.
* Then perform the checkout with:
```
mvn scm:checkout
```
* The code should be visible under `src/main/bip-ranker` folder.

View File

@ -20,7 +20,7 @@
<scm>
<url>https://github.com/athenarc/Bip-Ranker</url>
<connection>https://github.com/athenarc/Bip-Ranker.git</connection>
<connection>scm:git:https://github.com/athenarc/Bip-Ranker.git</connection>
</scm>
<build>
@ -31,8 +31,8 @@
<version>1.8.1</version>
<configuration>
<connectionType>connection</connectionType>
<scmVersion>2</scmVersion>
<scmVersionType>tag</scmVersionType>
<scmVersionType>tag</scmVersionType><!-- 'branch' can also be provided here -->
<scmVersion>v1.0.0</scmVersion><!-- in case of scmVersionType == 'branch', this field points to the branch name -->
<checkoutDirectory>${project.build.directory}/../src/main/bip-ranker</checkoutDirectory>
</configuration>
</plugin>

View File

@ -0,0 +1,234 @@
#!/usr/bin/python3
# Create openaire id - openaire id graph from openaire data
#############################################################################################################
# Program proceeds as follows:
# 1. We read the input folder provided from hdfs.
# This contains subfolders with openaire graph objects and openaire graph relations
# 2. We select all openaire graph objects of interest. We filter out based on visibility
# and inference criteria. We also filter out based on the availability of publication year
# 3. Get reference type dataframes from openaire. Then filter each one of them based on the
# existence of citing and cited in the above filtered dataset. Get only citations
# produced by publication objects, or otherresearchproducts of types:
# [TBD]
# 4. Get objects that don't appear in the relations (from those gathered in step 1) and add
# them to the graph
# 5. Group relations by citing paper and do graph-specific formatting
#############################################################################################################
# ---------- Imports ------------- #
import sys
# import pyspark
# from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
# Functions to effectively handle data
# manipulation for DataFrames
import pyspark.sql.functions as F
# Diagnostics
from timeit import default_timer as timer
# from datetime import timedelta, datetime
# -------------------------------- #
if len(sys.argv) < 5:
print ("Usage: ./create_openaire_ranking_graph.py <openaire_graph_data_folder> <current_year> <num_partitions> <output_folder>")
sys.exit(0)
# Inputs will be:
# 1. Folder where openaire graph is stored
graph_folder = sys.argv[1]
# 2. Current year (this will be needed for filtering)
current_year = int(sys.argv[2])
# 3. Number of partitions
num_partitions = int(sys.argv[3])
# 4. where to write output
output_folder = sys.argv[4]
# Lists of results types we want to inclued in the citations
# valid_result_types = ['publication', 'other']
valid_result_types = ['publication']
# list of types in otherresearchproduct which are considered valid for citations
valid_other = ['']
# Create the spark session
spark = SparkSession.builder.appName('oa ranking graph creation').getOrCreate()
# Set context level logging to WARN
spark.sparkContext.setLogLevel("WARN")
############################################################################################################################
# 1. Get the research objects and filter based on conditions.
# These will also be the unique identifiers we should find in the final graph
# Initialize an empty dataframe
oa_objects_df = None
# There is a directory structure on hdfs under the provided path.
# We need to parse data from the folders: ["publication", "dataset", "software", "otherresearchproduct"]
# which are rankable oa result objects.
# Loop subfolders
for sub_folder in ["publication", "dataset", "software", "otherresearchproduct"]:
# Read the json data of the graph into a dataframe initially
if not oa_objects_df:
oa_objects_df = spark.read.json(graph_folder + "/" + sub_folder).select('id', 'resulttype.classname', 'datainfo.deletedbyinference', 'datainfo.invisible', F.year('dateofacceptance.value').alias('year'))
oa_objects_df = oa_objects_df.where( 'datainfo.deletedbyinference = false' ).where( 'datainfo.invisible = false' ).repartition(num_partitions, 'id').cache()
# If we already have data, simply add more to it
else:
sub_df = spark.read.json(graph_folder + "/" + sub_folder).select('id', 'resulttype.classname','datainfo.deletedbyinference', 'datainfo.invisible', F.year('dateofacceptance.value').alias('year'))
sub_df = sub_df.where( 'datainfo.deletedbyinference = false ' ).where( 'datainfo.invisible = false ').cache()
# Add the data to the openaire objects dataframe
oa_objects_df = oa_objects_df.union(sub_df).repartition(num_partitions, 'id').cache()
# Clear memory
sub_df.unpersist(True)
# Remove those records without year
oa_objects_df = oa_objects_df.where(F.col('year').isNotNull())
# Now replace years where > (current_year+1) with 0
oa_objects_df = oa_objects_df.withColumn('clean_year', F.when(F.col('year').cast('int') > (current_year+1), 0).otherwise(F.col('year')))\
.drop('year').withColumnRenamed('clean_year', 'year').repartition(num_partitions, 'id')
# -------------------------------------------------------------------- #
'''
# Some diagnostics
print ("Min and max years:" )
oa_objects_df.select(F.max('year')).show()
oa_objects_df.select(F.min('year')).show()
# This should be slow due to not repartitioning by year
print ("Distinct years:")
oa_objects_df.select('year').distinct().sort(F.col('year')).show(5000, False)
# Show distinct values of deletedbyinference and invisible to ensure we have the correct data
print ("Distinct deleted by inference:")
oa_objects_df.select('deletedbyinference').distinct().show()
print ("Distinct invisible values:")
oa_objects_df.select('invisible').distinct().show()
# Output total count
print ("Total num of research objects: " + str(oa_objects_df.count()))
'''
# -------------------------------------------------------------------- #
# Keep only required fields - we still keep resulttype.classname to
# filter the citation relationships we consider valid
oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').distinct().cache()
############################################################################################################################
# 2. Get the relation objects and filter them based on their existence in the oa_objects_df
# NOTE: we are only interested in citations of type "cites"
# Further, we
# Deprecated line
# references_df = spark.read.json(graph_folder + "/relation").select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass')\
# .where( 'relClass = "References"' ).repartition(num_partitions, 'citing').drop('relClass')
# print ("References df has: " + str(references_df.count()) + " entries")
# Collect only valid citations i.e., invisible = false & deletedbyinference=false
cites_df = spark.read.json(graph_folder + "/relation")\
.select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
.where( (F.col('relClass') == "Cites") \
& (F.col('dataInfo.deletedbyinference') == "false")\
& (F.col('dataInfo.invisible') == "false"))\
.drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\
.repartition(num_partitions, 'citing').drop('relClass')
# print ("Cited df has: " + str(cites_df.count()) + " entries")
# DEPRECATED
# cited_by_df = spark.read.json(graph_folder + "/relation").select(F.col('target').alias('citing'), F.col('source').alias('cited'), 'relClass')\
# .where( 'relClass = "IsCitedBy"' ).repartition(num_partitions, 'citing').drop('relClass')
# print ("Cited by df has: " + str(cited_by_df.count()) + " entries")
# DEPRECATED
# Keep only relations where citing and cited are in the oa_objects_df
# references_df = references_df.join(oa_objects_df.select('id'), references_df.citing == oa_objects_df.id).drop('id')
# references_df = references_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), references_df.cited == oa_objects_df.id).drop('id').distinct().repartition(num_partitions, 'citing').cache()
# print ("References df now has: " + str(references_df.count()) + " entries")
cites_df = cites_df.join(oa_objects_df.select('id'), cites_df.citing == oa_objects_df.id).where( F.col('resulttype.classname').isin(valid_result_types) ).drop('id').drop('resulttype.classname')
cites_df = cites_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cites_df.cited == oa_objects_df.id).drop('id').drop('resulttype.classname').distinct().repartition(num_partitions, 'citing').cache()
# TODO: add here a clause filtering out the citations
# originating from "other" types of research objects which we consider valid
# print ("Cites df now has: " + str(cites_df.count()) + " entries")
# DEPRECATED
# cited_by_df = cited_by_df.join(oa_objects_df.select('id'), cited_by_df.citing == oa_objects_df.id).drop('id')
# cited_by_df = cited_by_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cited_by_df.cited == oa_objects_df.id).drop('id').distinct().repartition(num_partitions, 'citing').cache()
# print ("Cited BY df now has: " + str(cited_by_df.count()) + " entries")
# DEPRECATED
# Join all the above into a single set
# citations_df = references_df.union(cites_df).distinct().repartition(num_partitions, 'citing').cache()
# Free space
# references_df.unpersist(True)
# cites_df.unpersist(True)
# citations_df = citations_df.union(cited_by_df).distinct().repartition(num_partitions, 'citing').cache()
# ALL citations we keep are in the cited_df dataframe
citations_df = cites_df
'''
# Show schema
print ("Citation schema:")
citations_df.printSchema()
print ("Objects schema:")
oa_objects_df.printSchema()
'''
# Free space
# cited_by_df.unpersist(True)
# Show total num of unique citations
num_unique_citations = citations_df.count()
print ("Total unique citations: " + str(num_unique_citations))
############################################################################################################################
# 3. Get any potentially missing 'citing' papers from references (these are dangling nodes w/o any outgoing references)
dangling_nodes = oa_objects_df.join(citations_df.select('citing').distinct(), citations_df.citing == oa_objects_df.id, 'left_anti')\
.select(F.col('id').alias('citing')).withColumn('cited', F.array([F.lit("0")])).repartition(num_partitions, 'citing')
# Count dangling nodes
dangling_num = dangling_nodes.count()
print ("Number of dangling nodes: " + str(dangling_num))
# print ("Dangling nodes sample:")
# dangling_nodes.show(10, False)
############################################################################################################################
# 4. Group the citation dataframe by citing doi, and create the cited dois list. Add dangling nodes to the result
graph = citations_df.groupBy('citing').agg(F.collect_set('cited').alias('cited')).repartition(num_partitions, 'citing').cache()
# Free space
citations_df.unpersist(True)
num_nodes = graph.count()
print ("Entries in graph before dangling nodes:" + str(num_nodes))
# print ("Sample in graph: ")
# graph.show(10, False)
# Add dangling nodes
graph = graph.union(dangling_nodes).repartition(num_partitions, 'citing')
# Count current number of results
num_nodes = graph.count()
print ("Num entries after adding dangling nodes: " + str(num_nodes))
# Add publication year
graph = graph.join(oa_objects_df, graph.citing == oa_objects_df.id).select('citing', 'cited', 'year').cache()
num_nodes_final = graph.count()
print ("After adding year: " + str(num_nodes_final))
# print ("Graph sample:")
# graph.show(20, False)
# Calculate initial score of nodes (1/N)
initial_score = float(1)/float(num_nodes_final)
############################################################################################################################
# 5. Write graph to output file!
print("Writing output to: " + output_folder)
graph.select('citing', F.concat_ws("|", F.concat_ws(",",'cited'), F.when(F.col('cited').getItem(1) != "0", F.size('cited')).otherwise(F.lit("0")), F.lit(str(initial_score)) ).alias('cited'), 'year').withColumn('prev_pr', F.lit("0")).select('citing', 'cited', 'prev_pr', 'year')\
.write.mode("overwrite").option("delimiter","\t").csv(output_folder, compression="gzip")
if num_nodes_final != num_nodes:
print ("WARNING: the number of nodes after keeping only nodes where year is available went from: " + str(num_nodes) + " to " + str(num_nodes_final) + "\n")
print ("Check for any mistakes...")
############################################################################################################################
print ("\nDONE!\n\n")
# Wrap up
spark.stop()

View File

@ -0,0 +1,770 @@
# This program reads hdfs directories containing ranking results from openaire's cluster.
# Based on the parameters provided by the user, it will create different types of output files.
# Modes available are:
# 1. bip
# This will result in output of the form required for bip-finder's update.
# Its lines conform to the following format:
# <doi> \t <pagerank> \t <pagerank_normalized> \t <attrank> \t <attrank_normalized> \t <citation_count> \t <citation_count_normalized> \t <3y_cc> \t <3y_cc_normalized> \t <tar_ram> \t <references_count>
# 2. zenodo
# This is the format used in zenodo for Bip-DB. (6 way classes will be named C1, C2, ..., C6)
# This should output two files per ranking method with each line having the following data:
# a. <id> <score> <6-way-class>
# NOTE: this should also run for openaire-id files, hence we should have a total of 4 files per ranking (2 for each type of identifier)
# In 'zenodo' mode the user specifies only a single file, for which zenodo-based output will be created
# 3. json
# This if the format used to provide openAIRE / claudio with data containing 1 json per identifier
# An example of such a json format follows:
#{
# "50|dedup_wf_001::08823c8f5c3ca2eae523817036cdda67": [
# {
# "id": "influence",
# "unit": [
# {
# "key": "score",
# "value": "5.06690394631e-09"
# },
# {
# "key": "class",
# "value": "C"
# }
# ]
# },
# {
# "id": "popularity_alt",
# "unit": [
# {
# "key": "score",
# "value": "0.0"
# },
# {
# "key": "class",
# "value": "C"
# }
# ]
# },
# {
# "id": "popularity",
# "unit": [
# {
# "key": "score",
# "value": "3.11855618382e-09"
# },
# {
# "key": "class",
# "value": "C"
# }
# ]
# },
# {
# "id": "influence_alt",
# "unit": [
# {
# "key": "score",
# "value": "0.0"
# },
# {
# "key": "class",
# "value": "C"
# }
# ]
# },
# {
# "id": "impulse",
# "unit": [
# {
# "key": "score",
# "value": "0.0"
# },
# {
# "key": "class",
# "value": "C"
# }
# ]
# }
# ]
#}
#################################################################################################
# Imports
import sys
import time
# Sparksession lib to communicate with cluster via session object
from pyspark.sql import SparkSession
# Import sql types to define the schema of score output files
from pyspark.sql.types import *
# Import sql functions with shorthand alias
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
# Json specific encoding
import json
#################################################################################################
# Clean up directory name
def clean_directory_name(dir_name):
# We have a name with the form *_bip_universe<digits>_* or *_graph_universe<digits>_*
# and we need to keep the parts in *
dir_name_parts = dir_name.split('_')
dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)]
clean_name = '_'.join(dir_name_parts)
clean_name = clean_name.replace('_id', '_ids')
clean_name = clean_name.replace('.txt', '')
clean_name = clean_name.replace('.gz', '')
if 'openaire_ids_' in clean_name:
clean_name = clean_name.replace('openaire_ids_', '')
clean_name = clean_name + '_openaire_ids.txt.gz'
else:
clean_name = clean_name + '.txt.gz/'
return clean_name
# --------------------------------------------------------------------------------------------- #
# User defined function to escape special characters in a string that will turn into a json key
@udf(StringType())
def json_encode_key(doi_string):
return json.dumps(doi_string)
#################################################################################################
# --------------------------------------------------------------------------------------------- #
# Arguments from command line and initializations
# Time initialization
start_time = time.time()
# Check whether input is correct, otherwise exit with appropriate message
if len(sys.argv) < 2:
print ("Usage: ./format_ranking_results.py <mode> <input_file|input_file_list> <num_partitions>")
sys.exit(0)
# Define valid modes:
valid_modes = ['json', 'zenodo', 'bip', 'json-5-way']
# Read mode provided by user
mode = sys.argv[1].strip()
# If mode isn't valid, exit
if mode not in valid_modes:
print ("Usage: ./format_ranking_results.py <mode> <input_file|input_file_list> <num_partitions>\n")
print ("Invalid mode provided. Valid modes: ['zenodo', 'bip', 'json', 'json-5-way']")
sys.exit(0)
# Once here, we should be more or less okay to run.
# Define the spark session object
spark = SparkSession.builder.appName('Parse Scores - ' + str(mode) + ' mode').getOrCreate()
# Set Log Level for spark session
spark.sparkContext.setLogLevel('WARN')
# Here we define the schema shared by all score output files
# - citation count variants have a slightly different schema, due to their scores being integers
float_schema = StructType([
StructField('id', StringType(), False),
StructField('score', FloatType(), False),
StructField('normalized_score', FloatType(), False),
StructField('3-way-class', StringType(), False),
StructField('5-way-class', StringType(), False)
])
int_schema = StructType([
StructField('id', StringType(), False),
StructField('score', IntegerType(), False),
StructField('normalized_score', FloatType(), False),
StructField('3-way-class', StringType(), False),
StructField('5-way-class', StringType(), False)
])
# This schema concerns the output of the file
# containing the number of references of each doi
refs_schema = StructType([
StructField('id', StringType(), False),
StructField('num_refs', IntegerType(), False),
])
print("--- Initialization time: %s seconds ---" % (time.time() - start_time))
# --------------------------------------------------------------------------------------------- #
# Time the main program execution
start_time = time.time()
# The following is executed when the user requests the bip-update specific file
if mode == 'bip':
# Read the remaining input files
if len(sys.argv) < 8:
print ("\n\nInsufficient input for 'bip' mode.")
print ("File list required: <pagerank> <attrank> <citation count> <3-year citation count> <tar-ram> <number of references> <num_partitions>\n")
sys.exit(0)
# Read number of partitions:
num_partitions = int(sys.argv[-1])
pagerank_dir = sys.argv[2]
attrank_dir = sys.argv[3]
cc_dir = sys.argv[4]
impulse_dir = sys.argv[5]
ram_dir = sys.argv[6]
refs_dir = sys.argv[7]
# Score-specific dataframe
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
refs_df = spark.read.schema(refs_schema).option('delimiter', '\t').option('header',True).csv(refs_dir).repartition(num_partitions, 'id')
# ----------- TESTING CODE --------------- #
# pagerank_entries = pagerank_df.count()
# attrank_entries = attrank_df.count()
# cc_entries = cc_df.count()
# impulse_entries = impulse_df.count()
# ram_entries = ram_df.count()
# refs_entries = refs_df.count()
# print ("Pagerank:" + str(pagerank_entries))
# print ("AttRank:" + str(attrank_entries))
# print ("CC entries: " + str(cc_entries))
# print ("Impulse entries: " + str(impulse_entries))
# print ("Refs: " + str(refs_entries))
# ---------------------------------------- #
# Create a new dataframe with the required data
results_df = pagerank_df.select('id', F.col('score').alias('pagerank'), F.col('normalized_score').alias('pagerank_normalized'))
# Add attrank dataframe
results_df = results_df.join(attrank_df.select('id', 'score', 'normalized_score'), ['id'])\
.select(results_df.id, 'pagerank', 'pagerank_normalized', F.col('score').alias('attrank'), F.col('normalized_score').alias('attrank_normalized'))
# Add citation count dataframe
results_df = results_df.join(cc_df.select('id', 'score', 'normalized_score'), ['id'])\
.select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', F.col('score').alias('cc'), F.col('normalized_score').alias('cc_normalized'))
# Add 3-year df
results_df = results_df.join(impulse_df.select('id', 'score', 'normalized_score'), ['id'])\
.select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized', \
F.col('score').alias('3-cc'), F.col('normalized_score').alias('3-cc_normalized'))
# Add ram df
results_df = results_df.join(ram_df.select('id', 'score'), ['id'])\
.select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized',\
'3-cc', '3-cc_normalized', F.col('score').alias('ram'))
# Add references
results_df = results_df.join(refs_df, ['id']).select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', \
'cc', 'cc_normalized', '3-cc', '3-cc_normalized', 'ram', 'num_refs')
# Write resulting dataframe to file
output_dir = "/".join(pagerank_dir.split('/')[:-1])
output_dir = output_dir + '/bip_update_data.txt.gz'
print("Writing to:" + output_dir)
results_df.write.mode('overwrite').option('delimiter','\t').option('header',True).csv(output_dir, compression='gzip')
# The following is executed when the user requests the zenodo-specific file
elif mode == 'zenodo':
# Read the remaining input files
if len(sys.argv) < 9:
print ("\n\nInsufficient input for 'zenodo' mode.")
print ("File list required: <pagerank> <attrank> <citation count> <3-year citation count> <tar-ram> <num_partitions> <graph_type>\n")
sys.exit(0)
# Read number of partitions:
num_partitions = int(sys.argv[-2])
graph_type = sys.argv[-1]
if graph_type not in ['bip', 'openaire']:
graph_type = 'bip'
pagerank_dir = sys.argv[2]
attrank_dir = sys.argv[3]
cc_dir = sys.argv[4]
impulse_dir = sys.argv[5]
ram_dir = sys.argv[6]
# Output directory is common for all files
output_dir_prefix = "/".join(pagerank_dir.split('/')[:-1])
# Method-specific outputs
pagerank_output = clean_directory_name(pagerank_dir.split('/')[-1])
attrank_output = clean_directory_name(attrank_dir.split('/')[-1])
cc_output = clean_directory_name(cc_dir.split('/')[-1])
impulse_output = clean_directory_name(impulse_dir.split('/')[-1])
ram_output = clean_directory_name(ram_dir.split('/')[-1])
# --------- PageRank ----------- #
# Get per file the doi - score - 6-way classes and write it to output
print("Writing to: " + output_dir_prefix + '/' + pagerank_output)
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
# Replace dataframe class names
pagerank_df = pagerank_df.withColumn('class', F.lit('C6'))
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.drop('5-way-class')
if graph_type == 'openaire':
pagerank_df = pagerank_df.where( ~F.col('id').like('10.%') )
# Write output
pagerank_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + pagerank_output, compression='gzip')
# --------- AttRank ----------- #
print("Writing to: " + output_dir_prefix + '/' + attrank_output)
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
# Replace dataframe class names
attrank_df = attrank_df.withColumn('class', F.lit('C6'))
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
attrank_df = attrank_df.drop('5-way-class')
if graph_type == 'openaire':
attrank_df = attrank_df.where( ~F.col('id').like('10.%') )
# Write output
attrank_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + attrank_output, compression='gzip')
# --------- Citation Count ----------- #
print("Writing to: " + output_dir_prefix + '/' + cc_output)
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
# Replace dataframe class names
cc_df = cc_df.withColumn('class', F.lit('C5'))
# cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
cc_df = cc_df.drop('5-way-class')
if graph_type == 'openaire':
cc_df = cc_df.where( ~F.col('id').like('10.%') )
# Write output
cc_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + cc_output, compression='gzip')
# --------- Impulse ----------- #
print("Writing to: " + output_dir_prefix + '/' + impulse_output)
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
# Replace dataframe class names
impulse_df = impulse_df.withColumn('class', F.lit('C5'))
# impulse_df = impulse_df.withColumn('class', F.when(F.col('6-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
impulse_df = impulse_df.drop('5-way-class')
if graph_type == 'openaire':
impulse_df = impulse_df.where( ~F.col('id').like('10.%') )
# Write output
impulse_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + impulse_output, compression='gzip')
# --------- RAM ----------- #
print("Writing to: " + output_dir_prefix + '/' + ram_output)
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id').select('id', 'score', '5-way-class')
# Replace dataframe class names
ram_df = ram_df.withColumn('class', F.lit('C5'))
# ram_df = ram_df.withColumn('class', F.when(F.col('6-way-class') == F.lit('E'), F.lit('C5')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
ram_df = ram_df.drop('5-way-class')
if graph_type == 'openaire':
ram_df = ram_df.where( ~F.col('id').like('10.%') )
# Write output
ram_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_dir_prefix + '/' + ram_output, compression='gzip')
# The following produces the json file required by openaire
elif mode == 'json':
# Read the remaining input files
if len(sys.argv) < 9:
print ("\n\nInsufficient input for 'json' mode.")
print ("File list required: <pagerank> <attrank> <citation count> <3-year citation count> <tar-ram> <num_partitions> <graph_type>\n")
sys.exit(0)
# Read number of partitions:
num_partitions = int(sys.argv[-2])
graph_type = sys.argv[-1]
if graph_type not in ['bip', 'openaire']:
graph_type = 'bip'
print ("Graph type: " + str(graph_type))
# File directories
pagerank_dir = sys.argv[2]
attrank_dir = sys.argv[3]
cc_dir = sys.argv[4]
impulse_dir = sys.argv[5]
ram_dir = sys.argv[6]
print ("Reading files:")
print (pagerank_dir)
print (attrank_dir)
print (cc_dir)
print (impulse_dir)
print (ram_dir)
# Score-specific dataframe - read inputs
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
# --- Join the data of the various scores --- #
# Create json data for pagerank
pagerank_df = pagerank_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
pagerank_df = pagerank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_values') )
pagerank_df = pagerank_df.select('id', F.create_map(F.lit('id'), F.lit('influence')).alias('id_map'), F.col('influence_values'))
pagerank_df = pagerank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence'))).alias('influence_key'), F.to_json(F.col('influence_values')).alias('influence_values') )
pagerank_df = pagerank_df.select('id', F.expr('substring(influence_key, 0, length(influence_key)-1)').alias('influence_key'), 'influence_values')
pagerank_df = pagerank_df.select('id', 'influence_key', F.expr('substring(influence_values, 2, length(influence_values))').alias('influence_values'))
pagerank_df = pagerank_df.select('id', F.concat_ws(', ', F.col('influence_key'), F.col('influence_values')).alias('influence_json'))
# Create json data for attrank
attrank_df = attrank_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
attrank_df = attrank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_values') )
attrank_df = attrank_df.select('id', F.create_map(F.lit('id'), F.lit('popularity')).alias('id_map'), F.col('popularity_values'))
attrank_df = attrank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity'))).alias('popularity_key'), F.to_json(F.col('popularity_values')).alias('popularity_values') )
attrank_df = attrank_df.select('id', F.expr('substring(popularity_key, 0, length(popularity_key)-1)').alias('popularity_key'), 'popularity_values')
attrank_df = attrank_df.select('id', 'popularity_key', F.expr('substring(popularity_values, 2, length(popularity_values))').alias('popularity_values'))
attrank_df = attrank_df.select('id', F.concat_ws(', ', F.col('popularity_key'), F.col('popularity_values')).alias('popularity_json'))
# Create json data for CC
cc_df = cc_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
cc_df = cc_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_alt_values') )
cc_df = cc_df.select('id', F.create_map(F.lit('id'), F.lit('influence_alt')).alias('id_map'), F.col('influence_alt_values'))
cc_df = cc_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence_alt'))).alias('influence_alt_key'), F.to_json(F.col('influence_alt_values')).alias('influence_alt_values') )
cc_df = cc_df.select('id', F.expr('substring(influence_alt_key, 0, length(influence_alt_key)-1)').alias('influence_alt_key'), 'influence_alt_values')
cc_df = cc_df.select('id', 'influence_alt_key', F.expr('substring(influence_alt_values, 2, length(influence_alt_values))').alias('influence_alt_values'))
cc_df = cc_df.select('id', F.concat_ws(', ', F.col('influence_alt_key'), F.col('influence_alt_values')).alias('influence_alt_json'))
# Create json data for RAM
ram_df = ram_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
ram_df = ram_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_alt_values') )
ram_df = ram_df.select('id', F.create_map(F.lit('id'), F.lit('popularity_alt')).alias('id_map'), F.col('popularity_alt_values'))
ram_df = ram_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity_alt'))).alias('popularity_alt_key'), F.to_json(F.col('popularity_alt_values')).alias('popularity_alt_values') )
ram_df = ram_df.select('id', F.expr('substring(popularity_alt_key, 0, length(popularity_alt_key)-1)').alias('popularity_alt_key'), 'popularity_alt_values')
ram_df = ram_df.select('id', 'popularity_alt_key', F.expr('substring(popularity_alt_values, 2, length(popularity_alt_values))').alias('popularity_alt_values'))
ram_df = ram_df.select('id', F.concat_ws(', ', F.col('popularity_alt_key'), F.col('popularity_alt_values')).alias('popularity_alt_json'))
# Create json data for impulse
impulse_df = impulse_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('3-way-class'))).alias('class_map'))
impulse_df = impulse_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('impulse_values') )
impulse_df = impulse_df.select('id', F.create_map(F.lit('id'), F.lit('impulse')).alias('id_map'), F.col('impulse_values'))
impulse_df = impulse_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('impulse'))).alias('impulse_key'), F.to_json(F.col('impulse_values')).alias('impulse_values') )
impulse_df = impulse_df.select('id', F.expr('substring(impulse_key, 0, length(impulse_key)-1)').alias('impulse_key'), 'impulse_values')
impulse_df = impulse_df.select('id', 'impulse_key', F.expr('substring(impulse_values, 2, length(impulse_values))').alias('impulse_values'))
impulse_df = impulse_df.select('id', F.concat_ws(', ', F.col('impulse_key'), F.col('impulse_values')).alias('impulse_json'))
#Join dataframes together
results_df = pagerank_df.join(attrank_df, ['id'])
results_df = results_df.join(cc_df, ['id'])
results_df = results_df.join(ram_df, ['id'])
results_df = results_df.join(impulse_df, ['id'])
print ("Json encoding DOI keys")
# Json encode doi strings
results_df = results_df.select(json_encode_key('id').alias('id'), 'influence_json', 'popularity_json', 'influence_alt_json', 'popularity_alt_json', 'impulse_json')
# Concatenate individual json columns
results_df = results_df.select('id', F.concat_ws(', ', F.col('influence_json'), F.col('popularity_json'), F.col('influence_alt_json'), F.col('popularity_alt_json'), F.col('impulse_json') ).alias('json_data'))
results_df = results_df.select('id', F.concat_ws('', F.lit('['), F.col('json_data'), F.lit(']')).alias('json_data') )
# Filter out non-openaire ids if need
if graph_type == 'openaire':
results_df = results_df.where( ~F.col('id').like('"10.%') )
# Concatenate paper id and add opening and ending brackets
results_df = results_df.select(F.concat_ws('', F.lit('{'), F.col('id'), F.lit(': '), F.col('json_data'), F.lit('}')).alias('json') )
# -------------------------------------------- #
# Write json output - set the directory here
output_dir = "/".join(pagerank_dir.split('/')[:-1])
if graph_type == 'bip':
output_dir = output_dir + '/bip_universe_doi_scores/'
else:
output_dir = output_dir + '/openaire_universe_scores/'
# Write the dataframe
print ("Writing output to: " + output_dir)
results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
# Rename the files to .json.gz now
sc = spark.sparkContext
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
# Get master prefix from input file path
master_prefix = "/".join(pagerank_dir.split('/')[:5])
fs = FileSystem.get(URI(master_prefix), sc._jsc.hadoopConfiguration())
path = Path(output_dir)
print ("Path is:" + path.toString())
file_list = fs.listStatus(Path(output_dir))
print ("Renaming files:")
for f in file_list:
initial_filename = f.getPath().toString()
if "part" in initial_filename:
print (initial_filename + " => " + initial_filename.replace(".txt.gz", ".json.gz"))
fs.rename(Path(initial_filename), Path(initial_filename.replace(".txt.gz", ".json.gz")))
'''
DEPRECATED:
# -------------------------------------------- #
# Write json output
output_dir = "/".join(pagerank_dir.split('/')[:-1])
if graph_type == 'bip':
output_dir = output_dir + '/bip_universe_doi_scores_txt/'
else:
output_dir = output_dir + '/openaire_universe_scores_txt/'
print ("Writing output to: " + output_dir)
results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
print ("Done writing first results")
# Read results df as json and write it as json file
print ("Reading json input from: " + str(output_dir))
resulds_df_json = spark.read.json(output_dir).cache()
# Write json to different dir
print ("Writing json output to: " + output_dir.replace("_txt", ""))
resulds_df_json.write.mode('overwrite').json(output_dir.replace("_txt", ""), compression='gzip')
'''
# The following produces the json file required by openaire
elif mode == 'json-5-way':
# Read the remaining input files
if len(sys.argv) < 9:
print ("\n\nInsufficient input for 'json-5-way' mode.")
print ("File list required: <pagerank> <attrank> <citation count> <3-year citation count> <tar-ram> <num_partitions> <graph_type>\n")
sys.exit(0)
# Read number of partitions:
num_partitions = int(sys.argv[-2])
graph_type = sys.argv[-1]
if graph_type not in ['bip', 'openaire']:
graph_type = 'bip'
# File directories
pagerank_dir = sys.argv[2]
attrank_dir = sys.argv[3]
cc_dir = sys.argv[4]
impulse_dir = sys.argv[5]
ram_dir = sys.argv[6]
# Score-specific dataframe - read inputs
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
# --- Join the data of the various scores --- #
# Replace 6-way classes with 5-way values
pagerank_df = pagerank_df.withColumn('class', F.lit('C5'))
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
pagerank_df = pagerank_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
# Create json data for pagerank
pagerank_df = pagerank_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
pagerank_df = pagerank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_values') )
pagerank_df = pagerank_df.select('id', F.create_map(F.lit('id'), F.lit('influence')).alias('id_map'), F.col('influence_values'))
pagerank_df = pagerank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence'))).alias('influence_key'), F.to_json(F.col('influence_values')).alias('influence_values') )
pagerank_df = pagerank_df.select('id', F.expr('substring(influence_key, 0, length(influence_key)-1)').alias('influence_key'), 'influence_values')
pagerank_df = pagerank_df.select('id', 'influence_key', F.expr('substring(influence_values, 2, length(influence_values))').alias('influence_values'))
pagerank_df = pagerank_df.select('id', F.concat_ws(', ', F.col('influence_key'), F.col('influence_values')).alias('influence_json'))
# Replace 6-way classes with 5 way classes for attrank
attrank_df = attrank_df.withColumn('class', F.lit('C5'))
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
attrank_df = attrank_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
attrank_df = attrank_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
# Create json data for attrank
attrank_df = attrank_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
attrank_df = attrank_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_values') )
attrank_df = attrank_df.select('id', F.create_map(F.lit('id'), F.lit('popularity')).alias('id_map'), F.col('popularity_values'))
attrank_df = attrank_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity'))).alias('popularity_key'), F.to_json(F.col('popularity_values')).alias('popularity_values') )
attrank_df = attrank_df.select('id', F.expr('substring(popularity_key, 0, length(popularity_key)-1)').alias('popularity_key'), 'popularity_values')
attrank_df = attrank_df.select('id', 'popularity_key', F.expr('substring(popularity_values, 2, length(popularity_values))').alias('popularity_values'))
attrank_df = attrank_df.select('id', F.concat_ws(', ', F.col('popularity_key'), F.col('popularity_values')).alias('popularity_json'))
# Replace 6-way classes with 5 way classes for attrank
cc_df = cc_df.withColumn('class', F.lit('C5'))
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
cc_df = cc_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
cc_df = cc_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
# Create json data for CC
cc_df = cc_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
cc_df = cc_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('influence_alt_values') )
cc_df = cc_df.select('id', F.create_map(F.lit('id'), F.lit('influence_alt')).alias('id_map'), F.col('influence_alt_values'))
cc_df = cc_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('influence_alt'))).alias('influence_alt_key'), F.to_json(F.col('influence_alt_values')).alias('influence_alt_values') )
cc_df = cc_df.select('id', F.expr('substring(influence_alt_key, 0, length(influence_alt_key)-1)').alias('influence_alt_key'), 'influence_alt_values')
cc_df = cc_df.select('id', 'influence_alt_key', F.expr('substring(influence_alt_values, 2, length(influence_alt_values))').alias('influence_alt_values'))
cc_df = cc_df.select('id', F.concat_ws(', ', F.col('influence_alt_key'), F.col('influence_alt_values')).alias('influence_alt_json'))
# Replace 6-way classes with 5 way classes for attrank
ram_df = ram_df.withColumn('class', F.lit('C5'))
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
ram_df = ram_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
ram_df = ram_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
# Create json data for RAM
ram_df = ram_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
ram_df = ram_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('popularity_alt_values') )
ram_df = ram_df.select('id', F.create_map(F.lit('id'), F.lit('popularity_alt')).alias('id_map'), F.col('popularity_alt_values'))
ram_df = ram_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('popularity_alt'))).alias('popularity_alt_key'), F.to_json(F.col('popularity_alt_values')).alias('popularity_alt_values') )
ram_df = ram_df.select('id', F.expr('substring(popularity_alt_key, 0, length(popularity_alt_key)-1)').alias('popularity_alt_key'), 'popularity_alt_values')
ram_df = ram_df.select('id', 'popularity_alt_key', F.expr('substring(popularity_alt_values, 2, length(popularity_alt_values))').alias('popularity_alt_values'))
ram_df = ram_df.select('id', F.concat_ws(', ', F.col('popularity_alt_key'), F.col('popularity_alt_values')).alias('popularity_alt_json'))
# Replace 6-way classes with 5 way classes for attrank
impulse_df = impulse_df.withColumn('class', F.lit('C5'))
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('D'), F.lit('C4')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('C'), F.lit('C3')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('B'), F.lit('C2')).otherwise(F.col('class')) )
impulse_df = impulse_df.withColumn('class', F.when(F.col('5-way-class') == F.lit('A'), F.lit('C1')).otherwise(F.col('class')) )
impulse_df = impulse_df.drop('5-way-class').withColumnRenamed('class', '5-way-class')
# Create json data for impulse
impulse_df = impulse_df.select('id', F.map_concat(
F.create_map(F.lit('key'), F.lit('score')),
F.create_map(F.lit('value'), F.col('score'))).alias('score_map'),
F.map_concat(
F.create_map(F.lit('key'), F.lit('class')),
F.create_map(F.lit('value'), F.col('5-way-class'))).alias('class_map'))
impulse_df = impulse_df.select('id', F.create_map(F.lit('unit'), F.array([F.col('score_map'), F.col('class_map')]) ).alias('impulse_values') )
impulse_df = impulse_df.select('id', F.create_map(F.lit('id'), F.lit('impulse')).alias('id_map'), F.col('impulse_values'))
impulse_df = impulse_df.select('id', F.to_json(F.create_map(F.lit('id'), F.lit('impulse'))).alias('impulse_key'), F.to_json(F.col('impulse_values')).alias('impulse_values') )
impulse_df = impulse_df.select('id', F.expr('substring(impulse_key, 0, length(impulse_key)-1)').alias('impulse_key'), 'impulse_values')
impulse_df = impulse_df.select('id', 'impulse_key', F.expr('substring(impulse_values, 2, length(impulse_values))').alias('impulse_values'))
impulse_df = impulse_df.select('id', F.concat_ws(', ', F.col('impulse_key'), F.col('impulse_values')).alias('impulse_json'))
#Join dataframes together
results_df = pagerank_df.join(attrank_df, ['id'])
results_df = results_df.join(cc_df, ['id'])
results_df = results_df.join(ram_df, ['id'])
results_df = results_df.join(impulse_df, ['id'])
print ("Json encoding DOI keys")
# Json encode doi strings
results_df = results_df.select(json_encode_key('id').alias('id'), 'influence_json', 'popularity_json', 'influence_alt_json', 'popularity_alt_json', 'impulse_json')
# Concatenate individual json columns
results_df = results_df.select('id', F.concat_ws(', ', F.col('influence_json'), F.col('popularity_json'), F.col('influence_alt_json'), F.col('popularity_alt_json'), F.col('impulse_json') ).alias('json_data'))
results_df = results_df.select('id', F.concat_ws('', F.lit('['), F.col('json_data'), F.lit(']')).alias('json_data') )
# Filter out non-openaire ids if need
if graph_type == 'openaire':
results_df = results_df.where( ~F.col('id').like('10.%') )
# Concatenate paper id and add opening and ending brackets
results_df = results_df.select(F.concat_ws('', F.lit('{'), F.col('id'), F.lit(': '), F.col('json_data'), F.lit('}')).alias('json') )
# TEST output and count
# results_df.show(20, False)
# print ("Results #" + str(results_df.count()))
# -------------------------------------------- #
# Write json output
output_dir = "/".join(pagerank_dir.split('/')[:-1])
if graph_type == 'bip':
output_dir = output_dir + '/bip_universe_doi_scores_5_classes/'
else:
output_dir = output_dir + '/openaire_universe_scores_5_classes/'
print ("Writing output to: " + output_dir)
results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
# Close spark session
spark.stop()
print("--- Main program execution time: %s seconds ---" % (time.time() - start_time))
print("--- Finished --- \n\n")

View File

@ -0,0 +1,14 @@
ranking_results_folder=$1;
pr_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/PR_.*" | grep -o "PR.*"`;
attrank_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/AttRank.*" | grep -o "AttRank.*"`;
cc_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/CC_.*" | grep -o "CC.*"`;
impulse_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/3-year_.*" | grep -o "3-year.*"`;
ram_file=`hdfs dfs -ls ${ranking_results_folder}/ | grep "/RAM_.*" | grep -o "RAM.*"`;
echo "pr_file=${pr_file}";
echo "attrank_file=${attrank_file}";
echo "cc_file=${cc_file}";
echo "impulse_file=${impulse_file}";
echo "ram_file=${ram_file}";
# echo "TEST=`hdfs dfs -ls ${ranking_results_folder}/`";

View File

@ -0,0 +1,86 @@
# The following set of properties are defined in https://support.openaire.eu/projects/openaire/wiki/Hadoop_clusters
# and concern the parameterization required for running workflows on the @GARR cluster
dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
dhp.hadoop.frontend.user.name=ilias.kanellos
dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
dhp.hadoop.frontend.port.ssh=22
oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
jobTracker=yarnRM
nameNode=hdfs://nameservice1
oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
maven.executable=mvn
sparkDriverMemory=7G
sparkExecutorMemory=7G
sparkExecutorCores=4
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory
sparkSqlWarehouseDir=/user/hive/warehouse
hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
# This MAY avoid the no library used error
oozie.use.system.libpath=true
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# ------------------------------------------------------------------------------ #
# The following set of properties are my own custom ones
# Based on the page linked to at the start of the file, if we use yarn as a resource manager, its address is given as follows
resourceManager=http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster
# current year used when creating graph / by some ranking methods
currentYear=2024
# Alpha value for pagerank
pageRankAlpha=0.5
# AttRank values
attrankAlpha=0.2
attrankBeta=0.5
attrankGamma=0.3
attrankRho=-0.16
# attrankCurrentYear=2023
attrankStartYear=2021
# Ram values
ramGamma=0.6
# ramCurrentYear=2023
# Convergence error for pagerank
convergenceError=0.000000000001
# I think this should be the oozie workflow directory
oozieWorkflowPath=user/ilias.kanellos/workflow_example/
# The directory where the workflow data is/should be stored
workflowDataDir=user/ilias.kanellos/ranking_workflow
# Directory where dataframes are checkpointed
checkpointDir=${nameNode}/${workflowDataDir}/check/
# The directory for the doi-based bip graph
bipGraphFilePath=${nameNode}/${workflowDataDir}/bipdbv8_graph
# The folder from which synonyms of openaire-ids are read
# openaireDataInput=${nameNode}/tmp/beta_provision/graph/21_graph_cleaned/
openaireDataInput=${/tmp/prod_provision/graph/18_graph_blacklisted}
# A folder where we will write the openaire to doi mapping
synonymFolder=${nameNode}/${workflowDataDir}/openaireid_to_dois/
# This will be where we store the openaire graph input. They told us on GARR to use a directory under /data
openaireGraphInputPath=${nameNode}/${workflowDataDir}/openaire_id_graph
# The workflow application path
wfAppPath=${nameNode}/${oozieWorkflowPath}
# The following is needed as a property of a workflow
oozie.wf.application.path=${wfAppPath}

View File

@ -0,0 +1,60 @@
import json
import sys
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
if len(sys.argv) != 3:
print("Usage: map_openaire_ids_to_dois.py <hdfs_src_dir> <hdfs_output_dir>")
sys.exit(-1)
conf = SparkConf().setAppName('BIP!: Map OpenAIRE IDs to DOIs')
sc = SparkContext(conf = conf)
spark = SparkSession.builder.appName('BIP!: Map OpenAIRE IDs to DOIs').getOrCreate()
sc.setLogLevel('OFF')
src_dir = sys.argv[1]
output = sys.argv[2]
# src_dir = "/tmp/beta_provision/graph/21_graph_cleaned/"
# output = '/tmp/openaireid_to_dois/'
def transform(doc):
# get publication year from 'doc.dateofacceptance.value'
dateofacceptance = doc.get('dateofacceptance', {}).get('value')
year = 0
if (dateofacceptance is not None):
year = dateofacceptance.split('-')[0]
# for each pid get 'pid.value' if 'pid.qualifier.classid' equals to 'doi'
dois = [ pid['value'] for pid in doc.get('pid', []) if (pid.get('qualifier', {}).get('classid') == 'doi' and pid['value'] is not None)]
num_dois = len(dois)
# exlcude openaire ids that do not correspond to DOIs
if (num_dois == 0):
return None
fields = [ doc['id'], str(num_dois), chr(0x02).join(dois), str(year) ]
return '\t'.join([ v.encode('utf-8') for v in fields ])
docs = None
for result_type in ["publication", "dataset", "software", "otherresearchproduct"]:
tmp = sc.textFile(src_dir + result_type).map(json.loads)
if (docs is None):
docs = tmp
else:
# append all result types in one RDD
docs = docs.union(tmp)
docs = docs.filter(lambda d: d.get('dataInfo', {}).get('deletedbyinference') == False and d.get('dataInfo', {}).get('invisible') == False)
docs = docs.map(transform).filter(lambda d: d is not None)
docs.saveAsTextFile(output)

View File

@ -0,0 +1,145 @@
# This program reads the openaire to doi mapping from the ${synonymFolder} of the workflow
# and uses this mapping to create doi-based score files in the format required by BiP! DB.
# This is done by reading each openaire-id based ranking file and joining the openaire based
# score and classes to all the corresponding dois.
#################################################################################################
# Imports
import sys
# Sparksession lib to communicate with cluster via session object
from pyspark.sql import SparkSession
# Import sql types to define schemas
from pyspark.sql.types import *
# Import sql functions with shorthand alias
import pyspark.sql.functions as F
# from pyspark.sql.functions import udf
#################################################################################################
#################################################################################################
# Clean up directory name
def clean_directory_name(dir_name):
# We have a name with the form *_bip_universe<digits>_* or *_graph_universe<digits>_*
# and we need to keep the parts in *
dir_name_parts = dir_name.split('_')
dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)]
clean_name = '_'.join(dir_name_parts)
if '_ids' not in clean_name:
clean_name = clean_name.replace('id_', 'ids_')
# clean_name = clean_name.replace('.txt', '')
# clean_name = clean_name.replace('.gz', '')
if 'openaire_ids_' in clean_name:
clean_name = clean_name.replace('openaire_ids_', '')
# clean_name = clean_name + '.txt.gz'
# else:
# clean_name = clean_name + '.txt.gz'
return clean_name
#################################################################################################
if len(sys.argv) < 3:
print ("Usage: ./map_scores_to_dois.py <synonym_folder> <num_partitions> <score_file_1> <score_file_2> <...etc...>")
sys.exit(-1)
# Read arguments
synonyms_folder = sys.argv[1]
num_partitions = int(sys.argv[2])
input_file_list = [argument for argument in sys.argv[3:]]
input_file_list = [clean_directory_name(item) for item in input_file_list]
# Prepare output specific variables
output_file_list = [item.replace("_openaire_ids", "") for item in input_file_list]
output_file_list = [item + ".gz" if not item.endswith(".gz") else item for item in output_file_list]
# --- INFO MESSAGES --- #
print ("\n\n----------------------------")
print ("Mpping openaire ids to DOIs")
print ("Reading input from: " + synonyms_folder)
print ("Num partitions: " + str(num_partitions))
print ("Input files:" + " -- ".join(input_file_list))
print ("Output files: " + " -- ".join(output_file_list))
print ("----------------------------\n\n")
#######################################################################################
# We weill define the following schemas:
# --> the schema of the openaire - doi mapping file [string - int - doi_list] (the separator of the doi-list is a non printable character)
# --> a schema for floating point ranking scores [string - float - string] (the latter string is the class)
# --> a schema for integer ranking scores [string - int - string] (the latter string is the class)
float_schema = StructType([
StructField('id', StringType(), False),
StructField('score', FloatType(), False),
StructField('class', StringType(), False)
])
int_schema = StructType([
StructField('id', StringType(), False),
StructField('score', IntegerType(), False),
StructField('class', StringType(), False)
])
# This schema concerns the output of the file
# containing the number of references of each doi
synonyms_schema = StructType([
StructField('id', StringType(), False),
StructField('num_synonyms', IntegerType(), False),
StructField('doi_list', StringType(), False),
])
#######################################################################################
# Start spark session
spark = SparkSession.builder.appName('Map openaire scores to DOIs').getOrCreate()
# Set Log Level for spark session
spark.sparkContext.setLogLevel('WARN')
#######################################################################################
# MAIN Program
# Read and repartition the synonym folder - also cache it since we will need to perform multiple joins
synonym_df = spark.read.schema(synonyms_schema).option('delimiter', '\t').csv(synonyms_folder)
synonym_df = synonym_df.select('id', F.split(F.col('doi_list'), chr(0x02)).alias('doi_list'))
synonym_df = synonym_df.select('id', F.explode('doi_list').alias('doi')).repartition(num_partitions, 'id').cache()
# TESTING
# print ("Synonyms: " + str(synonym_df.count()))
# print ("DF looks like this:" )
# synonym_df.show(1000, False)
print ("\n\n-----------------------------")
# Now we need to join the score files on the openaire-id with the synonyms and then keep
# only doi - score - class and write this to the output
for offset, input_file in enumerate(input_file_list):
print ("Mapping scores from " + input_file)
# Select correct schema
schema = int_schema
if "attrank" in input_file.lower() or "pr" in input_file.lower() or "ram" in input_file.lower():
schema = float_schema
# Load file to dataframe
ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id')
# TESTING
# print ("Loaded df sample:")
# ranking_df.show(1000, False)
# Join scores to synonyms and keep required fields
doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'class').repartition(num_partitions, 'doi').cache()
# Write output
output_file = output_file_list[offset]
print ("Writing to: " + output_file)
doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
# Free memory?
ranking_df.unpersist(True)
print ("-----------------------------")
print ("\n\nFinished!\n\n")

View File

@ -0,0 +1,600 @@
<workflow-app xmlns="uri:oozie:workflow:0.5" name="ranking-wf">
<!-- start using a decision node, so as to determine from which point onwards a job will continue -->
<!-- <start to="get-doi-synonyms" /> -->
<start to="entry-point-decision" />
<decision name="entry-point-decision">
<switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${resume eq "rankings-start"}</case>
<case to="spark-impulse">${resume eq "impulse"}</case>
<case to="iterative-rankings">${resume eq "rankings-iterative"}</case>
<case to="get-file-names">${resume eq "format-results"}</case>
<case to="map-openaire-to-doi">${resume eq "map-ids"}</case>
<case to="map-scores-to-dois">${resume eq "map-scores"}</case>
<case to="create-openaire-ranking-graph">${resume eq "start"}</case>
<!-- TODO: add action set creation here -->
<default to="create-openaire-ranking-graph" />
</switch>
</decision>
<!-- Script here written by Serafeim: maps openaire ids to their synonyms -->
<action name="create-openaire-ranking-graph">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- Delete previously created doi synonym folder -->
<prepare>
<delete path="${synonymFolder}"/>
</prepare>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Openaire Ranking Graph Creation</name>
<!-- Script name goes here -->
<jar>create_openaire_ranking_graph.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 20G --executor-cores 4 --driver-memory 20G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<!-- The openaire graph data from which to read relations and objects -->
<arg>${openaireDataInput}</arg>
<!-- Year for filtering entries w/ larger values / empty -->
<arg>${currentYear}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<!-- The output of the graph should be the openaire input graph for ranking-->
<arg>${openaireGraphInputPath}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="non-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="openaire-graph-error" />
</action>
<!-- Citation Count and RAM are calculated in parallel-->
<!-- Impulse Requires resources and will be run after-->
<fork name="non-iterative-rankings">
<path start="spark-cc"/>
<!-- <path start="spark-impulse"/> -->
<path start="spark-ram"/>
</fork>
<!-- CC here -->
<action name="spark-cc">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark CC</name>
<!-- Script name goes here -->
<jar>CC.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-non-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="cc-fail" />
</action>
<!-- IMPULSE here -->
<action name="spark-ram">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark RAM</name>
<!-- Script name goes here -->
<jar>TAR.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${ramGamma}</arg>
<arg>${currentYear}</arg>
<arg>RAM</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>${γιτ α}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/TAR.py#TAR.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-non-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="ram-fail" />
</action>
<!-- JOIN NON-ITERATIVE METHODS AND THEN CONTINUE TO ITERATIVE ONES -->
<join name="join-non-iterative-rankings" to="spark-impulse"/>
<!-- IMPULSE here -->
<action name="spark-impulse">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark Impulse</name>
<!-- Script name goes here -->
<jar>CC.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>3</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/CC.py#CC.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="iterative-rankings" />
<!-- Go there if we have an error -->
<error to="impulse-fail" />
</action>
<fork name="iterative-rankings">
<path start="spark-pagerank"/>
<path start="spark-attrank"/>
</fork>
<!-- PAGERANK here -->
<action name="spark-pagerank">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- we could add map-reduce configs here, but I don't know if we need them -->
<!-- This is the type of master-client configuration for running spark -->
<!-- <master>yarn-client</master> -->
<!-- Reference says: The master element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local. -->
<!-- <master>local[*]</master> -->
<!-- Reference says: The mode element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster. | In my case I always have a client -->
<!-- <mode>client</mode> -->
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark Pagerank</name>
<!-- Script name goes here -->
<jar>PageRank.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${pageRankAlpha}</arg>
<arg>${convergenceError}</arg>
<arg>${checkpointDir}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/PageRank.py#PageRank.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="pagerank-fail" />
</action>
<!-- ATTRANK here -->
<action name="spark-attrank">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Spark AttRank</name>
<!-- Script name goes here -->
<jar>AttRank.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireGraphInputPath}</arg>
<arg>${attrankAlpha}</arg>
<arg>${attrankBeta}</arg>
<arg>${attrankGamma}</arg>
<arg>${attrankRho}</arg>
<arg>${currentYear}</arg>
<arg>${attrankStartYear}</arg>
<arg>${convergenceError}</arg>
<arg>${checkpointDir}</arg>
<!-- number of partitions to be used on joins -->
<arg>7680</arg>
<arg>dfs</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/AttRank.py#AttRank.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="attrank-fail" />
</action>
<!-- JOIN ITERATIVE METHODS AND THEN END -->
<join name="join-iterative-rankings" to="get-file-names"/>
<!-- This will be a shell action that will output key-value pairs for output files -->
<action name="get-file-names">
<!-- This is required as a tag for shell jobs -->
<shell xmlns="uri:oozie:shell-action:0.3">
<!-- Same for all -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs -->
<name-node>${nameNode}</name-node>
<!-- Exec is needed foor shell comands - points to type of shell command -->
<exec>/usr/bin/bash</exec>
<!-- name of script to run -->
<argument>get_ranking_files.sh</argument>
<!-- We only pass the directory where we expect to find the rankings -->
<argument>/${workflowDataDir}</argument>
<!-- the name of the file run -->
<file>${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh</file>
<!-- Get the output in order to be usable by following actions -->
<capture-output/>
</shell>
<!-- Do this after finishing okay -->
<ok to="format-result-files" />
<!-- Go there if we have an error -->
<error to="filename-getting-error" />
</action>
<!-- Now we will run in parallel the formatting of ranking files for BiP! DB and openaire (json files) -->
<fork name="format-result-files">
<path start="format-bip-files"/>
<path start="format-json-files"/>
</fork>
<!-- Format json files -->
<!-- Two parts: a) format files b) make the file endings .json.gz -->
<action name="format-json-files">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Format Ranking Results JSON</name>
<!-- Script name goes here -->
<jar>format_ranking_results.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 10G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>json</arg>
<!-- Input files must be identified dynamically -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- Num partitions -->
<arg>7680</arg>
<!-- Type of data to be produced [bip (dois) / openaire (openaire-ids) ] -->
<arg>openaire</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/format_ranking_results.py#format_ranking_results.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-file-formatting" />
<!-- Go there if we have an error -->
<error to="json-formatting-fail" />
</action>
<!-- This is the second line of parallel workflow execution where we create the BiP! DB files -->
<action name="format-bip-files">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Format Ranking Results BiP! DB</name>
<!-- Script name goes here -->
<jar>format_ranking_results.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 10G --executor-cores 4 --driver-memory 10G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>zenodo</arg>
<!-- Input files must be identified dynamically -->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- Num partitions -->
<arg>7680</arg>
<!-- Type of data to be produced [bip (dois) / openaire (openaire-ids) ] -->
<arg>openaire</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/format_ranking_results.py#format_ranking_results.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="join-file-formatting" />
<!-- Go there if we have an error -->
<error to="bip-formatting-fail" />
</action>
<!-- Finish formatting data and end -->
<join name="join-file-formatting" to="map-openaire-to-doi"/>
<!-- Script here written by Serafeim: maps openaire ids to their synonyms -->
<action name="map-openaire-to-doi">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- Delete previously created doi synonym folder -->
<prepare>
<delete path="${synonymFolder}"/>
</prepare>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Openaire-DOI synonym collection</name>
<!-- Script name goes here -->
<jar>map_openaire_ids_to_dois.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 15G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${openaireDataInput}</arg>
<!-- number of partitions to be used on joins -->
<arg>${synonymFolder}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/map_openaire_ids_to_dois.py#map_openaire_ids_to_dois.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="map-scores-to-dois" />
<!-- Go there if we have an error -->
<error to="synonym-collection-fail" />
</action>
<!-- Script here written by Serafeim: maps openaire ids to their synonyms -->
<action name="map-scores-to-dois">
<!-- This is required as a tag for spark jobs, regardless of programming language -->
<spark xmlns="uri:oozie:spark-action:0.2">
<!-- Is this yarn? Probably the answers are at the link serafeim sent me -->
<job-tracker>${jobTracker}</job-tracker>
<!-- This should give the machine/root of the hdfs, serafeim has provided a link with the required job properties -->
<name-node>${nameNode}</name-node>
<!-- using configs from an example on openaire -->
<master>yarn-cluster</master>
<mode>cluster</mode>
<!-- This is the name of our job -->
<name>Mapping Openaire Scores to DOIs</name>
<!-- Script name goes here -->
<jar>map_scores_to_dois.py</jar>
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
<spark-opts>--executor-memory 18G --executor-cores 4 --driver-memory 15G
--master yarn
--deploy-mode cluster
--conf spark.sql.shuffle.partitions=7680
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
<!-- Script arguments here -->
<arg>${synonymFolder}</arg>
<!-- Number of partitions -->
<arg>7680</arg>
<!-- The remaining input are the ranking files fproduced for bip db-->
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
<!-- This needs to point to the file on the hdfs i think -->
<file>${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py</file>
</spark>
<!-- Do this after finishing okay -->
<ok to="end" />
<!-- Go there if we have an error -->
<error to="map-scores-fail" />
</action>
<!-- TODO: end the workflow-->
<!-- Define ending node -->
<end name="end" />
<!-- Definitions of failure messages -->
<kill name="pagerank-fail">
<message>PageRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="attrank-fail">
<message>AttRank failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="cc-fail">
<message>CC failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="impulse-fail">
<message>Impulse failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="ram-fail">
<message>RAM failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="openaire-graph-error">
<message>Creation of openaire-graph failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="synonym-collection-fail">
<message>Synonym collection failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<kill name="map-scores-fail">
<message>Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</workflow-app>