From a59d0ce9fc674afcd602adf8e7aad942ef8ed20a Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Thu, 5 Dec 2024 18:41:16 +0100
Subject: [PATCH] [oalex] avoid redefinition of explode function

Rename the DataFrame `explode` to `exploded` so it no longer shadows
pyspark.sql.functions.explode. Replace the RDD flatMap and the temporary
JSON round trip through `working_dir` with a UDF applied via withColumn,
dropping the third command-line argument.

---
 strings.py | 55 +++++++++++++++++-------------------------------------
 1 file changed, 17 insertions(+), 38 deletions(-)

diff --git a/strings.py b/strings.py
index 961918e..250d659 100644
--- a/strings.py
+++ b/strings.py
@@ -12,7 +12,6 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
 
 folder_path = sys.argv[1]
 hdfs_output_path = sys.argv[2]
-working_dir = sys.argv[3]
 
 matchings_schema = ArrayType(
     StructType([
@@ -24,20 +23,6 @@ matchings_schema = ArrayType(
     ])
 )
 
-result_schema = StructType([
-    StructField("affiliation", StringType(), nullable=True),
-    StructField("match", ArrayType(
-        StructType([
-            StructField("Provenance", StringType(), nullable=True),
-            StructField("PID", StringType(), nullable=True),
-            StructField("Value", StringType(), nullable=True),
-            StructField("Confidence", DoubleType(), nullable=True),
-            StructField("Status", StringType(), nullable=True)
-        ])
-    ))
-])
-
-
 def oalex_affro_2(aff_string):
     try:
         matchings = affro(aff_string)
@@ -77,34 +62,28 @@ def oalex_affro(aff_string):
 
 oalex_affro_udf = udf(oalex_affro_2, matchings_schema)
 
-explode = spark.read.json(folder_path) \
+exploded = spark.read.json(folder_path) \
     .filter(col("doi").isNotNull()) \
     .select(
-    col("doi").alias("DOI"),
-    col("rors").alias("OAlex"),
-    explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better
-    )
-
-rdd = explode \
+        col("doi").alias("DOI"),
+        col("rors").alias("OAlex"),
+        explode(col("raw_aff_string")).alias("affiliation")  # one row per raw_aff_string, so the matching parallelizes better
+    )
+affs = exploded \
     .select("affiliation") \
     .distinct() \
-    .rdd \
-    .flatMap(lambda row: [{"affiliation":row['affiliation'], "match": m} for m in oalex_affro(row['affiliation'])])
+    .withColumn("Matchings", oalex_affro_udf(col("affiliation")))
 
-rdd.map(json.dumps).saveAsTextFile(working_dir + "/ tmp")
-affs = spark.read.json(working_dir + "/ tmp")
-
-affs.join(explode, on="affiliation") \
+affs.join(exploded, on="affiliation") \
     .select(col("DOI"),
             col("OAlex"),
-            col("match")
+            explode(col("Matchings")).alias("match")
     ) \
-    .groupBy("DOI") \
-    .agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
-         collect_list("match").alias("Matchings") #each exploded match is collected again
-    ) \
-    .write \
-    .mode("overwrite") \
-    .option("compression","gzip") \
-    .json(hdfs_output_path)
-
+    .groupBy("DOI") \
+    .agg(first("OAlex").alias("OAlex"),  # OAlex is identical for every row of a DOI, so keep the first
+         collect_list("match").alias("Matchings")  # re-collect the exploded matches per DOI
+    ) \
+    .write \
+    .mode("overwrite") \
+    .option("compression", "gzip") \
+    .json(hdfs_output_path)
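
For reviewers: below is a minimal, self-contained sketch of the pattern this
commit moves to (a UDF returning an array of structs, applied with withColumn,
then explode plus groupBy/collect_list, instead of an RDD flatMap with a
temp-file round trip). The names match_affiliation and match_schema and the
sample data are hypothetical stand-ins for affro and matchings_schema, not
code from this repository:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, collect_list, explode, udf
    from pyspark.sql.types import ArrayType, StringType, StructField, StructType

    spark = SparkSession.builder.appName("UdfExplodePattern").getOrCreate()

    # The UDF returns an array of structs, mirroring the shape of matchings_schema.
    match_schema = ArrayType(StructType([
        StructField("Value", StringType(), nullable=True),
        StructField("Status", StringType(), nullable=True)
    ]))

    def match_affiliation(aff_string):
        # Hypothetical stand-in for affro(): returns zero or more matches per string.
        return [{"Value": aff_string.lower(), "Status": "active"}]

    match_udf = udf(match_affiliation, match_schema)

    df = spark.createDataFrame(
        [("10.1/a", ["Univ A", "Lab B"]), ("10.1/b", ["Univ A"])],
        ["DOI", "raw_aff_string"])

    # One row per affiliation string, each distinct string matched exactly once,
    # then the matches are re-attached by join and re-aggregated per DOI.
    exploded = df.select("DOI", explode("raw_aff_string").alias("affiliation"))
    affs = exploded.select("affiliation").distinct() \
        .withColumn("Matchings", match_udf(col("affiliation")))
    affs.join(exploded, on="affiliation") \
        .select("DOI", explode("Matchings").alias("match")) \
        .groupBy("DOI") \
        .agg(collect_list("match").alias("Matchings")) \
        .show(truncate=False)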