"""Spark job that runs ``affro`` over raw affiliation strings and groups the matchings by DOI.

Usage (positional arguments):
    1. folder_path       -- input location of the JSON records to read
    2. hdfs_output_path  -- output location for the gzip-compressed JSON result

The input records are read for the columns ``doi``, ``rors`` and
``raw_aff_string`` (an array column, since it is exploded below).
"""
import json
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
from affro_cluster import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, collect_list, udf
import sys

spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

# Return type of the matching UDF: an array of structs, one struct per matching.
matchings_schema = ArrayType(
    StructType([
        StructField("Provenance", StringType(), nullable=True),
        StructField("PID", StringType(), nullable=True),
        StructField("Value", StringType(), nullable=True),
        StructField("Confidence", DoubleType(), nullable=True),
        StructField("Status", StringType(), nullable=True)
    ])
)


def oalex_affro_2(aff_string):
    """Run ``affro`` on one affiliation string and shape the result for ``matchings_schema``.

    ``affro`` is expected to come from the ``affro_cluster`` star import
    (NOTE(review): confirm). Each matching is assumed to be a dict-like object
    exposing ``.get`` for the keys ``Provenance``, ``PID``, ``Value``,
    ``Confidence`` and ``Status``.

    Args:
        aff_string: the raw affiliation string to match.

    Returns:
        A list of ``(Provenance, PID, Value, Confidence, Status)`` tuples in the
        field order of ``matchings_schema``. An empty list is returned when
        processing fails; the error is printed rather than raised so that a
        single bad string does not abort the whole job.
    """
    try:
        matchings = affro(aff_string)

        # Ensure matchings is a list, even if affro returns a single dict.
        if not isinstance(matchings, list):
            matchings = [matchings]

        result = []
        for matching in matchings:
            # The schema declares Confidence as nullable: keep a missing value
            # as None instead of letting float(None) raise and thereby discard
            # every matching found for this affiliation string.
            confidence = matching.get("Confidence")
            result.append((
                matching.get("Provenance"),
                matching.get("PID"),
                matching.get("Value"),
                float(confidence) if confidence is not None else None,
                matching.get("Status"),
            ))
        return result
    except Exception as e:
        # Best-effort by design: report and fall back to "no matchings".
        print(f"Error processing affiliation string {aff_string}: {str(e)}")
        return []


def oalex_affro(aff_string):
    """Run ``affro`` on a single raw affiliation string and return just the matchings.

    Args:
        aff_string: the raw affiliation string to match.

    Returns:
        The value returned by ``affro`` wrapped in a list when it is not already
        one, or an empty list when ``affro`` raises (the error is printed).
    """
    try:
        matchings = affro(aff_string)
        if not isinstance(matchings, list):
            matchings = [matchings]
        return matchings
    except Exception as e:
        print(f"Error processing affiliation string {aff_string}: {str(e)}")
        return []


oalex_affro_udf = udf(oalex_affro_2, matchings_schema)

# One row per (DOI, affiliation string). Exploding raw_aff_string splits every
# record into its individual affiliation strings, which parallelizes better.
exploded = spark.read.json(folder_path) \
    .filter(col("doi").isNotNull()) \
    .select(
        col("doi").alias("DOI"),
        col("rors").alias("OAlex"),
        explode(col("raw_aff_string")).alias("affiliation")
    )

# Run the matcher once per distinct affiliation string.
# Fix: this must start from the `exploded` DataFrame; the original referenced
# the `explode` function, which has no `.select` attribute.
affs = exploded \
    .select("affiliation") \
    .distinct() \
    .withColumn("Matchings", oalex_affro_udf(col("affiliation")))

# Attach the matchings back to every DOI that used the affiliation string,
# flatten them to one row per matching, then regroup per DOI.
affs.join(exploded, on="affiliation") \
    .select(
        col("DOI"),
        col("OAlex"),
        explode(col("Matchings")).alias("match")
    ) \
    .groupBy("DOI") \
    .agg(
        # Per the original author's note, OAlex is the same on every row of a
        # DOI, so taking the first value is enough.
        first("OAlex").alias("OAlex"),
        # Each exploded match is collected back into a single list per DOI.
        collect_list("match").alias("Matchings")
    ) \
    .write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .json(hdfs_output_path)