import json
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

from affro_cluster import *

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

# Initialize Spark session
spark = SparkSession.builder.appName("AffRo-DataCite").getOrCreate()

# Schema for the subset of the DataCite JSON payload that this job needs.
json_schema = StructType([
    StructField("doi", StringType(), True),
    StructField("attributes", StructType([
        StructField("doi", StringType(), True),
        StructField("identifiers", ArrayType(StringType()), True),
        StructField("creators", ArrayType(StructType([
            StructField("name", StringType(), True),
            StructField("givenName", StringType(), True),
            StructField("familyName", StringType(), True),
            StructField("nameType", StringType(), True),
            StructField("affiliation", ArrayType(StringType()), True),
            StructField("nameIdentifiers", ArrayType(StringType()), True)
        ])), True),
    ]), True),
])


def remove_duplicates(list_of_dicts):
    # Use a set of the dictionaries' item tuples to filter out duplicates
    # (assumes the dictionary values are hashable).
    seen = set()
    unique_list_of_dicts = []
    for d in list_of_dicts:
        # Convert the dictionary to a tuple of items
        items = tuple(d.items())
        if items not in seen:
            seen.add(items)
            unique_list_of_dicts.append(d)
    return unique_list_of_dicts


def datacite_affro(record):
    """Extract authors, raw affiliations, and affro() matchings from one parsed
    DataCite record. Returns a dict with DOI, Authors and Organizations, or an
    empty dict when no author could be matched."""
    try:
        doi = record['doi']
        result = {}
        authors = []

        for creator in record['json_parsed']['attributes']['creators']:
            name = {}
            name['Full'] = creator['name']
            corresponding = False
            contributor_roles = None
            matchings = []

            try:
                name['First'] = creator['givenName']
            except Exception:
                name['First'] = None
            try:
                name['Last'] = creator['familyName']
            except Exception:
                name['Last'] = None
            try:
                name['Type'] = creator['nameType']
            except Exception:
                name['Type'] = None

            if 'affiliation' in creator and creator['affiliation'] is not None:
                affiliation = creator['affiliation']
                for org in affiliation:
                    if 'corresponding author' in org.lower():
                        corresponding = True
                if len(affiliation) > 0:
                    # Run the affiliation matcher on each raw string and flatten the results.
                    ror_links = [affro(org) for org in affiliation]
                    matchings = [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
                    matchings = remove_duplicates(matchings)
            else:
                affiliation = []
                matchings = []

            # Keep only authors for which at least one organization was matched.
            if len(matchings) > 0:
                authors.append({
                    'Name': name,
                    'Corresponding': corresponding,
                    'Contributor_roles': contributor_roles,
                    'Raw_affiliations': affiliation,
                    'Matchings': matchings
                })

        # Collect the deduplicated set of organizations across all authors.
        collect_organizations = [author['Matchings'] for author in authors]
        organizations = [inner_ror for outer_ror in collect_organizations for inner_ror in outer_ror]
        organizations = remove_duplicates(organizations)

        if len(authors) > 0:
            result = {'DOI': doi, 'Authors': authors, 'Organizations': organizations}
        return result
    except Exception as e:
        print(f"Error processing record with id {record.get('doi')} : {str(e)}")


df = spark.read.option("mode", "PERMISSIVE").parquet(folder_path)

# Parse the raw JSON string column against the schema above.
df_parsed = df.withColumn("json_parsed", from_json(col("json"), json_schema))

updated_rdd = df_parsed.rdd.map(lambda row: datacite_affro(row.asDict()))

# Drop records that failed (None) or produced no matched authors (empty dict).
filtered_rdd = updated_rdd.filter(lambda record: record is not None and record != {})

# Convert the results to JSON strings and write them out, one record per line.
json_rdd = filtered_rdd.map(lambda record: json.dumps(record))

json_rdd.saveAsTextFile(hdfs_output_path)
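
# Example invocation (illustrative only; the script name and paths below are
# placeholders, not taken from the original source):
#
#   spark-submit datacite_affro.py /path/to/datacite/parquet hdfs:///path/to/output
#
# The first argument is the input Parquet folder with a "json" string column,
# the second is the HDFS output path; saveAsTextFile writes one JSON object
# per line, each with the DOI, its matched authors, and the deduplicated
# organization list.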