Oalex #13

Merged
mkallipo merged 15 commits from openaire-workflow-ready_2 into openaire-workflow-ready 2024-12-09 18:51:24 +01:00
1 changed files with 77 additions and 20 deletions

View File

@ -1,8 +1,11 @@
import json
import time
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
from threading import Thread
from affro_cluster import *
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, explode, first, collect_list, udf, collect_set
import sys
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
@ -10,31 +13,85 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
# folder_path = '/user/zeppelin/affiliations/raw_aff_string/2024-08'
# hdfs_output_path = 'tmp/affro/results_strings'
matchings_schema = ArrayType(
StructType([
StructField("Provenance", StringType(), nullable=True),
StructField("PID", StringType(), nullable=True),
StructField("Value", StringType(), nullable=True),
StructField("Confidence", DoubleType(), nullable=True),
StructField("Status", StringType(), nullable=True)
])
)
operation_counter = spark.sparkContext.accumulator(0)
def oalex_affro(record):
doi = record['doi'][16:]
oalex = record['rors']
try:
matchings = [item for sublist in [affro(x) for x in record['raw_aff_string']] for item in (sublist if isinstance(sublist, list) else [sublist])]
#Version of affro application on a single raw_aff_string and returns just the Matchins set
def oalex_affro(aff_string):
global operation_counter
try:
matchings = affro(aff_string)
operation_counter += 1
# Ensure matchings is a list, even if affro returns a single dict
if not isinstance(matchings, list):
matchings = [matchings]
# Create the result as a tuple that matches matchings_schema
result = []
for matching in matchings:
# Assuming 'matching' is a dictionary that contains 'Provenance', 'PID', 'Value', 'Confidence', 'Status'
result.append((
matching.get("Provenance", None),
matching.get("PID", None),
matching.get("Value", None),
float(matching.get("Confidence", None)),
matching.get("Status", None)
))
result = {'DOI' : doi, 'OAlex' : oalex, 'Matchings': matchings}
return result
except Exception as e:
print(f"Error processing record with id {record.get('doi')}: {str(e)}")
return None
print(f"Error processing affiliation string {aff_string}: {str(e)}")
return ()
oalex_affro_udf = udf(oalex_affro, matchings_schema)
monitor_done = False
df = spark.read.json(folder_path)
filtered_df = df.filter(col("doi").isNotNull())
updated_rdd = filtered_df.rdd.map(lambda row: oalex_affro(row.asDict()))
def monitor_counter(interval):
while True:
print(f"Number of calls to AffRo: {operation_counter.value}")
time.sleep(interval)
if monitor_done:
break
json_rdd = updated_rdd.map(lambda record: json.dumps(record))
exploded = spark.read.json(folder_path) \
.filter(col("doi").isNotNull()) \
.select(
col("doi").alias("DOI"),
col("rors").alias("OAlex"),
explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better
)
monitor_thread = Thread(target=monitor_counter, args=(600,), daemon=True)
monitor_thread.start()
json_rdd.saveAsTextFile(hdfs_output_path)
affs = exploded \
.select("affiliation") \
.distinct() \
.withColumn("Matchings", oalex_affro_udf(col("affiliation")))
affs.join(exploded, on="affiliation") \
.select(col("DOI"),
col("OAlex"),
explode(col("Matchings")).alias("match")
) \
.groupBy("DOI") \
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
collect_set("match").alias("Matchings") #each exploded match is collected again
) \
.write \
.mode("overwrite") \
.option("compression","gzip") \
.json(hdfs_output_path)
monitor_done = True
monitor_thread.join()