diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_score_limits.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_score_limits.sh deleted file mode 100644 index 6d4161d7ff..0000000000 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_score_limits.sh +++ /dev/null @@ -1,63 +0,0 @@ -#/usr/bin/bash - -# Read log files from ranking scripts and create a two-line file -# with score limits for the various measures. To be used by Kleanthis - -attrank_file=$(ls *attrank*.log); -pr_file=$(ls *pagerank*.log) -ram_file=$(ls *ram*.log); -cc_file=$(ls *cc*.log); -impulse_file=$(ls *impulse*.log); - -echo -echo "-----------------------------" -echo "Attrank file:${attrank_file}"; -echo "PageRank file:${pr_file}"; -echo "RAM file:${ram_file}"; -echo "CC file:${cc_file}"; -echo "Impulse file:${impulse_file}"; -echo "-----------------------------" -echo -echo - -# output file will be called score_limits.csv -echo -e "influence_top001\tinfluence_top01\tinfluence_top1\tinfluence_top10\tpopularity_top001\tpopularity_top01\tpopularity_top1\tpopularity_top10\timpulse_top001\timpulse_top01\timpulse_top1\timpulse_top10\tcc_top001\tcc_top01\tcc_top1\tcc_top10" > score_limits.csv -# ---------------------------------------------------- # -# Get respective score limits (we don't need RAM) -inf_001=$(grep "^0.01%" ${pr_file} | cut -f 2); -inf_01=$(grep "^0.1%" ${pr_file} | cut -f 2); -inf_1=$(grep "^1%" ${pr_file} | cut -f 2); -inf_10=$(grep "^10%" ${pr_file} | cut -f 2); -echo "Influnence limits:" -echo -e "${inf_001}\t${inf_01}\t${inf_1}\t${inf_10}"; -# ---------------------------------------------------- # -pop_001=$(grep "^0.01%" ${attrank_file} | cut -f 2); -pop_01=$(grep "^0.1%" ${attrank_file} | cut -f 2); -pop_1=$(grep "^1%" ${attrank_file} | cut -f 2); -pop_10=$(grep "^10%" ${attrank_file} | cut -f 2); -echo "Popularity limits:"; -echo -e "${pop_001}\t${pop_01}\t${pop_1}\t${pop_10}"; -# ---------------------------------------------------- # -imp_001=$(grep "^0.01%" ${impulse_file} | cut -f 2); -imp_01=$(grep "^0.1%" ${impulse_file} | cut -f 2); -imp_1=$(grep "^1%" ${impulse_file} | cut -f 2); -imp_10=$(grep "^10%" ${impulse_file} | cut -f 2); -echo "Popularity limits:"; -echo -e "${imp_001}\t${imp_01}\t${imp_1}\t${imp_10}"; -# ---------------------------------------------------- # -cc_001=$(grep "^0.01%" ${cc_file} | cut -f 2); -cc_01=$(grep "^0.1%" ${cc_file} | cut -f 2); -cc_1=$(grep "^1%" ${cc_file} | cut -f 2); -cc_10=$(grep "^10%" ${cc_file} | cut -f 2); -echo "Popularity limits:"; -echo -e "${cc_001}\t${cc_01}\t${cc_1}\t${cc_10}"; -# ---------------------------------------------------- # - -echo -e "${inf_001}\t${inf_01}\t${inf_1}\t${inf_10}\t${pop_001}\t${pop_01}\t${pop_1}\t${pop_10}\t${imp_001}\t${imp_01}\t${imp_1}\t${imp_10}\t${cc_001}\t${cc_01}\t${cc_1}\t${cc_10}" >> score_limits.csv - -echo -echo "score_limits.csv contents:" -cat score_limits.csv - -echo; -echo; diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py deleted file mode 100644 index 7997eec82c..0000000000 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py +++ /dev/null @@ -1,60 +0,0 @@ -import json -import sys -from pyspark.sql import SparkSession -from pyspark import SparkConf, SparkContext - -if len(sys.argv) != 3: - print("Usage: map_openaire_ids_to_dois.py ") - sys.exit(-1) - -conf = SparkConf().setAppName('BIP!: Map OpenAIRE IDs to DOIs') -sc = SparkContext(conf = conf) -spark = SparkSession.builder.appName('BIP!: Map OpenAIRE IDs to DOIs').getOrCreate() -sc.setLogLevel('OFF') - -src_dir = sys.argv[1] -output = sys.argv[2] - -# src_dir = "/tmp/beta_provision/graph/21_graph_cleaned/" -# output = '/tmp/openaireid_to_dois/' - -def transform(doc): - - # get publication year from 'doc.dateofacceptance.value' - dateofacceptance = doc.get('dateofacceptance', {}).get('value') - - year = 0 - - if (dateofacceptance is not None): - year = dateofacceptance.split('-')[0] - - # for each pid get 'pid.value' if 'pid.qualifier.classid' equals to 'doi' - dois = [ pid['value'] for pid in doc.get('pid', []) if (pid.get('qualifier', {}).get('classid') == 'doi' and pid['value'] is not None)] - - num_dois = len(dois) - - # exlcude openaire ids that do not correspond to DOIs - if (num_dois == 0): - return None - - fields = [ doc['id'], str(num_dois), chr(0x02).join(dois), str(year) ] - - return '\t'.join([ v.encode('utf-8') for v in fields ]) - -docs = None - -for result_type in ["publication", "dataset", "software", "otherresearchproduct"]: - - tmp = sc.textFile(src_dir + result_type).map(json.loads) - - if (docs is None): - docs = tmp - else: - # append all result types in one RDD - docs = docs.union(tmp) - -docs = docs.filter(lambda d: d.get('dataInfo', {}).get('deletedbyinference') == False and d.get('dataInfo', {}).get('invisible') == False) - -docs = docs.map(transform).filter(lambda d: d is not None) - -docs.saveAsTextFile(output) diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py deleted file mode 100755 index f6a8e99969..0000000000 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/python -# This program reads the openaire to doi mapping from the ${synonymFolder} of the workflow -# and uses this mapping to create doi-based score files in the format required by BiP! DB. -# This is done by reading each openaire-id based ranking file and joining the openaire based -# score and classes to all the corresponding dois. -################################################################################################# -# Imports -import sys - -# Sparksession lib to communicate with cluster via session object -from pyspark.sql import SparkSession - -# Import sql types to define schemas -from pyspark.sql.types import * - -# Import sql functions with shorthand alias -import pyspark.sql.functions as F - -from pyspark.sql.functions import max -# from pyspark.sql.functions import udf -################################################################################################# -################################################################################################# -# Clean up directory name - no longer needed in final workflow version -''' -def clean_directory_name(dir_name): - # We have a name with the form *_bip_universe_* or *_graph_universe_* - # and we need to keep the parts in * - - - dir_name_parts = dir_name.split('_') - dir_name_parts = [part for part in dir_name_parts if ('bip' not in part and 'graph' not in part and 'universe' not in part and 'from' not in part)] - - dir_name = dir_name.replace("openaire_id_graph", "openaire_ids") - clean_name = dir_name + ".txt.gz" - - # clean_name = '_'.join(dir_name_parts) - - # if '_ids' not in clean_name: - # clean_name = clean_name.replace('id_', 'ids_') - - # clean_name = clean_name.replace('.txt', '') - # clean_name = clean_name.replace('.gz', '') - - # if 'openaire_ids_' in clean_name: - # clean_name = clean_name.replace('openaire_ids_', '') - # clean_name = clean_name + '.txt.gz' - # else: - # clean_name = clean_name + '.txt.gz' - - return clean_name -''' -################################################################################################# -if len(sys.argv) < 3: - print ("Usage: ./map_scores_to_dois.py <...etc...>") - sys.exit(-1) - -# Read arguments -synonyms_folder = sys.argv[1] -num_partitions = int(sys.argv[2]) -input_file_list = [argument.replace("_openaire_id_graph", "").replace("_openaire_id_graph_", "") + "_openaire_ids.txt.gz" for argument in sys.argv[3:]] -# input_file_list = [clean_directory_name(item) for item in input_file_list] - -# Prepare output specific variables -output_file_list = [item.replace("_openaire_ids", "") for item in input_file_list] -output_file_list = [item + ".txt.gz" if not item.endswith(".txt.gz") else item for item in output_file_list] - -# --- INFO MESSAGES --- # -print ("\n\n----------------------------") -print ("Mpping openaire ids to DOIs") -print ("Reading input from: " + synonyms_folder) -print ("Num partitions: " + str(num_partitions)) -print ("Input files:" + " -- ".join(input_file_list)) -print ("Output files: " + " -- ".join(output_file_list)) -print ("----------------------------\n\n") -####################################################################################### -# We weill define the following schemas: -# --> the schema of the openaire - doi mapping file [string - int - doi_list] (the separator of the doi-list is a non printable character) -# --> a schema for floating point ranking scores [string - float - string] (the latter string is the class) -# --> a schema for integer ranking scores [string - int - string] (the latter string is the class) - -float_schema = StructType([ - StructField('id', StringType(), False), - StructField('score', FloatType(), False), - StructField('class', StringType(), False) - ]) - -int_schema = StructType([ - StructField('id', StringType(), False), - StructField('score', IntegerType(), False), - StructField('class', StringType(), False) - ]) - -# This schema concerns the output of the file -# containing the number of references of each doi -synonyms_schema = StructType([ - StructField('id', StringType(), False), - StructField('num_synonyms', IntegerType(), False), - StructField('doi_list', StringType(), False), - ]) -####################################################################################### -# Start spark session -spark = SparkSession.builder.appName('Map openaire scores to DOIs').getOrCreate() -# Set Log Level for spark session -spark.sparkContext.setLogLevel('WARN') -####################################################################################### -# MAIN Program - -# Read and repartition the synonym folder - also cache it since we will need to perform multiple joins -synonym_df = spark.read.schema(synonyms_schema).option('delimiter', '\t').csv(synonyms_folder) -synonym_df = synonym_df.select('id', F.split(F.col('doi_list'), chr(0x02)).alias('doi_list')) -synonym_df = synonym_df.select('id', F.explode('doi_list').alias('doi')).repartition(num_partitions, 'id').cache() - -# TESTING -# print ("Synonyms: " + str(synonym_df.count())) -# print ("DF looks like this:" ) -# synonym_df.show(1000, False) - -print ("\n\n-----------------------------") -# Now we need to join the score files on the openaire-id with the synonyms and then keep -# only doi - score - class and write this to the output -for offset, input_file in enumerate(input_file_list): - - print ("Mapping scores from " + input_file) - - # Select correct schema - schema = int_schema - if "attrank" in input_file.lower() or "pr" in input_file.lower() or "ram" in input_file.lower(): - schema = float_schema - - # Load file to dataframe - ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id') - - # Get max score - max_score = ranking_df.select(max('score').alias('max')).collect()[0]['max'] - print ("Max Score for " + str(input_file) + " is " + str(max_score)) - - # TESTING - # print ("Loaded df sample:") - # ranking_df.show(1000, False) - - # Join scores to synonyms and keep required fields - doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'class').repartition(num_partitions, 'doi').cache() - # Write output - output_file = output_file_list[offset] - print ("Writing to: " + output_file) - doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip') - - # Creata another file for the bip update process - ranking_df = ranking_df.select('id', 'score', F.lit(F.col('score')/max_score).alias('normalized_score'), 'class', F.col('class').alias('class_dup')) - doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'normalized_score', 'class', 'class_dup').repartition(num_partitions, 'doi').cache() - output_file = output_file.replace(".txt.gz", "_for_bip_update.txt.gz") - print ("Writing bip update to: " + output_file) - doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip') - - - # Free memory? - ranking_df.unpersist(True) - -print ("-----------------------------") -print ("\n\nFinished!\n\n") - - - - - - - - diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 70f5f8d2a6..5d86698231 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -17,10 +17,6 @@ openaireGraphInputPath ${nameNode}/${workingDir}/openaire_id_graph - - synonymFolder - ${nameNode}/${workingDir}/openaireid_to_dois/ - checkpointDir ${nameNode}/${workingDir}/check/ @@ -32,29 +28,34 @@ - + - - + + + ${wf:conf('resume') eq "start"} + + ${wf:conf('resume') eq "cc"} ${wf:conf('resume') eq "ram"} ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} - - ${wf:conf('resume') eq "format-results"} - ${wf:conf('resume') eq "map-ids"} - ${wf:conf('resume') eq "map-scores"} - ${wf:conf('resume') eq "start"} - + + ${wf:conf('resume') eq "format-results"} + + ${wf:conf('resume') eq "projects-impact"} + + ${wf:conf('resume') eq "create-actionset"} + + @@ -295,18 +296,11 @@ - + - - - - - - - @@ -345,139 +339,8 @@ ${wfAppPath}/format_ranking_results.py#format_ranking_results.py - - - - - - - - - - - yarn-cluster - cluster - - - Format Ranking Results BiP! DB - - format_ranking_results.py - - - - --executor-memory=${sparkNormalExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkNormalDriverMemory} - --conf spark.executor.memoryOverhead=${sparkNormalExecutorMemory} - --conf spark.sql.shuffle.partitions=${sparkShufflePartitions} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - - - zenodo - - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']} - - ${sparkShufflePartitions} - - openaire - - ${wfAppPath}/format_ranking_results.py#format_ranking_results.py - - - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Openaire-DOI synonym collection - map_openaire_ids_to_dois.py - - - --executor-memory=${sparkHighExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkHighDriverMemory} - --conf spark.executor.memoryOverhead=${sparkHighExecutorMemory} - --conf spark.sql.shuffle.partitions=${sparkShufflePartitions} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - - - ${openaireDataInput}/ - - ${synonymFolder} - - ${wfAppPath}/map_openaire_ids_to_dois.py#map_openaire_ids_to_dois.py - - - - - - - - - - - - - - yarn-cluster - cluster - Mapping Openaire Scores to DOIs - map_scores_to_dois.py - - - --executor-memory=${sparkHighExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkHighDriverMemory} - --conf spark.executor.memoryOverhead=${sparkHighExecutorMemory} - --conf spark.sql.shuffle.partitions=${sparkShufflePartitions} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - - - ${synonymFolder} - - ${sparkShufflePartitions} - - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['attrank_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['cc_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']} - ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']} - - ${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py - - - - + @@ -594,18 +457,6 @@ Error formatting json files, error message[${wf:errorMessage(wf:lastErrorNode())}] - - Error formatting BIP files, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - Synonym collection failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]