affRo/crossref.py

67 lines
2.0 KiB
Python

import json
from pyspark.sql import SparkSession
from affro_cluster import *
import sys
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
# Initialize Spark session
spark = SparkSession.builder.appName("AffRo-Crossref").getOrCreate()
def remove_duplicates(list_of_dicts):
# Use a set to store tuples of dictionary items to filter out duplicates
seen = set()
unique_list_of_dicts = []
for d in list_of_dicts:
# Convert the dictionary to a tuple of items
items = tuple(d.items())
if items not in seen:
seen.add(items)
unique_list_of_dicts.append(d)
return unique_list_of_dicts
def crossref_affro(record):
doi = record['DOI']
try:
for author in record['author']:
affiliations = []
if len(author['affiliation'])>0:
for organization in author['affiliation']:
try:
if organization['name'] not in affiliations:
affiliations.append(organization['name'])
except:
pass
if len(affiliations)>0:
affiliations = list(set(affiliations))
ror_links = [affro(affil) for affil in affiliations]
matchings = [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
matchings = remove_duplicates(matchings)
if len(matchings)>0:
result = {'DOI' : doi, 'Matchings' : matchings}
return result
except Exception as e:
print(f"Error processing record with id {record['DOI']} : {str(e)}")
df = spark.read.json(folder_path)
# Apply the update_record function
updated_rdd = df.rdd.map(lambda row: crossref_affro(row.asDict()))
filtered_rdd = updated_rdd.filter(lambda record: record is not None and record != {})
# Convert updated RDD to JSON strings
json_rdd = filtered_rdd.map(lambda record: json.dumps(record))
json_rdd.saveAsTextFile(hdfs_output_path)