Merge branch '8172_impact_indicators_workflow' of https://code-repo.d4science.org/D-Net/dnet-hadoop into 8172_impact_indicators_workflow

This commit is contained in:
Serafeim Chatzopoulos 2023-07-21 15:27:02 +03:00
commit c64e5e588f
1 changed files with 9 additions and 5 deletions

View File

@ -213,7 +213,10 @@ if mode == 'bip':
cc_dir = sys.argv[4] cc_dir = sys.argv[4]
impulse_dir = sys.argv[5] impulse_dir = sys.argv[5]
ram_dir = sys.argv[6] ram_dir = sys.argv[6]
refs_dir = sys.argv[7]
# NOTE: This was used initial, but @Serafeim told me to remove it since we don't get doi-doi referencew anymore
# In case of emergency, bring this back
# refs_dir = sys.argv[7]
# Score-specific dataframe # Score-specific dataframe
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id') pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
@ -221,7 +224,7 @@ if mode == 'bip':
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id') cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id') impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id') ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
refs_df = spark.read.schema(refs_schema).option('delimiter', '\t').option('header',True).csv(refs_dir).repartition(num_partitions, 'id') # refs_df = spark.read.schema(refs_schema).option('delimiter', '\t').option('header',True).csv(refs_dir).repartition(num_partitions, 'id')
# ----------- TESTING CODE --------------- # # ----------- TESTING CODE --------------- #
# pagerank_entries = pagerank_df.count() # pagerank_entries = pagerank_df.count()
@ -258,9 +261,10 @@ if mode == 'bip':
.select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized',\ .select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', 'cc', 'cc_normalized',\
'3-cc', '3-cc_normalized', F.col('score').alias('ram')) '3-cc', '3-cc_normalized', F.col('score').alias('ram'))
# Add references # Add references - THIS WAS REMOVED SINCE WE DON't GET DOI REFERENCES
results_df = results_df.join(refs_df, ['id']).select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', \ # In case of emergency bring back
'cc', 'cc_normalized', '3-cc', '3-cc_normalized', 'ram', 'num_refs') # results_df = results_df.join(refs_df, ['id']).select(results_df.id, 'pagerank', 'pagerank_normalized', 'attrank', 'attrank_normalized', \
# 'cc', 'cc_normalized', '3-cc', '3-cc_normalized', 'ram', 'num_refs')
# Write resulting dataframe to file # Write resulting dataframe to file
output_dir = "/".join(pagerank_dir.split('/')[:-1]) output_dir = "/".join(pagerank_dir.split('/')[:-1])