diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
index 60c71e52f..e7d62c2f1 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
@@ -421,7 +421,7 @@ elif mode == 'json':
# Score-specific dataframe - read inputs
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
- attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
@@ -601,7 +601,7 @@ elif mode == 'json-5-way':
# Score-specific dataframe - read inputs
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
- attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
+ attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
@@ -753,15 +753,36 @@ elif mode == 'json-5-way':
# -------------------------------------------- #
# Write json output
+ # -------------------------------------------- #
+ # Write json output - set the directory here
output_dir = "/".join(pagerank_dir.split('/')[:-1])
if graph_type == 'bip':
- output_dir = output_dir + '/bip_universe_doi_scores_5_classes/'
+ output_dir = output_dir + '/bip_universe_doi_scores/'
else:
- output_dir = output_dir + '/openaire_universe_scores_5_classes/'
-
+ output_dir = output_dir + '/openaire_universe_scores/'
+
+ # Write the dataframe
print ("Writing output to: " + output_dir)
results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
+ # Rename the files to .json.gz now
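+ # Spark's .text() writer names its gzipped output part-*.txt.gz; since each row written
+ # above holds a JSON record, the part files are renamed to .json.gz through the Hadoop
+ # FileSystem API exposed via the py4j JVM gateway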
+ sc = spark.sparkContext
+ URI = sc._gateway.jvm.java.net.URI
+ Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
+ FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
+ # Get master prefix from input file path
+ master_prefix = "/".join(pagerank_dir.split('/')[:5])
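+ # FileSystem.get resolves the filesystem from the URI's scheme and authority only,
+ # so this prefix is enough (assuming pagerank_dir is a fully-qualified URI, e.g. hdfs://...)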
+ fs = FileSystem.get(URI(master_prefix), sc._jsc.hadoopConfiguration())
+ path = Path(output_dir)
+ print ("Path is:" + path.toString())
+ file_list = fs.listStatus(Path(output_dir))
+ print ("Renaming files:")
+ for f in file_list:
+ initial_filename = f.getPath().toString()
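+ # Only rename Spark's part-* data files; marker files such as _SUCCESS are left untouched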
+ if "part" in initial_filename:
+ print (initial_filename + " => " + initial_filename.replace(".txt.gz", ".json.gz"))
+ fs.rename(Path(initial_filename), Path(initial_filename.replace(".txt.gz", ".json.gz")))
+
# Close spark session
spark.stop()
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
index 7aa95db22..f07a27244 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
@@ -390,7 +390,7 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- json
+ json-5-way
${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}
${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}