updates -datacite
This commit is contained in:
parent
ba98a16bcb
commit
413ec3773e
|
@ -0,0 +1,111 @@
|
|||
import json
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql.functions import col, from_json
|
||||
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
|
||||
from affro_cluster import *
|
||||
|
||||
import sys
|
||||
|
||||
folder_path = sys.argv[1]
|
||||
hdfs_output_path = sys.argv[2]
|
||||
|
||||
|
||||
# Initialize Spark session
|
||||
spark = SparkSession.builder.appName("AffRo-DataCite").getOrCreate()
|
||||
|
||||
json_schema = StructType([
|
||||
StructField("doi", StringType(), True),
|
||||
StructField("attributes", StructType([
|
||||
StructField("doi", StringType(), True),
|
||||
StructField("identifiers", ArrayType(StringType()), True),
|
||||
StructField("creators", ArrayType(StructType([
|
||||
StructField("name", StringType(), True),
|
||||
StructField("givenName", StringType(), True),
|
||||
StructField("familyName", StringType(), True),
|
||||
StructField("nameType", StringType(), True),
|
||||
StructField("affiliation", ArrayType(StringType()), True),
|
||||
StructField("nameIdentifiers", ArrayType(StringType()), True)
|
||||
])), True),
|
||||
]), True),
|
||||
])
|
||||
|
||||
def remove_duplicates(list_of_dicts):
|
||||
# Use a set to store tuples of dictionary items to filter out duplicates
|
||||
seen = set()
|
||||
unique_list_of_dicts = []
|
||||
|
||||
for d in list_of_dicts:
|
||||
# Convert the dictionary to a tuple of items
|
||||
items = tuple(d.items())
|
||||
if items not in seen:
|
||||
seen.add(items)
|
||||
unique_list_of_dicts.append(d)
|
||||
|
||||
return unique_list_of_dicts
|
||||
|
||||
|
||||
def datacite_affro(record):
|
||||
try:
|
||||
doi = record['doi']
|
||||
result = {}
|
||||
authors = []
|
||||
for creator in record['json_parsed']['attributes']['creators']:
|
||||
name = {}
|
||||
name['Full'] = creator['name']
|
||||
corresponing = False
|
||||
contributor_roles = None
|
||||
matchings = []
|
||||
try:
|
||||
name['First'] = creator['givenName']
|
||||
except Exception as e:
|
||||
name['First'] = None
|
||||
try:
|
||||
name['Last'] = creator['familyName']
|
||||
except:
|
||||
name['Last'] = None
|
||||
try:
|
||||
name['Type'] = creator['nameType']
|
||||
except:
|
||||
name['Type'] = None
|
||||
if 'affiliation' in creator:
|
||||
affiliation = creator['affiliation']
|
||||
for org in affiliation:
|
||||
if 'corresponding author' in org.lower():
|
||||
corresponing = True
|
||||
if len(affiliation)>0:
|
||||
ror_links = [affro(org) for org in affiliation]
|
||||
matchings = [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
|
||||
matchings = remove_duplicates(matchings)
|
||||
|
||||
else:
|
||||
affiliation = []
|
||||
matchings = []
|
||||
|
||||
if len(matchings)>0:
|
||||
authors.append({'Name' : name, 'Corresponding' : corresponing, 'Contributor_roles' : contributor_roles, 'Raw_affiliations' : affiliation, 'Matchings':matchings})
|
||||
|
||||
|
||||
collect_organizations = [author['Matchings'] for author in authors]
|
||||
organizations = [inner_ror for outer_ror in collect_organizations for inner_ror in outer_ror]
|
||||
organizations = remove_duplicates(organizations)
|
||||
|
||||
if len(authors)>0:
|
||||
result = {'DOI' : doi, 'Authors' : authors, 'Organizations' : organizations}
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"Error processing record with id {record['DOI']} : {str(e)}")
|
||||
|
||||
|
||||
df = spark.read.option("mode", "PERMISSIVE").parquet(folder_path)
|
||||
df_parsed = df.withColumn("json_parsed", from_json(col("json"), json_schema))
|
||||
|
||||
|
||||
updated_rdd = df_parsed.rdd.map(lambda row: datacite_affro(row.asDict()))
|
||||
|
||||
filtered_rdd = updated_rdd.filter(lambda record: record is not None and record != {})
|
||||
|
||||
# Convert updated RDD to JSON strings
|
||||
json_rdd = filtered_rdd.map(lambda record: json.dumps(record))
|
||||
|
||||
json_rdd.saveAsTextFile(hdfs_output_path)
|
|
@ -5,13 +5,14 @@ at
|
|||
de
|
||||
for
|
||||
et
|
||||
für
|
||||
des
|
||||
in
|
||||
as
|
||||
a
|
||||
and
|
||||
fur
|
||||
for
|
||||
und
|
||||
general
|
||||
afiliated
|
||||
zu
|
||||
der
|
||||
aus
|
||||
dem
|
||||
|
|
Loading…
Reference in New Issue