diff --git a/strings.py b/strings.py
index 7c4d363..cc71569 100644
--- a/strings.py
+++ b/strings.py
@@ -22,21 +22,22 @@ def oalex_affro(doi, aff_string):
         return []
 
 
-spark.read.json(folder_path)
-    .filter(col("doi").isNotNull())
+spark.read.json(folder_path) \
+    .filter(col("doi").isNotNull()) \
     .select(
         col("doi").alias("DOI"),
         col("rors").alias("OAlex"),
         explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better
-    )
-    .withColumn("Matchings", oalex_affro(col("doi"), col("aff_string"))) #this one says create a new column with name Matchinds as the result of the function as second argument
-    .drop(col("aff_string")
-    .select(col("DOI"),col("OAlex"),explode("Matchins").alias("match")
-    .groupBy("DOI") #this groups by doi to have just one row per each doi
+    ) \
+    .drop(col("aff_string") #removes the aff_string column
+    ) \
+    .select(col("DOI"),col("OAlex"),explode("Matchins").alias("match")) \
+    .groupBy("DOI") \
     .agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
-        collect_list("match").alias("Matchings"))
-    .write
-    .mode("overwrite") #in case the folder already exists on hadoop it does not break
-    .option("compression","gzip") #to reduce the space
+        collect_list("match").alias("Matchings") #each exploded match is collected again
+    ) \
+    .write \
+    .mode("overwrite") \
+    .option("compression","gzip") \
     .json(hdfs_output_path)
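Note: even with the added line continuations, the patched chain above is not yet runnable as written. The `+` side drops the `.withColumn("Matchings", ...)` step without re-adding it, and the later select still explodes a misspelled "Matchins" column. The snippet below is a minimal sketch of how the corrected pipeline could look; it assumes oalex_affro is exposed to Spark as a UDF returning an array of strings (the real return schema may differ), uses placeholder values for folder_path and hdfs_output_path (they are defined elsewhere in strings.py), and wraps the chain in parentheses instead of backslash continuations so the inline comments remain legal Python.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, explode, first, udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# Assumption: oalex_affro is wrapped as a Spark UDF; the placeholder body and the
# ArrayType(StringType()) return schema stand in for the real matching logic.
@udf(returnType=ArrayType(StringType()))
def oalex_affro(doi, aff_string):
    return []

# Placeholder values: the real paths are defined elsewhere in strings.py.
folder_path = "hdfs:///path/to/input"
hdfs_output_path = "hdfs:///path/to/output"

(spark.read.json(folder_path)
    .filter(col("doi").isNotNull())
    .select(
        col("doi").alias("DOI"),
        col("rors").alias("OAlex"),
        # one row per raw affiliation string, so the UDF calls parallelize better
        explode(col("raw_aff_string")).alias("aff_string"))
    # compute Matchings while aff_string is still present, then drop it
    .withColumn("Matchings", oalex_affro(col("DOI"), col("aff_string")))
    .drop("aff_string")
    # one row per (DOI, match); the column is "Matchings", not "Matchins"
    .select(col("DOI"), col("OAlex"), explode(col("Matchings")).alias("match"))
    .groupBy("DOI")  # back to one row per DOI
    .agg(first("OAlex").alias("OAlex"),             # OAlex is constant within a DOI
         collect_list("match").alias("Matchings"))  # re-collect the exploded matches
    .write
    .mode("overwrite")                  # do not fail if the output folder already exists
    .option("compression", "gzip")
    .json(hdfs_output_path))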