Oalex #13

Merged
mkallipo merged 15 commits from openaire-workflow-ready_2 into openaire-workflow-ready 2024-12-09 18:51:24 +01:00
1 changed file with 20 additions and 11 deletions
Showing only changes of commit efa4db4e52 - Show all commits

View File

@ -9,29 +9,38 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
# Command-line arguments for the affiliation-matching job.
folder_path = sys.argv[1]       # input folder containing the OpenAlex JSON files
hdfs_output_path = sys.argv[2]  # HDFS path where the matched results are written
working_dir_path = sys.argv[3]  # working directory for intermediate data
# Version of the affro application on a single raw_aff_string; returns just the matchings set.
def oalex_affro(aff_string):
    """Run affro on one raw affiliation string and return its matchings.

    Parameters
    ----------
    aff_string : str
        A single raw affiliation string taken from the OpenAlex record.

    Returns
    -------
    list
        The matchings produced by ``affro`` (a single matching is wrapped
        in a one-element list); an empty list when ``affro`` raises, so
        one bad string never aborts the whole Spark job.
    """
    try:
        matchings = affro(aff_string)
        # affro may return either a list or a single matching; normalize to a list.
        if not isinstance(matchings, list):
            matchings = [matchings]
        return matchings
    except Exception as e:
        # Best-effort: report and skip strings affro cannot process.
        print(f"Error processing affiliation string {aff_string}: {str(e)}")
        return []
explode = spark.read.json(folder_path) \
spark.read.json(folder_path) \
.filter(col("doi").isNotNull()) \ .filter(col("doi").isNotNull()) \
.select( .select(
col("doi").alias("DOI"), col("doi").alias("DOI"),
col("rors").alias("OAlex"), col("ror").alias("OAlex"),
explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better
) \ )
.drop(col("aff_string") #removes the aff_string column
) \ affs = explode \
.select(col("DOI"),col("OAlex"),explode("Matchins").alias("match")) \ .select("affiliation") \
.distinct() \
.withColumn("Matchings", oalex_affro(col("aff_string")))
affs.join(explode, on = "affiliation") \
.select(col("DOI"),
col("OAlex"),
explode("Matchins").alias("match")
) \
.groupBy("DOI") \ .groupBy("DOI") \
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings .agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
collect_list("match").alias("Matchings") #each exploded match is collected again collect_list("match").alias("Matchings") #each exploded match is collected again