Oalex #13

Merged
mkallipo merged 15 commits from openaire-workflow-ready_2 into openaire-workflow-ready 2024-12-09 18:51:24 +01:00
1 changed files with 12 additions and 11 deletions
Showing only changes of commit ea2e27a9f4 - Show all commits

View File

@ -22,21 +22,22 @@ def oalex_affro(doi, aff_string):
return [] return []
spark.read.json(folder_path) spark.read.json(folder_path) \
.filter(col("doi").isNotNull()) .filter(col("doi").isNotNull()) \
.select( .select(
col("doi").alias("DOI"), col("doi").alias("DOI"),
col("rors").alias("OAlex"), col("rors").alias("OAlex"),
explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better
) ) \
.withColumn("Matchings", oalex_affro(col("doi"), col("aff_string"))) #this one says create a new column with name Matchinds as the result of the function as second argument .drop(col("aff_string") #removes the aff_string column
.drop(col("aff_string") ) \
.select(col("DOI"),col("OAlex"),explode("Matchins").alias("match") .select(col("DOI"),col("OAlex"),explode("Matchins").alias("match")) \
.groupBy("DOI") #this groups by doi to have just one row per each doi .groupBy("DOI") \
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings .agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
collect_list("match").alias("Matchings")) collect_list("match").alias("Matchings") #each exploded match is collected again
.write ) \
.mode("overwrite") #in case the folder already exists on hadoop it does not break .write \
.option("compression","gzip") #to reduce the space .mode("overwrite") \
.option("compression","gzip") \
.json(hdfs_output_path) .json(hdfs_output_path)