From 2374f445a941a76fe239a95e75a5e491c12a22bf Mon Sep 17 00:00:00 2001
From: ikanellos
Date: Fri, 21 Jul 2023 17:42:46 +0300
Subject: [PATCH] Produce additional bip update specific files

---
 .../oozie_app/map_scores_to_dois.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
index 0fc67eb53..f6a8e9996 100755
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
@@ -15,6 +15,8 @@ from pyspark.sql.types import *
 
 # Import sql functions with shorthand alias
 import pyspark.sql.functions as F
+
+from pyspark.sql.functions import max as spark_max
 # from pyspark.sql.functions import udf
 #################################################################################################
 #################################################################################################
@@ -127,6 +129,10 @@ for offset, input_file in enumerate(input_file_list):
 
     # Load file to dataframe
     ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id')
+
+    # Get max score, used to normalize the scores of this input file
+    max_score = ranking_df.select(spark_max('score').alias('max')).collect()[0]['max']
+    print ("Max Score for " + str(input_file) + " is " + str(max_score))
 
     # TESTING
     # print ("Loaded df sample:")
@@ -138,6 +144,15 @@ for offset, input_file in enumerate(input_file_list):
 
     output_file = output_file_list[offset]
     print ("Writing to: " + output_file)
     doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
+
+    # Create another file for the bip update process
+    ranking_df = ranking_df.select('id', 'score', (F.col('score')/max_score).alias('normalized_score'), 'class', F.col('class').alias('class_dup'))
+    doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'normalized_score', 'class', 'class_dup').repartition(num_partitions, 'doi').cache()
+    output_file = output_file.replace(".txt.gz", "_for_bip_update.txt.gz")
+    print ("Writing bip update to: " + output_file)
+    doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
+
+    # Free memory?
     ranking_df.unpersist(True)
 
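
Note on the change above (reviewer sketch, not part of the patch): the new hunks
normalize every score in a ranking file by that file's maximum score and then
resolve internal ids to DOIs through synonym_df before writing the extra
"*_for_bip_update.txt.gz" output. The standalone PySpark sketch below replays
that transformation on toy data so it can be sanity-checked in isolation; the
local SparkSession and the invented rows are assumptions for illustration, and
only the column names ('id', 'score', 'class', 'doi') are taken from the patch.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.master('local[*]').appName('bip_update_sketch').getOrCreate()

    # Toy stand-ins for the workflow's dataframes (invented rows, illustration only)
    ranking_df = spark.createDataFrame(
        [('p1', 4.0, 'C1'), ('p2', 2.0, 'C2'), ('p3', 1.0, 'C4')],
        ['id', 'score', 'class'])
    synonym_df = spark.createDataFrame(
        [('p1', '10.1000/a'), ('p2', '10.1000/b'), ('p3', '10.1000/c')],
        ['id', 'doi'])

    # Max score over the whole file, as computed in the patch
    max_score = ranking_df.select(F.max('score').alias('max')).collect()[0]['max']

    # Divide each score by the maximum and duplicate the class column,
    # reproducing the bip-update column layout of the patch
    ranking_df = ranking_df.select(
        'id', 'score',
        (F.col('score') / max_score).alias('normalized_score'),
        'class', F.col('class').alias('class_dup'))

    # Resolve ids to DOIs and keep the output column order
    doi_score_df = synonym_df.join(ranking_df, ['id']).select(
        'doi', 'score', 'normalized_score', 'class', 'class_dup')

    # 'p1' carries the max score 4.0, so its normalized_score is exactly 1.0
    doi_score_df.show()

In the workflow itself the result is written as gzip-compressed TSV, and the
output name simply gains the "_for_bip_update.txt.gz" suffix, as the hunk shows.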