8172_impact_indicators_workflow #284
|
@ -421,7 +421,7 @@ elif mode == 'json':
|
|||
|
||||
# Score-specific dataframe - read inputs
|
||||
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
|
||||
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
|
||||
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
|
||||
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
|
||||
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
|
||||
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
|
||||
|
@ -601,7 +601,7 @@ elif mode == 'json-5-way':
|
|||
|
||||
# Score-specific dataframe - read inputs
|
||||
pagerank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(pagerank_dir).repartition(num_partitions, 'id')
|
||||
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',False).csv(attrank_dir).repartition(num_partitions, 'id')
|
||||
attrank_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header',True).csv(attrank_dir).repartition(num_partitions, 'id')
|
||||
cc_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(cc_dir).repartition(num_partitions, 'id')
|
||||
impulse_df = spark.read.schema(int_schema).option('delimiter', '\t').option('header',True).csv(impulse_dir).repartition(num_partitions, 'id')
|
||||
ram_df = spark.read.schema(float_schema).option('delimiter', '\t').option('header', True).csv(ram_dir).repartition(num_partitions, 'id')
|
||||
|
@ -753,15 +753,36 @@ elif mode == 'json-5-way':
|
|||
|
||||
# -------------------------------------------- #
|
||||
# Write json output
|
||||
# -------------------------------------------- #
|
||||
# Write json output - set the directory here
|
||||
output_dir = "/".join(pagerank_dir.split('/')[:-1])
|
||||
if graph_type == 'bip':
|
||||
output_dir = output_dir + '/bip_universe_doi_scores_5_classes/'
|
||||
output_dir = output_dir + '/bip_universe_doi_scores/'
|
||||
else:
|
||||
output_dir = output_dir + '/openaire_universe_scores_5_classes/'
|
||||
output_dir = output_dir + '/openaire_universe_scores/'
|
||||
|
||||
# Write the dataframe
|
||||
print ("Writing output to: " + output_dir)
|
||||
results_df.write.mode('overwrite').option('header', False).text(output_dir, compression='gzip')
|
||||
|
||||
# Rename the files to .json.gz now
|
||||
sc = spark.sparkContext
|
||||
URI = sc._gateway.jvm.java.net.URI
|
||||
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
|
||||
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
|
||||
# Get master prefix from input file path
|
||||
master_prefix = "/".join(pagerank_dir.split('/')[:5])
|
||||
fs = FileSystem.get(URI(master_prefix), sc._jsc.hadoopConfiguration())
|
||||
path = Path(output_dir)
|
||||
print ("Path is:" + path.toString())
|
||||
file_list = fs.listStatus(Path(output_dir))
|
||||
print ("Renaming files:")
|
||||
for f in file_list:
|
||||
initial_filename = f.getPath().toString()
|
||||
if "part" in initial_filename:
|
||||
print (initial_filename + " => " + initial_filename.replace(".txt.gz", ".json.gz"))
|
||||
fs.rename(Path(initial_filename), Path(initial_filename.replace(".txt.gz", ".json.gz")))
|
||||
|
||||
# Close spark session
|
||||
spark.stop()
|
||||
|
||||
|
|
|
@ -390,7 +390,7 @@
|
|||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}</spark-opts>
|
||||
<!-- Script arguments here -->
|
||||
<arg>json</arg>
|
||||
<arg>json-5-way</arg>
|
||||
<!-- Input files must be identified dynamically -->
|
||||
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
|
||||
<arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
|
||||
|
|
Loading…
Reference in New Issue