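"""Match raw affiliation strings with affro() using Spark.

Reads JSON records from an input folder, runs affro() on every raw
affiliation string, and saves one JSON result per record to HDFS.

Usage (sketch; spark-submit options depend on the cluster):
    spark-submit <this_script.py> <folder_path> <hdfs_output_path>
"""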
import json
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from affro_cluster import *
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()

# Input folder and HDFS output path come from the command line.
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

# Example paths:
# folder_path = '/user/zeppelin/affiliations/raw_aff_string/2024-08'
# hdfs_output_path = 'tmp/affro/results_strings'
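# Expected input record shape, inferred from the field accesses below
# (field values here are illustrative placeholders, not real data):
#   {'doi': 'https://doi.org/10.xxxx/yyyy',
#    'rors': [...],
#    'raw_aff_string': ['Department of X, University of Y', ...]}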
def oalex_affro(record):
    # Drop the first 16 characters of the DOI URL, i.e. the
    # 'https://doi.org/' prefix, keeping the bare DOI.
    doi = record['doi'][16:]
    oalex = record['rors']
    try:
        # Run affro() on every raw affiliation string and flatten the
        # results, since affro() may return a single match or a list.
        matchings = [item
                     for sublist in [affro(x) for x in record['raw_aff_string']]
                     for item in (sublist if isinstance(sublist, list) else [sublist])]
        return {'DOI': doi, 'OAlex': oalex, 'Matchings': matchings}
    except Exception as e:
        print(f"Error processing record with id {record.get('doi')}: {str(e)}")
        return None
# Read the JSON records and keep only those that carry a DOI.
df = spark.read.json(folder_path)
filtered_df = df.filter(col("doi").isNotNull())

# Match affiliations record by record, drop records that failed to
# process (oalex_affro returns None for those), and serialise each
# remaining result to a JSON line.
updated_rdd = filtered_df.rdd.map(lambda row: oalex_affro(row.asDict()))
json_rdd = (updated_rdd
            .filter(lambda record: record is not None)
            .map(lambda record: json.dumps(record)))

json_rdd.saveAsTextFile(hdfs_output_path)
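# Each line of the output is a single JSON object of the form
# {"DOI": ..., "OAlex": ..., "Matchings": ...}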