8172_impact_indicators_workflow #284

Merged
miriam.baglioni merged 49 commits from 8172_impact_indicators_workflow into beta 2023-08-14 15:50:48 +02:00
1 changed files with 15 additions and 0 deletions
Showing only changes of commit 2374f445a9 - Show all commits

View File

@ -15,6 +15,8 @@ from pyspark.sql.types import *
# Import sql functions with shorthand alias # Import sql functions with shorthand alias
import pyspark.sql.functions as F import pyspark.sql.functions as F
from pyspark.sql.functions import max
# from pyspark.sql.functions import udf # from pyspark.sql.functions import udf
################################################################################################# #################################################################################################
################################################################################################# #################################################################################################
@ -127,6 +129,10 @@ for offset, input_file in enumerate(input_file_list):
# Load file to dataframe # Load file to dataframe
ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id') ranking_df = spark.read.schema(schema).option('delimiter', '\t').csv(input_file).repartition(num_partitions, 'id')
# Get max score
max_score = ranking_df.select(max('score').alias('max')).collect()[0]['max']
print ("Max Score for " + str(input_file) + " is " + str(max_score))
# TESTING # TESTING
# print ("Loaded df sample:") # print ("Loaded df sample:")
@ -138,6 +144,15 @@ for offset, input_file in enumerate(input_file_list):
output_file = output_file_list[offset] output_file = output_file_list[offset]
print ("Writing to: " + output_file) print ("Writing to: " + output_file)
doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip') doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
# Creata another file for the bip update process
ranking_df = ranking_df.select('id', 'score', F.lit(F.col('score')/max_score).alias('normalized_score'), 'class', F.col('class').alias('class_dup'))
doi_score_df = synonym_df.join(ranking_df, ['id']).select('doi', 'score', 'normalized_score', 'class', 'class_dup').repartition(num_partitions, 'doi').cache()
output_file = output_file.replace(".txt.gz", "_for_bip_update.txt.gz")
print ("Writing bip update to: " + output_file)
doi_score_df.write.mode('overwrite').option('delimiter','\t').option('header',False).csv(output_file, compression='gzip')
# Free memory? # Free memory?
ranking_df.unpersist(True) ranking_df.unpersist(True)