diff --git a/strings.py b/strings.py index fd0706f..fd67cd6 100644 --- a/strings.py +++ b/strings.py @@ -1,15 +1,27 @@ import json + +from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType + from affro_cluster import * import os from pyspark.sql import SparkSession -from pyspark.sql.functions import col, explode, first, collect_list +from pyspark.sql.functions import col, explode, first, collect_list, udf import sys spark = SparkSession.builder.appName("JSONProcessing").getOrCreate() folder_path = sys.argv[1] hdfs_output_path = sys.argv[2] -working_dir_path = sys.argv[3] + +matchings_schema = ArrayType( + StructType([ + StructField("Provenance", StringType(), nullable=True), + StructField("PID", StringType(), nullable=True), + StructField("Value", StringType(), nullable=True), + StructField("Confidence", DoubleType(), nullable=True), + StructField("Status", StringType(), nullable=True) + ]) +) #Version of affro application on a single raw_aff_string and returns just the Matchins set def oalex_affro(aff_string): @@ -22,12 +34,14 @@ def oalex_affro(aff_string): print(f"Error processing affiliation string {aff_string}: {str(e)}") return [] +oalex_affro_udf = udf(oalex_affro, matchings_schema) + explode = spark.read.json(folder_path) \ .filter(col("doi").isNotNull()) \ .select( - col("doi").alias("DOI"), - col("ror").alias("OAlex"), - explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better + col("doi").alias("DOI"), + col("ror").alias("OAlex"), + explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better ) affs = explode \