[oalex] register the UDF oalex_affro and the schema of its output, for use in the PySpark dataframe

This commit is contained in:
Miriam Baglioni 2024-12-05 12:18:45 +01:00
parent efa4db4e52
commit 4fe3d31ed5
1 changed file with 19 additions and 5 deletions


@@ -1,15 +1,27 @@
 import json
+from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
 from affro_cluster import *
 import os
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, explode, first, collect_list
+from pyspark.sql.functions import col, explode, first, collect_list, udf
 import sys
 spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
 folder_path = sys.argv[1]
 hdfs_output_path = sys.argv[2]
 working_dir_path = sys.argv[3]
+matchings_schema = ArrayType(
+    StructType([
+        StructField("Provenance", StringType(), nullable=True),
+        StructField("PID", StringType(), nullable=True),
+        StructField("Value", StringType(), nullable=True),
+        StructField("Confidence", DoubleType(), nullable=True),
+        StructField("Status", StringType(), nullable=True)
+    ])
+)
 # Version of the affro application that runs on a single raw_aff_string and returns just the Matchings set
 def oalex_affro(aff_string):
@@ -22,12 +34,14 @@ def oalex_affro(aff_string):
         print(f"Error processing affiliation string {aff_string}: {str(e)}")
         return []
+oalex_affro_udf = udf(oalex_affro, matchings_schema)
 explode = spark.read.json(folder_path) \
     .filter(col("doi").isNotNull()) \
     .select(
-    col("doi").alias("DOI"),
-    col("ror").alias("OAlex"),
-    explode(col("raw_aff_string")).alias("affiliation")  # splitting raw_aff_string lets the matching be parallelized better
+        col("doi").alias("DOI"),
+        col("ror").alias("OAlex"),
+        explode(col("raw_aff_string")).alias("affiliation")  # splitting raw_aff_string lets the matching be parallelized better
     )
 affs = explode \
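
Below is a minimal, self-contained sketch of the pattern this commit introduces: wrapping a row-level matcher in udf() with an explicit ArrayType(StructType) return schema and applying it to the exploded affiliation column. The fake_affro matcher, the toy DataFrame, and the downstream column names are stand-ins for illustration (the real matcher comes from affro_cluster), not the actual job. Note also that the commit assigns the exploded DataFrame to a variable named explode, which shadows pyspark.sql.functions.explode from that point on; the sketch uses a distinct name to avoid that.

# Sketch only: stand-in matcher and toy data, not the real affro_cluster code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.appName("UDFSchemaSketch").getOrCreate()

# Same output schema as in the commit: an array of matching structs.
matchings_schema = ArrayType(
    StructType([
        StructField("Provenance", StringType(), nullable=True),
        StructField("PID", StringType(), nullable=True),
        StructField("Value", StringType(), nullable=True),
        StructField("Confidence", DoubleType(), nullable=True),
        StructField("Status", StringType(), nullable=True)
    ])
)

def fake_affro(aff_string):
    # Stand-in for the real matcher: return a list of dicts whose keys
    # match the StructType fields above; PySpark converts them to structs.
    return [{"Provenance": "AffRo", "PID": "ROR", "Value": "https://ror.org/00example0",
             "Confidence": 0.9, "Status": "active"}]

# Register the Python function as a UDF together with its return schema,
# so the resulting column is a typed array<struct<...>> rather than a string.
oalex_affro_udf = udf(fake_affro, matchings_schema)

# Toy input with the same shape as the dump read in the commit: one DOI and
# a list of raw affiliation strings.
df = spark.createDataFrame(
    [("10.1234/example", ["University of Somewhere, Department of Things"])],
    ["doi", "raw_aff_string"],
)

# Explode the raw_aff_string array so each affiliation is processed (and
# parallelized) independently, then apply the UDF to every exploded row.
matched = (
    df.filter(col("doi").isNotNull())
      .select(col("doi").alias("DOI"),
              explode(col("raw_aff_string")).alias("affiliation"))
      .withColumn("Matchings", oalex_affro_udf(col("affiliation")))
)

matched.show(truncate=False)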