Merge pull request '9126-impact-indicators-wf-optimisation' (#471) from 9126-impact-indicators-wf-optimisation into beta
Reviewed-on: #471
This commit is contained in:
commit
4a9aeb6238
|
@ -1,63 +0,0 @@
|
||||||
#/usr/bin/bash
|
|
||||||
|
|
||||||
# Read log files from ranking scripts and create a two-line file
|
|
||||||
# with score limits for the various measures. To be used by Kleanthis
|
|
||||||
|
|
||||||
attrank_file=$(ls *attrank*.log);
|
|
||||||
pr_file=$(ls *pagerank*.log)
|
|
||||||
ram_file=$(ls *ram*.log);
|
|
||||||
cc_file=$(ls *cc*.log);
|
|
||||||
impulse_file=$(ls *impulse*.log);
|
|
||||||
|
|
||||||
echo
|
|
||||||
echo "-----------------------------"
|
|
||||||
echo "Attrank file:${attrank_file}";
|
|
||||||
echo "PageRank file:${pr_file}";
|
|
||||||
echo "RAM file:${ram_file}";
|
|
||||||
echo "CC file:${cc_file}";
|
|
||||||
echo "Impulse file:${impulse_file}";
|
|
||||||
echo "-----------------------------"
|
|
||||||
echo
|
|
||||||
echo
|
|
||||||
|
|
||||||
# output file will be called score_limits.csv
|
|
||||||
echo -e "influence_top001\tinfluence_top01\tinfluence_top1\tinfluence_top10\tpopularity_top001\tpopularity_top01\tpopularity_top1\tpopularity_top10\timpulse_top001\timpulse_top01\timpulse_top1\timpulse_top10\tcc_top001\tcc_top01\tcc_top1\tcc_top10" > score_limits.csv
|
|
||||||
# ---------------------------------------------------- #
|
|
||||||
# Get respective score limits (we don't need RAM)
|
|
||||||
inf_001=$(grep "^0.01%" ${pr_file} | cut -f 2);
|
|
||||||
inf_01=$(grep "^0.1%" ${pr_file} | cut -f 2);
|
|
||||||
inf_1=$(grep "^1%" ${pr_file} | cut -f 2);
|
|
||||||
inf_10=$(grep "^10%" ${pr_file} | cut -f 2);
|
|
||||||
echo "Influnence limits:"
|
|
||||||
echo -e "${inf_001}\t${inf_01}\t${inf_1}\t${inf_10}";
|
|
||||||
# ---------------------------------------------------- #
|
|
||||||
pop_001=$(grep "^0.01%" ${attrank_file} | cut -f 2);
|
|
||||||
pop_01=$(grep "^0.1%" ${attrank_file} | cut -f 2);
|
|
||||||
pop_1=$(grep "^1%" ${attrank_file} | cut -f 2);
|
|
||||||
pop_10=$(grep "^10%" ${attrank_file} | cut -f 2);
|
|
||||||
echo "Popularity limits:";
|
|
||||||
echo -e "${pop_001}\t${pop_01}\t${pop_1}\t${pop_10}";
|
|
||||||
# ---------------------------------------------------- #
|
|
||||||
imp_001=$(grep "^0.01%" ${impulse_file} | cut -f 2);
|
|
||||||
imp_01=$(grep "^0.1%" ${impulse_file} | cut -f 2);
|
|
||||||
imp_1=$(grep "^1%" ${impulse_file} | cut -f 2);
|
|
||||||
imp_10=$(grep "^10%" ${impulse_file} | cut -f 2);
|
|
||||||
echo "Popularity limits:";
|
|
||||||
echo -e "${imp_001}\t${imp_01}\t${imp_1}\t${imp_10}";
|
|
||||||
# ---------------------------------------------------- #
|
|
||||||
cc_001=$(grep "^0.01%" ${cc_file} | cut -f 2);
|
|
||||||
cc_01=$(grep "^0.1%" ${cc_file} | cut -f 2);
|
|
||||||
cc_1=$(grep "^1%" ${cc_file} | cut -f 2);
|
|
||||||
cc_10=$(grep "^10%" ${cc_file} | cut -f 2);
|
|
||||||
echo "Popularity limits:";
|
|
||||||
echo -e "${cc_001}\t${cc_01}\t${cc_1}\t${cc_10}";
|
|
||||||
# ---------------------------------------------------- #
|
|
||||||
|
|
||||||
echo -e "${inf_001}\t${inf_01}\t${inf_1}\t${inf_10}\t${pop_001}\t${pop_01}\t${pop_1}\t${pop_10}\t${imp_001}\t${imp_01}\t${imp_1}\t${imp_10}\t${cc_001}\t${cc_01}\t${cc_1}\t${cc_10}" >> score_limits.csv
|
|
||||||
|
|
||||||
echo
|
|
||||||
echo "score_limits.csv contents:"
|
|
||||||
cat score_limits.csv
|
|
||||||
|
|
||||||
echo;
|
|
||||||
echo;
|
|
|
@ -1,60 +0,0 @@
|
||||||
import json
|
|
||||||
import sys
|
|
||||||
from pyspark.sql import SparkSession
|
|
||||||
from pyspark import SparkConf, SparkContext
|
|
||||||
|
|
||||||
if len(sys.argv) != 3:
|
|
||||||
print("Usage: map_openaire_ids_to_dois.py <hdfs_src_dir> <hdfs_output_dir>")
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
conf = SparkConf().setAppName('BIP!: Map OpenAIRE IDs to DOIs')
|
|
||||||
sc = SparkContext(conf = conf)
|
|
||||||
spark = SparkSession.builder.appName('BIP!: Map OpenAIRE IDs to DOIs').getOrCreate()
|
|
||||||
sc.setLogLevel('OFF')
|
|
||||||
|
|
||||||
src_dir = sys.argv[1]
|
|
||||||
output = sys.argv[2]
|
|
||||||
|
|
||||||
# src_dir = "/tmp/beta_provision/graph/21_graph_cleaned/"
|
|
||||||
# output = '/tmp/openaireid_to_dois/'
|
|
||||||
|
|
||||||
def transform(doc):
|
|
||||||
|
|
||||||
# get publication year from 'doc.dateofacceptance.value'
|
|
||||||
dateofacceptance = doc.get('dateofacceptance', {}).get('value')
|
|
||||||
|
|
||||||
year = 0
|
|
||||||
|
|
||||||
if (dateofacceptance is not None):
|
|
||||||
year = dateofacceptance.split('-')[0]
|
|
||||||
|
|
||||||
# for each pid get 'pid.value' if 'pid.qualifier.classid' equals to 'doi'
|
|
||||||
dois = [ pid['value'] for pid in doc.get('pid', []) if (pid.get('qualifier', {}).get('classid') == 'doi' and pid['value'] is not None)]
|
|
||||||
|
|
||||||
num_dois = len(dois)
|
|
||||||
|
|
||||||
# exlcude openaire ids that do not correspond to DOIs
|
|
||||||
if (num_dois == 0):
|
|
||||||
return None
|
|
||||||
|
|
||||||
fields = [ doc['id'], str(num_dois), chr(0x02).join(dois), str(year) ]
|
|
||||||
|
|
||||||
return '\t'.join([ v.encode('utf-8') for v in fields ])
|
|
||||||
|
|
||||||
docs = None
|
|
||||||
|
|
||||||
for result_type in ["publication", "dataset", "software", "otherresearchproduct"]:
|
|
||||||
|
|
||||||
tmp = sc.textFile(src_dir + result_type).map(json.loads)
|
|
||||||
|
|
||||||
if (docs is None):
|
|
||||||
docs = tmp
|
|
||||||
else:
|
|
||||||
# append all result types in one RDD
|
|
||||||
docs = docs.union(tmp)
|
|
||||||
|
|
||||||
docs = docs.filter(lambda d: d.get('dataInfo', {}).get('deletedbyinference') == False and d.get('dataInfo', {}).get('invisible') == False)
|
|
||||||
|
|
||||||
docs = docs.map(transform).filter(lambda d: d is not None)
|
|
||||||
|
|
||||||
docs.saveAsTextFile(output)
|
|
|
@ -1,168 +0,0 @@
|
||||||
#!/usr/bin/python
|
|
||||||
# This program reads the openaire to doi mapping from the ${synonymFolder} of the workflow
|
|
||||||
# and uses this mapping to create doi-based score files in the format required by BiP! DB.
|
|
||||||
# This is done by reading each openaire-id based ranking file and joining the openaire based
|
|
||||||
# score and classes to all the corresponding dois.
|
|
||||||
#################################################################################################
|
|
||||||
# Imports
|
|
||||||
import sys
|
|
||||||
|
|
||||||
# Sparksession lib to communicate with cluster via session object
|
|
||||||
from pyspark.sql import SparkSession
|
|
||||||
|
|
||||||
# Import sql types to define schemas
|
|
||||||
from pyspark.sql.types import *
|
|
||||||
|
|
||||||
# Import sql functions with shorthand alias
|
|
||||||
import pyspark.sql.functions as F
|
|
||||||
|
|
||||||
from pyspark.sql.functions import max
|
|
||||||
# from pyspark.sql.functions import udf
|
|
||||||
#################################################################################################
|
|
||||||
#################################################################################################
|
|
||||||
# Clean up directory name - no longer needed in final workflow version
|
|
||||||
'''
|
|
||||||
def clean_directory_name(dir_name):
|
|
||||||
# We have a name with the form *_bip_universe<digits>_* or *_graph_universe<digits>_*
|
|
||||||
# and we need to keep the parts in *
|
|
||||||
|
|
||||||
|
|
||||||
dir_name_parts = dir_name.split('_')
|
|
||||||
dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)]
|
|
||||||
|
|
||||||
dir_name = dir_name.replace("openaire_id_graph", "openaire_ids")
|
|
||||||
clean_name = dir_name + ".txt.gz"
|
|
||||||
|
|
||||||
# clean_name = '_'.join(dir_name_parts)
|
|
||||||
|
|
||||||
# if '_ids' not in clean_name:
|
|
||||||
# clean_name = clean_name.replace('id_', 'ids_')
|
|
||||||
|
|
||||||
# clean_name = clean_name.replace('.txt', '')
|
|
||||||
# clean_name = clean_name.replace('.gz', '')
|
|
||||||
|
|
||||||
# if 'openaire_ids_' in clean_name:
|
|
||||||
# clean_name = clean_name.replace('openaire_ids_', '')
|
|
||||||
# clean_name = clean_name + '.txt.gz'
|
|
||||||
# else:
|
|
||||||
# clean_name = clean_name + '.txt.gz'
|
|
||||||
|
|
||||||
return clean_name
|
|
||||||
'''
|
|
||||||
#################################################################################################
|
|
||||||
if len(sys.argv) < 3:
|
|
||||||
print ("Usage: ./map_scores_to_dois.py <synonym_folder> <num_partitions> <score_file_1> <score_file_2> <...etc...>")
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
# Read arguments
|
|
||||||
synonyms_folder = sys.argv[1]
|
|
||||||
num_partitions = int(sys.argv[2])
|
|
||||||
input_file_list = [argument.replace("_openaire_id_graph", "").replace("_openaire_id_graph_", "") + "_openaire_ids.txt.gz" for argument in sys.argv[3:]]
|
|
||||||
# input_file_list = [clean_directory_name(item) for item in input_file_list]
|
|
||||||
|
|
||||||
# Prepare output specific variables
|
|
||||||
output_file_list = [item.replace("_openaire_ids", "") for item in input_file_list]
|
|
||||||
output_file_list = [item + ".txt.gz" if not item.endswith(".txt.gz") else item for item in output_file_list]
|
|
||||||
|
|
||||||
# --- INFO MESSAGES --- #
|
|
||||||
print ("\n\n----------------------------")
|
|
||||||
print ("Mpping openaire ids to DOIs")
|
|
||||||
print ("Reading input from: " + synonyms_folder)
|
|
||||||
print ("Num partitions: " + str(num_partitions))
|
|
||||||
print ("Input files:" + " -- ".join(input_file_list))
|
|
||||||
print ("Output files: " + " -- ".join(output_file_list))
|
|
||||||
print ("----------------------------\n\n")
|
|
||||||
#######################################################################################
|
|
||||||
# We weill define the following schemas:
|
|
||||||
# --> the schema of the openaire - doi mapping file [string - int - doi_list] (the separator of the doi-list is a non printable character)
|
|
||||||
# --> a schema for floating point ranking scores [string - float - string] (the latter string is the class)
|
|
||||||
# --> a schema for integer ranking scores [string - int - string] (the latter string is the class)
|
|
||||||
|
|
||||||
float_schema = StructType([
|
|
||||||
StructField('id', StringType(), False),
|
|
||||||
StructField('score', FloatType(), False),
|
|
||||||
StructField('class', StringType(), False)
|
|
||||||
])
|
|
||||||
|
|
||||||
int_schema = StructType([
|
|
||||||
StructField('id', StringType(), False),
|
|
||||||
StructField('score', IntegerType(), False),
|
|
||||||
StructField('class', StringType(), False)
|
|
||||||
])
|
|
||||||
|
|
||||||
# This schema concerns the output of the file
|
|
||||||
# containing the number of references of each doi
|
|
||||||
synonyms_schema = StructType([
|
|
||||||
StructField('id', StringType(), False),
|
|
||||||
StructField('num_synonyms', IntegerType(), False),
|
|
||||||
StructField('doi_list', StringType(), False),
|
|
||||||
])
|
|
||||||
#######################################################################################
|
|
||||||
# Start spark session
|
|
||||||
spark = SparkSession.builder.appName('Map openaire scores to DOIs').getOrCreate()
|
|
||||||
# Set Log Level for spark session
|
|
||||||
spark.sparkContext.setLogLevel('WARN')
|
|
||||||
#######################################################################################
|
|
||||||
# MAIN Program
|
|
||||||
|
|
||||||
# Read and repartition the synonym folder - also cache it since we will need to perform multiple joins
|
|
||||||
synonym_df = spark.read.schema(synonyms_schema).option('delimiter', '\t').csv(synonyms_folder)
|
|
||||||
synonym_df = synonym_df.select('id', F.split(F.col('doi_list'), chr(0x02)).alias('doi_list'))
|
|
||||||
synonym_df = synonym_df.select('id', F.explode('doi_list').alias('doi')).repartition(num_partitions, 'id').cache()
|
|
||||||
|
|
||||||
# TESTING
|
|
||||||
# print ("Synonyms: " + str(synonym_df.count()))
|
|
||||||
# print ("DF looks like this:" )
|
|
||||||
# synonym_df.show(1000, False)
|
|
||||||
|
|
||||||
print ("\n\n-----------------------------")
|
|
||||||
# Now we need to join the score files on the openaire-id with the synonyms and then keep
|
|
||||||
# only doi - score - class and write this to the output
|
|
||||||
for offset, input_file in enumerate(input_file_list):
|
|
||||||
|
|
||||||
print ("Mapping scores from " + input_file)
|
|
||||||
|
|
||||||
# Select correct schema
|
|
||||||
schema = int_schema
|
|
||||||
if "attrank" in input_file.lower() or "pr" in input_file.lower() or "ram" in input_file.lower():
|
|
||||||
schema = float_schema
|
|
||||||
|
|
||||||
# Load file to dataframe
|
|
||||||
ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id')
|
|
||||||
|
|
||||||
# Get max score
|
|
||||||
max_score = ranking_df.select(max('score').alias('max')).collect()[0]['max']
|
|
||||||
print ("Max Score for " + str(input_file) + " is " + str(max_score))
|
|
||||||
|
|
||||||
# TESTING
|
|
||||||
# print ("Loaded df sample:")
|
|
||||||
# ranking_df.show(1000, False)
|
|
||||||
|
|
||||||
# Join scores to synonyms and keep required fields
|
|
||||||
doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'class').repartition(num_partitions, 'doi').cache()
|
|
||||||
# Write output
|
|
||||||
output_file = output_file_list[offset]
|
|
||||||
print ("Writing to: " + output_file)
|
|
||||||
doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
|
|
||||||
|
|
||||||
# Creata another file for the bip update process
|
|
||||||
ranking_df = ranking_df.select('id', 'score', F.lit(F.col('score')/max_score).alias('normalized_score'), 'class', F.col('class').alias('class_dup'))
|
|
||||||
doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'normalized_score', 'class', 'class_dup').repartition(num_partitions, 'doi').cache()
|
|
||||||
output_file = output_file.replace(".txt.gz", "_for_bip_update.txt.gz")
|
|
||||||
print ("Writing bip update to: " + output_file)
|
|
||||||
doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
|
|
||||||
|
|
||||||
|
|
||||||
# Free memory?
|
|
||||||
ranking_df.unpersist(True)
|
|
||||||
|
|
||||||
print ("-----------------------------")
|
|
||||||
print ("\n\nFinished!\n\n")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,6 @@
|
||||||
<name>openaireGraphInputPath</name>
|
<name>openaireGraphInputPath</name>
|
||||||
<value>${nameNode}/${workingDir}/openaire_id_graph</value>
|
<value>${nameNode}/${workingDir}/openaire_id_graph</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>synonymFolder</name>
|
|
||||||
<value>${nameNode}/${workingDir}/openaireid_to_dois/</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
<property>
|
||||||
<name>checkpointDir</name>
|
<name>checkpointDir</name>
|
||||||
<value>${nameNode}/${workingDir}/check/</value>
|
<value>${nameNode}/${workingDir}/check/</value>
|
||||||
|
@ -32,29 +28,34 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
<!-- start using a decision node, so as to determine from which point onwards a job will continue -->
|
<!-- Start using a decision node, to determine from which point onwards a job will continue -->
|
||||||
<start to="entry-point-decision" />
|
<start to="entry-point-decision" />
|
||||||
|
|
||||||
<decision name="entry-point-decision">
|
<decision name="entry-point-decision">
|
||||||
<switch>
|
<switch>
|
||||||
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
|
|
||||||
<!-- If any different condition is set, go to the corresponding start -->
|
<!-- Start from creating the citation network (i.e., normal execution should start from here) -->
|
||||||
|
<case to="create-openaire-ranking-graph">${wf:conf('resume') eq "start"}</case>
|
||||||
|
|
||||||
|
<!-- Different citation-based impact indicators are computed -->
|
||||||
<case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
|
<case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
|
||||||
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
|
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
|
||||||
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
|
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
|
||||||
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
|
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
|
||||||
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
|
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
|
||||||
<!-- <case to="iterative-rankings">${wf:conf('resume') eq "rankings-iterative"}</case> -->
|
|
||||||
|
<!-- Format the results appropriately before transforming them to action sets -->
|
||||||
<case to="get-file-names">${wf:conf('resume') eq "format-results"}</case>
|
<case to="get-file-names">${wf:conf('resume') eq "format-results"}</case>
|
||||||
<case to="map-openaire-to-doi">${wf:conf('resume') eq "map-ids"}</case>
|
|
||||||
<case to="map-scores-to-dois">${wf:conf('resume') eq "map-scores"}</case>
|
|
||||||
<case to="create-openaire-ranking-graph">${wf:conf('resume') eq "start"}</case>
|
|
||||||
|
|
||||||
<!-- Aggregation of impact scores on the project level -->
|
<!-- Aggregation of impact scores on the project level -->
|
||||||
<case to="project-impact-indicators">${wf:conf('resume') eq "projects-impact"}</case>
|
<case to="project-impact-indicators">${wf:conf('resume') eq "projects-impact"}</case>
|
||||||
|
|
||||||
|
<!-- Create action sets -->
|
||||||
<case to="create-actionset">${wf:conf('resume') eq "create-actionset"}</case>
|
<case to="create-actionset">${wf:conf('resume') eq "create-actionset"}</case>
|
||||||
|
|
||||||
|
<!-- The default will be set as the normal start, a.k.a. create-openaire-ranking-graph -->
|
||||||
<default to="create-openaire-ranking-graph" />
|
<default to="create-openaire-ranking-graph" />
|
||||||
|
|
||||||
</switch>
|
</switch>
|
||||||
</decision>
|
</decision>
|
||||||
|
|
||||||
|
@ -295,18 +296,11 @@
|
||||||
<capture-output/>
|
<capture-output/>
|
||||||
</shell>
|
</shell>
|
||||||
|
|
||||||
<ok to="format-result-files" />
|
<ok to="format-json-files" />
|
||||||
<error to="filename-getting-error" />
|
<error to="filename-getting-error" />
|
||||||
|
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<!-- Now we will run in parallel the formatting of ranking files for BiP! DB and openaire (json files) -->
|
|
||||||
<fork name="format-result-files">
|
|
||||||
<path start="format-bip-files"/>
|
|
||||||
<path start="format-json-files"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
|
|
||||||
<!-- Format json files -->
|
<!-- Format json files -->
|
||||||
<!-- Two parts: a) format files b) make the file endings .json.gz -->
|
<!-- Two parts: a) format files b) make the file endings .json.gz -->
|
||||||
<action name="format-json-files">
|
<action name="format-json-files">
|
||||||
|
@ -345,139 +339,8 @@
|
||||||
<file>${wfAppPath}/format_ranking_results.py#format_ranking_results.py</file>
|
<file>${wfAppPath}/format_ranking_results.py#format_ranking_results.py</file>
|
||||||
</spark>
|
</spark>
|
||||||
|
|
||||||
<ok to="join-file-formatting" />
|
|
||||||
<error to="json-formatting-fail" />
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<!-- This is the second line of parallel workflow execution where we create the BiP! DB files -->
|
|
||||||
<action name="format-bip-files">
|
|
||||||
<!-- This is required as a tag for spark jobs, regardless of programming language -->
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
|
|
||||||
<!-- using configs from an example on openaire -->
|
|
||||||
<master>yarn-cluster</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
|
|
||||||
<!-- This is the name of our job -->
|
|
||||||
<name>Format Ranking Results BiP! DB</name>
|
|
||||||
<!-- Script name goes here -->
|
|
||||||
<jar>format_ranking_results.py</jar>
|
|
||||||
<!-- spark configuration options: I've taken most of them from an example from dhp workflows / Master value stolen from sandro -->
|
|
||||||
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkNormalExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkNormalDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory}
|
|
||||||
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
|
|
||||||
<!-- Script arguments here -->
|
|
||||||
<arg>zenodo</arg>
|
|
||||||
<!-- Input files must be identified dynamically -->
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
|
|
||||||
<!-- Num partitions -->
|
|
||||||
<arg>${sparkShufflePartitions}</arg>
|
|
||||||
<!-- Type of data to be produced [bip (dois) / openaire (openaire-ids) ] -->
|
|
||||||
<arg>openaire</arg>
|
|
||||||
<!-- This needs to point to the file on the hdfs i think -->
|
|
||||||
<file>${wfAppPath}/format_ranking_results.py#format_ranking_results.py</file>
|
|
||||||
</spark>
|
|
||||||
|
|
||||||
<ok to="join-file-formatting" />
|
|
||||||
<error to="bip-formatting-fail" />
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<!-- Finish formatting jobs -->
|
|
||||||
<join name="join-file-formatting" to="map-openaire-to-doi"/>
|
|
||||||
|
|
||||||
<!-- maps openaire ids to DOIs -->
|
|
||||||
<action name="map-openaire-to-doi">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
|
|
||||||
<!-- Delete previously created doi synonym folder -->
|
|
||||||
<prepare>
|
|
||||||
<delete path="${synonymFolder}"/>
|
|
||||||
</prepare>
|
|
||||||
|
|
||||||
<master>yarn-cluster</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Openaire-DOI synonym collection</name>
|
|
||||||
<jar>map_openaire_ids_to_dois.py</jar>
|
|
||||||
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkHighExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkHighDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
|
|
||||||
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
|
|
||||||
<!-- Script arguments here -->
|
|
||||||
<arg>${openaireDataInput}/</arg>
|
|
||||||
<!-- number of partitions to be used on joins -->
|
|
||||||
<arg>${synonymFolder}</arg>
|
|
||||||
|
|
||||||
<file>${wfAppPath}/map_openaire_ids_to_dois.py#map_openaire_ids_to_dois.py</file>
|
|
||||||
</spark>
|
|
||||||
|
|
||||||
<ok to="map-scores-to-dois" />
|
|
||||||
<error to="synonym-collection-fail" />
|
|
||||||
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<!-- mapping openaire scores to DOIs -->
|
|
||||||
<action name="map-scores-to-dois">
|
|
||||||
<!-- This is required as a tag for spark jobs, regardless of programming language -->
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
|
|
||||||
<!-- using configs from an example on openaire -->
|
|
||||||
<master>yarn-cluster</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Mapping Openaire Scores to DOIs</name>
|
|
||||||
<jar>map_scores_to_dois.py</jar>
|
|
||||||
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkHighExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkHighDriverMemory}
|
|
||||||
--conf spark.executor.memoryOverhead=${sparkHighExecutorMemory}
|
|
||||||
--conf spark.sql.shuffle.partitions=${sparkShufflePartitions}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
</spark-opts>
|
|
||||||
|
|
||||||
<!-- Script arguments here -->
|
|
||||||
<arg>${synonymFolder}</arg>
|
|
||||||
<!-- Number of partitions -->
|
|
||||||
<arg>${sparkShufflePartitions}</arg>
|
|
||||||
<!-- The remaining input are the ranking files fproduced for bip db-->
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
|
|
||||||
<arg>${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']}</arg>
|
|
||||||
|
|
||||||
<file>${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py</file>
|
|
||||||
</spark>
|
|
||||||
|
|
||||||
<ok to="project-impact-indicators" />
|
<ok to="project-impact-indicators" />
|
||||||
<error to="map-scores-fail" />
|
<error to="json-formatting-fail" />
|
||||||
|
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="project-impact-indicators">
|
<action name="project-impact-indicators">
|
||||||
|
@ -594,18 +457,6 @@
|
||||||
<message>Error formatting json files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Error formatting json files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<kill name="bip-formatting-fail">
|
|
||||||
<message>Error formatting BIP files, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
|
||||||
</kill>
|
|
||||||
|
|
||||||
<kill name="synonym-collection-fail">
|
|
||||||
<message>Synonym collection failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
|
||||||
</kill>
|
|
||||||
|
|
||||||
<kill name="map-scores-fail">
|
|
||||||
<message>Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
|
||||||
</kill>
|
|
||||||
|
|
||||||
<kill name="actionset-delete-fail">
|
<kill name="actionset-delete-fail">
|
||||||
<message>Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
Loading…
Reference in New Issue