affRo/update_records.py
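
"""
Enrich publication records with author ORCIDs and organization PIDs.

Reads the JSON part files found under `folder_path` with Spark, matches each
author's raw affiliation strings with affro (unless the string already carries
a ROR id), and writes one line-delimited JSON output file per input part file.
"""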

import json
import os
from pyspark.sql import SparkSession
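# affro() is provided by affro_cluster; based on how it is used below, it is
# assumed to return a list of JSON-encoded organization PID matches for a raw
# affiliation string.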
from affro_cluster import *

folder_path = '/user/zeppelin/miriam.baglioni/AffStringFromIISDataset2'
#folder_path = 'check'

json_file_names = []

# Loop through all files in the directory, skipping Spark's _SUCCESS marker;
# every remaining part file is treated as JSON
for file_name in os.listdir(folder_path):
    if file_name != '_SUCCESS':
        json_file_names.append(file_name)

# json_file_names now contains the names of all JSON part files in the folder

# Initialize Spark session
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()


def remove_duplicates(list_of_dicts):
    # Use a set to store tuples of dictionary items to filter out duplicates
    seen = set()
    unique_list_of_dicts = []
    for d in list_of_dicts:
        # Convert the dictionary to a tuple of items
        items = tuple(d.items())
        if items not in seen:
            seen.add(items)
            unique_list_of_dicts.append(d)
    return unique_list_of_dicts


def update_record(record):
    id = record['id']
    authors = []
    try:
        for author in record['authors']:
            author_object = {}
            if 'orcid.org/0' in author['fullName']:
                # fullName is assumed to look like "<orcid url>, <full name>"
                author_object['Name'] = {'Full': author['fullName'].split(',')[1], 'First': None, 'Last': None}
                author_object['ORCID'] = author['fullName'].split(',')[0][:36]
            else:
                author_object['Name'] = {'Full': author['fullName'], 'First': None, 'Last': None}
                author_object['ORCID'] = None
            author_object['Raw_affiliations'] = [affiliation['raw_affiliation_string'] for affiliation in author['affiliations']]
            # Default, so the key exists even if the author has no affiliations to match
            author_object['Organization_PIDs'] = []
            all_affs_with_ror = []
            have_ror = False
            for affiliation in author['affiliations']:
                # An ORCID may also be embedded in the raw affiliation string itself
                if 'ORCID: 0' in affiliation['raw_affiliation_string']:
                    x = affiliation['raw_affiliation_string']
                    author_object['ORCID'] = 'https://orcid.org/' + x.split('ORCID: ')[1]
                elif 'ORCID 0' in affiliation['raw_affiliation_string']:
                    x = affiliation['raw_affiliation_string']
                    author_object['ORCID'] = 'https://orcid.org/' + x.split('ORCID ')[1]
                if 'ror.org' in affiliation['raw_affiliation_string']:
                    # The affiliation string already carries a ROR id: take it as-is
                    have_ror = True
                    all_affs_with_ror.append({
                        'Origin': 'data',
                        'RORid': affiliation['raw_affiliation_string'][0:25],
                        'Confidence': None
                    })
                else:
                    # Otherwise try to match the raw affiliation string with affro
                    matches = affro(affiliation['raw_affiliation_string'])
                    if len(matches) > 0:
                        author_object['Organization_PIDs'] = remove_duplicates([json.loads(x) for x in matches])
                    else:
                        author_object['Organization_PIDs'] = []

            # ROR ids taken directly from the data take precedence over affro matches
            if have_ror:
                author_object['Organization_PIDs'] = all_affs_with_ror

            order = ["Name", "Raw_affiliations", "Organization_PIDs", "ORCID"]
            reordered_data = {k: author_object[k] for k in order}
            authors.append(reordered_data)

        organizations = remove_duplicates([x for author in authors for x in author['Organization_PIDs']])

        updt = {'ID': id, 'Authors': authors, 'Organizations': organizations}
        return updt
    except Exception as e:
        print(f"Error processing record with id {record.get('id')}: {str(e)}")
        return None
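

# Process each input part file: read it with Spark, enrich every record, then
# collect the results on the driver and write them out as line-delimited JSON.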
for file in json_file_names:
    print('start processing ' + str(file))
    df = spark.read.json(folder_path + '/' + file)

    # Apply the update_record function to every row
    updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))

    # Convert the updated RDD to JSON strings
    json_rdd = updated_rdd.map(lambda record: json.dumps(record))

    # Collect the data on the driver
    json_data = json_rdd.collect()

    # Name the output file after the original part file
    output_file_name = file + '_output.json'
    print('end processing ' + str(file))

    with open(output_file_name, 'w') as f:
        for i, item in enumerate(json_data):
            print('write ' + str(i))
            f.write(item + '\n')