[oalex] proposal to higher the parallelization
This commit is contained in:
parent
0500fc586f
commit
f4704aef4d
49
strings.py
49
strings.py
|
@ -2,7 +2,7 @@ import json
|
|||
from affro_cluster import *
|
||||
import os
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql.functions import col
|
||||
from pyspark.sql.functions import col, explode, first, collect_list
|
||||
import sys
|
||||
|
||||
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
|
||||
|
@ -10,31 +10,32 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
|
|||
folder_path = sys.argv[1]
|
||||
hdfs_output_path = sys.argv[2]
|
||||
|
||||
# folder_path = '/user/zeppelin/affiliations/raw_aff_string/2024-08'
|
||||
# hdfs_output_path = 'tmp/affro/results_strings'
|
||||
|
||||
|
||||
def oalex_affro(record):
|
||||
doi = record['doi'][16:]
|
||||
oalex = record['rors']
|
||||
#Version of affro application on a single raw_aff_string and returns just the Matchins set
|
||||
def oalex_affro(doi, aff_string):
|
||||
try:
|
||||
matchings = [item for sublist in [affro(x) for x in record['raw_aff_string']] for item in (sublist if isinstance(sublist, list) else [sublist])]
|
||||
|
||||
result = {'DOI' : doi, 'OAlex' : oalex, 'Matchings': matchings}
|
||||
|
||||
return result
|
||||
|
||||
matchings = affro(aff_string)
|
||||
if not isinstance(matchings, list):
|
||||
matchings = [matchings]
|
||||
return matchings
|
||||
except Exception as e:
|
||||
print(f"Error processing record with id {record.get('doi')}: {str(e)}")
|
||||
return None
|
||||
print(f"Error processing record with doi {doi}: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
df = spark.read.json(folder_path)
|
||||
filtered_df = df.filter(col("doi").isNotNull())
|
||||
updated_rdd = filtered_df.rdd.map(lambda row: oalex_affro(row.asDict()))
|
||||
|
||||
json_rdd = updated_rdd.map(lambda record: json.dumps(record))
|
||||
|
||||
|
||||
json_rdd.saveAsTextFile(hdfs_output_path)
|
||||
spark.read.json(folder_path)
|
||||
.filter(col("doi").isNotNull())
|
||||
.select(
|
||||
col("doi").alias("DOI"),
|
||||
col("rors").alias("OAlex"),
|
||||
explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better
|
||||
)
|
||||
.withColumn("Matchings", oalex_affro(col("doi"), col("aff_string"))) #this one says create a new column with name Matchinds as the result of the function as second argument
|
||||
.select("DOI","OAlex","Matchins")
|
||||
.groupBy("DOI") #this groups by doi to have just one row per each doi
|
||||
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
|
||||
collect_list("Matchings").alias("Matchings"))
|
||||
.write
|
||||
.mode("overwrite") #in case the folder already exists on hadoop it does not break
|
||||
.option("compression","gzip") #to reduce the space
|
||||
.json(hdfs_output_path)
|
||||
|
||||
|
|
Loading…
Reference in New Issue