From 3de35fd6a310ca41c8fb7cdd1a1e1396a2067fba Mon Sep 17 00:00:00 2001
From: ikanellos
Date: Thu, 11 May 2023 14:42:25 +0300
Subject: [PATCH] Produce 5 classes of ranking scores

---
 .../oozie_app/format_ranking_results.py       | 31 ++++++++++++++++---
 .../impact_indicators/oozie_app/workflow.xml  |  2 +-
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
index 60c71e52f..e7d62c2f1 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
@@ -421,7 +421,7 @@ elif mode == 'json':
 
     # Score-specific dataframe - read inputs
     pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
-    attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+    attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
     cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
     impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
     ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
@@ -601,7 +601,7 @@ elif mode == 'json-5-way':
 
     # Score-specific dataframe - read inputs
     pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
-    attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+    attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
     cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
     impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
     ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
@@ -753,15 +753,36 @@ elif mode == 'json-5-way':
 
     # -------------------------------------------- #
     # Write json output
+    # -------------------------------------------- #
+    # Write json output - set the directory here
     output_dir = "/".join(pagerank_dir.split('/')[:-1])
     if graph_type == 'bip':
-        output_dir = output_dir + '/bip_universe_doi_scores_5_classes/'
+        output_dir = output_dir + '/bip_universe_doi_scores/'
     else:
-        output_dir = output_dir + '/openaire_universe_scores_5_classes/'
-
+        output_dir = output_dir + '/openaire_universe_scores/'
+
+    # Write the dataframe
     print ("Writing output to: " + output_dir)
     results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
 
+    # Rename the files to .json.gz now
+    sc = spark.sparkContext
+    URI = sc._gateway.jvm.java.net.URI
+    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
+    FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
+    # Get master prefix from input file path
+    master_prefix = "/".join(pagerank_dir.split('/')[:5])
+    fs = FileSystem.get(URI(master_prefix), sc._jsc.hadoopConfiguration())
+    path = Path(output_dir)
+    print ("Path is:" + path.toString())
+    file_list = fs.listStatus(Path(output_dir))
+    print ("Renaming files:")
+    for f in file_list:
+        initial_filename = f.getPath().toString()
+        if "part" in initial_filename:
+            print (initial_filename + " => " + initial_filename.replace(".txt.gz", ".json.gz"))
+            fs.rename(Path(initial_filename), Path(initial_filename.replace(".txt.gz", ".json.gz")))
+
     # Close spark session
     spark.stop()

diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
index 7aa95db22..f07a27244 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
@@ -390,7 +390,7 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>json</arg>
+            <arg>json-5-way</arg>
             <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
             <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
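
Note on the renaming step added above: Spark's text writer always emits part-*.txt.gz files when gzip compression is on, so the patch relabels them driver-side through Hadoop's FileSystem API, reached via the py4j JVM gateway. The sketch below shows the same technique standalone; the SparkSession setup, app name, and the hdfs:// path are illustrative assumptions, not values from the patch.

# Minimal standalone sketch of the part-file renaming technique.
# Assumes a gzip-compressed text output directory already written by Spark;
# the output path below is a placeholder, not one used by the workflow.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rename-part-files').getOrCreate()
sc = spark.sparkContext

# Reach the Hadoop filesystem classes through the py4j JVM gateway
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem

output_dir = 'hdfs://nameservice1/tmp/example_scores/'  # placeholder path
fs = FileSystem.get(URI(output_dir), sc._jsc.hadoopConfiguration())

# Spark names compressed text output part-NNNNN[-uuid].txt.gz; give each
# part file an extension that matches its JSON contents instead
for status in fs.listStatus(Path(output_dir)):
    name = status.getPath().toString()
    if 'part' in name:
        fs.rename(Path(name), Path(name.replace('.txt.gz', '.json.gz')))

spark.stop()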