import json
import os
import sys

from pyspark.sql import SparkSession

from affro_cluster import *

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

json_file_names = []

# Collect the part files in the input directory, skipping Spark's _SUCCESS marker
for file_name in os.listdir(folder_path):
    if file_name != '_SUCCESS':
        json_file_names.append(file_name)

# Initialize Spark session
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()


def remove_duplicates(list_of_dicts):
    # Use a set of item tuples to filter out duplicate dictionaries while preserving order
    seen = set()
    unique_list_of_dicts = []
    for d in list_of_dicts:
        items = tuple(d.items())
        if items not in seen:
            seen.add(items)
            unique_list_of_dicts.append(d)
    return unique_list_of_dicts


def update_record(record):
    id = record['id']
    authors = []
    try:
        for author in record['authors']:
            author_object = {}
            if 'orcid.org/0' in author['fullName']:
                # fullName embeds an ORCID URL before the comma: "<orcid url>, <name>"
                author_object['Name'] = {'Full': author['fullName'].split(',')[1].strip(),
                                         'First': 'None', 'Last': 'None', 'Type': 'None'}
                author_object['ORCID'] = author['fullName'].split(',')[0][:36]
            else:
                author_object['Name'] = {'Full': author['fullName'].strip(),
                                         'First': 'None', 'Last': 'None', 'Type': 'None'}
                author_object['ORCID'] = 'None'

            author_object['Raw_affiliations'] = [affiliation['raw_affiliation_string']
                                                 for affiliation in author['affiliations']]

            all_affs_with_ror = []
            have_id = False
            for affiliation in author['affiliations']:
                raw_aff = affiliation['raw_affiliation_string']

                # Pick up an ORCID embedded in the affiliation string, if any
                if 'ORCID: 0' in raw_aff:
                    author_object['ORCID'] = 'https://orcid.org/' + raw_aff.split('ORCID: ')[1]
                elif 'ORCID 0' in raw_aff:
                    author_object['ORCID'] = 'https://orcid.org/' + raw_aff.split('ORCID ')[1]

                if 'ror.org' in raw_aff:
                    # The affiliation string already carries a ROR id: keep it as-is
                    have_id = True
                    all_affs_with_ror.append({
                        'Origin': 'data',
                        'RORid': raw_aff[0:25],
                        'Confidence': None
                    })
                elif 'grid.' in raw_aff:
                    # The affiliation string carries a GRID id: translate it to a ROR id
                    have_id = True
                    for k in dix_grids:
                        if k in raw_aff.split(' ')[0]:
                            try:
                                all_affs_with_ror.append({
                                    'Provenance': 'Data',
                                    'PID': 'ROR',
                                    'Value': dix_grids[k],
                                    'Confidence': 1
                                })
                            except Exception:
                                pass
                else:
                    # No identifier in the string: run the affiliation matching
                    if len(affro(raw_aff)) > 0:
                        author_object['Matchings'] = affro(raw_aff)
                        try:
                            author_object['Matchings'] = remove_duplicates(
                                [json.loads(x) for x in author_object['Matchings']])
                        except Exception:
                            author_object['Matchings'] = remove_duplicates(
                                [x for x in author_object['Matchings']])
                    else:
                        author_object['Matchings'] = []

            # Identifiers found in the data take precedence over matched affiliations
            if have_id:
                author_object['Matchings'] = all_affs_with_ror

            order = ["Name", "Raw_affiliations", "Matchings", "ORCID"]
            reordered_data = {k: author_object[k] for k in order}
            authors.append(reordered_data)

        organizations = remove_duplicates([x for author in authors for x in author['Matchings']])
        updt = {'ID': id, 'Authors': authors, 'Organizations': organizations}
        return updt
    except Exception as e:
        print(f"Error processing record with id {record.get('id')}: {str(e)}")
        return None


for file in json_file_names:
    print('start processing ' + str(file))
    df = spark.read.json(folder_path + '/' + file)

    # Apply the update_record function to every row
    updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))

    # Convert the updated records to JSON strings
    json_rdd = updated_rdd.map(lambda record: json.dumps(record))

    # Collect the data so it can be written to one output file per input file
    json_data = json_rdd.collect()

    # Build the output filename by appending "_output.json" to the input filename
    output_file_name = hdfs_output_path + "/" + file + '_output.json'
    print('end processing ' + str(file))

    with open(output_file_name, 'w') as f:
        for i, item in enumerate(json_data):
            print('write ' + str(i))
            f.write(item + '\n')