import json
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, collect_list, udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType

from affro_cluster import *

spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
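
# Example invocation (a sketch; "oalex_matching.py" is a placeholder name for this
# script, not necessarily its actual file name):
#   spark-submit oalex_matching.py /path/to/input_json_folder hdfs:///path/to/output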

matchings_schema = ArrayType(
    StructType([
        StructField("Provenance", StringType(), nullable=True),
        StructField("PID", StringType(), nullable=True),
        StructField("Value", StringType(), nullable=True),
        StructField("Confidence", DoubleType(), nullable=True),
        StructField("Status", StringType(), nullable=True)
    ])
)

# Applies affro to a single raw affiliation string and returns just the Matchings set
def oalex_affro(aff_string):
    try:
        matchings = affro(aff_string)
        # affro may return a single matching or a list; normalise to a list
        if not isinstance(matchings, list):
            matchings = [matchings]
        return matchings
    except Exception as e:
        print(f"Error processing affiliation string {aff_string}: {str(e)}")
        return []


oalex_affro_udf = udf(oalex_affro, matchings_schema)

# Read the input JSON, keep only records with a DOI, and explode raw_aff_string so
# that each affiliation string becomes its own row and can be processed in parallel.
# The DataFrame is named "exploded" (not "explode") so it does not shadow
# pyspark.sql.functions.explode, which is still needed further down.
exploded = spark.read.json(folder_path) \
    .filter(col("doi").isNotNull()) \
    .select(
        col("doi").alias("DOI"),
        col("ror").alias("OAlex"),
        explode(col("raw_aff_string")).alias("affiliation")
    )
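
# For reference, a sketch of the input record shape inferred from the fields read
# above (illustrative values, not actual data; the exact type of "ror" is not
# constrained by this script):
#   {"doi": "10.1234/example",
#    "ror": "https://ror.org/00example0",
#    "raw_aff_string": ["Dept. of Physics, Example University", "Example Hospital"]}
# Note that raw_aff_string must be an array-typed column for explode() to apply.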

# Run affro only once per distinct affiliation string
affs = exploded \
    .select("affiliation") \
    .distinct() \
    .withColumn("Matchings", oalex_affro_udf(col("affiliation")))

# Join the matchings back to the (DOI, OAlex) pairs and regroup them per DOI
affs.join(exploded, on="affiliation") \
    .select(col("DOI"),
            col("OAlex"),
            explode(col("Matchings")).alias("match")
    ) \
    .groupBy("DOI") \
    .agg(first("OAlex").alias("OAlex"),  # OAlex is the same for every row of a DOI, so taking the first value suffices
         collect_list("match").alias("Matchings")  # re-collect the exploded matches into one list per DOI
    ) \
    .write \
    .mode("overwrite") \
    .option("compression", "gzip") \
    .json(hdfs_output_path)
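
# Sketch of one output record (field names from the code above, values illustrative):
#   {"DOI": "10.1234/example",
#    "OAlex": "https://ror.org/00example0",
#    "Matchings": [{"Provenance": "...", "PID": "...", "Value": "...",
#                   "Confidence": 0.9, "Status": "..."}]}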