# File-viewer header (extraction residue): Python source, originally 112 lines, 3.9 KiB.
import json
|
||
|
from pyspark.sql import SparkSession
|
||
|
from pyspark.sql.functions import col, from_json
|
||
|
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
|
||
|
from affro_cluster import *
|
||
|
|
||
|
import sys
|
||
|
|
||
|
# Command-line contract: argv[1] is the folder read with spark.read.parquet,
# argv[2] is the path handed to saveAsTextFile. Fail early with a usage
# message instead of a bare IndexError when either one is missing.
if len(sys.argv) < 3:
    sys.exit(f"Usage: {sys.argv[0]} <input_parquet_folder> <hdfs_output_path>")

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

# Initialize Spark session (only after the arguments have been validated).
spark = SparkSession.builder.appName("AffRo-DataCite").getOrCreate()
|
||
|
|
||
|
# Schema applied by from_json to the raw "json" column. Every field is
# nullable; only the parts of the record that the matcher reads are declared.
json_schema = (
    StructType()
    .add("doi", StringType(), True)
    .add(
        "attributes",
        StructType()
        .add("doi", StringType(), True)
        .add("identifiers", ArrayType(StringType()), True)
        .add(
            "creators",
            ArrayType(
                StructType()
                .add("name", StringType(), True)
                .add("givenName", StringType(), True)
                .add("familyName", StringType(), True)
                .add("nameType", StringType(), True)
                .add("affiliation", ArrayType(StringType()), True)
                .add("nameIdentifiers", ArrayType(StringType()), True)
            ),
            True,
        ),
        True,
    )
)
|
||
|
|
||
|
def remove_duplicates(list_of_dicts):
    """Return the dictionaries of *list_of_dicts* with duplicates dropped.

    The first occurrence of each distinct dictionary is kept and the input
    order is preserved. Two dictionaries count as duplicates when they hold
    the same key/value pairs, regardless of the order in which the keys were
    inserted.

    Args:
        list_of_dicts: iterable of dictionaries whose values are hashable.

    Returns:
        A new list containing the unique dictionaries (the same objects, not
        copies).

    Raises:
        TypeError: if a dictionary value is not hashable.
    """
    seen = set()
    unique_list_of_dicts = []

    for d in list_of_dicts:
        # A frozenset of the items is an order-independent fingerprint, so
        # {'a': 1, 'b': 2} and {'b': 2, 'a': 1} collapse into one entry.
        fingerprint = frozenset(d.items())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique_list_of_dicts.append(d)

    return unique_list_of_dicts
|
||
|
|
||
|
|
||
|
def _creator_field(creator, key):
    """Return creator[key], or None when the field cannot be looked up."""
    try:
        return creator[key]
    except (KeyError, ValueError, IndexError):
        # dicts raise KeyError; pyspark Rows raise ValueError for unknown fields.
        return None


def datacite_affro(record):
    """Match the creator affiliations of one record against organizations.

    For every creator under record['json_parsed']['attributes']['creators']
    the raw affiliation strings are passed to ``affro`` and the flattened,
    de-duplicated matches are collected. Creators without any match are
    left out of the result.

    Args:
        record: mapping with a 'doi' entry and a 'json_parsed' entry holding
            the parsed record. Creators may be dicts or Row-like objects
            supporting ``creator['field']`` and ``'field' in creator``.

    Returns:
        {'DOI': ..., 'Authors': [...], 'Organizations': [...]} when at least
        one creator has a match, ``{}`` when none has, and ``None`` when the
        record could not be processed (the error is printed).

    Note:
        ``affro`` is expected to return an iterable of dictionaries with
        hashable values, since its output is flattened and handed to
        ``remove_duplicates``.
    """
    # Bound before the try so the error handler can always report it.
    doi = None
    try:
        doi = record['doi']
        # A null creators array is treated as "no creators".
        creators = record['json_parsed']['attributes']['creators'] or []

        authors = []
        for creator in creators:
            name = {
                'Full': creator['name'],
                'First': _creator_field(creator, 'givenName'),
                'Last': _creator_field(creator, 'familyName'),
                'Type': _creator_field(creator, 'nameType'),
            }
            contributor_roles = None

            affiliation = []
            if 'affiliation' in creator:
                # The field can be present but null; treat that as empty.
                affiliation = creator['affiliation'] or []

            is_corresponding = any(
                'corresponding author' in org.lower() for org in affiliation
            )

            matchings = []
            if affiliation:
                ror_links = [affro(org) for org in affiliation]
                matchings = remove_duplicates(
                    [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
                )

            if matchings:
                authors.append({'Name' : name, 'Corresponding' : is_corresponding, 'Contributor_roles' : contributor_roles, 'Raw_affiliations' : affiliation, 'Matchings':matchings})

        if not authors:
            return {}

        organizations = remove_duplicates(
            [inner_ror for author in authors for inner_ror in author['Matchings']]
        )
        return {'DOI' : doi, 'Authors' : authors, 'Organizations' : organizations}
    except Exception as e:
        # Deliberate per-record best effort: one malformed record must not
        # fail the whole job. The caller filters out the None result.
        print(f"Error processing record with id {doi} : {str(e)}")
        return None
|
||
|
|
||
|
|
||
|
# Read the input parquet and parse its raw "json" column with json_schema.
df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .parquet(folder_path)
)
df_parsed = df.withColumn(
    "json_parsed",
    from_json(col("json"), json_schema),
)

# Run the matcher on every row, handing it a plain dict view of the row.
updated_rdd = df_parsed.rdd.map(lambda row: datacite_affro(row.asDict()))

# Drop rows that failed (None) or produced no matched author ({}).
filtered_rdd = updated_rdd.filter(lambda rec: rec not in (None, {}))

# Serialize each remaining record to one JSON string per output line.
json_rdd = filtered_rdd.map(json.dumps)

json_rdd.saveAsTextFile(hdfs_output_path)
|