diff --git a/strings.py b/strings.py index cc71569..fd0706f 100644 --- a/strings.py +++ b/strings.py @@ -9,29 +9,38 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate() folder_path = sys.argv[1] hdfs_output_path = sys.argv[2] +working_dir_path = sys.argv[3] #Version of affro application on a single raw_aff_string and returns just the Matchins set -def oalex_affro(doi, aff_string): +def oalex_affro(aff_string): try: matchings = affro(aff_string) if not isinstance(matchings, list): matchings = [matchings] return matchings except Exception as e: - print(f"Error processing record with doi {doi}: {str(e)}") + print(f"Error processing affiliation string {aff_string}: {str(e)}") return [] - -spark.read.json(folder_path) \ +explode = spark.read.json(folder_path) \ .filter(col("doi").isNotNull()) \ .select( - col("doi").alias("DOI"), - col("rors").alias("OAlex"), - explode(col("raw_aff_string")).alias("aff_string") #this allows to split all the raw_aff_string and to parallelize better - ) \ - .drop(col("aff_string") #removes the aff_string column - ) \ - .select(col("DOI"),col("OAlex"),explode("Matchins").alias("match")) \ + col("doi").alias("DOI"), + col("ror").alias("OAlex"), + explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better + ) + +affs = explode \ + .select("affiliation") \ + .distinct() \ + .withColumn("Matchings", oalex_affro(col("aff_string"))) + + +affs.join(explode, on = "affiliation") \ + .select(col("DOI"), + col("OAlex"), + explode("Matchins").alias("match") + ) \ .groupBy("DOI") \ .agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings collect_list("match").alias("Matchings") #each exploded match is collected again