From eae8412a160a6df40ece85f6602954df1b64bf9e Mon Sep 17 00:00:00 2001
From: Serafeim Chatzopoulos
Date: Mon, 9 Sep 2024 14:21:57 +0300
Subject: [PATCH] Update scripts to be integrated as a workflow in the
 provision pipeline

---
 affro_cluster.py     |  8 ++++----
 functions_cluster.py | 10 +++++-----
 update_records.py    | 36 +++++++-----------------------------
 3 files changed, 16 insertions(+), 38 deletions(-)

diff --git a/affro_cluster.py b/affro_cluster.py
index 7d59a2b..f5cfbe9 100644
--- a/affro_cluster.py
+++ b/affro_cluster.py
@@ -5,10 +5,10 @@ from matching_cluster import *
 from create_input_cluster import *
 import json
 
-dix_org = load_json('dictionaries/dix_acad.json')
-dix_mult = load_json('dictionaries/dix_mult.json')
-dix_city = load_json('dictionaries/dix_city.json')
-dix_country = load_json('dictionaries/dix_country.json')
+dix_org = load_json('dix_acad.json')
+dix_mult = load_json('dix_mult.json')
+dix_city = load_json('dix_city.json')
+dix_country = load_json('dix_country.json')
 
 
 def affro(raw_aff_string):
diff --git a/functions_cluster.py b/functions_cluster.py
index 9562755..b139e4c 100644
--- a/functions_cluster.py
+++ b/functions_cluster.py
@@ -32,12 +32,12 @@ def replace_double_consonants(text):
     result = re.sub(pattern, r'\1', text, flags=re.IGNORECASE)
     return result
 
-remove_list = [replace_double_consonants(x) for x in load_txt('txt_files/remove_list.txt')]
-stop_words = load_txt('txt_files/stop_words.txt')
-university_terms = [replace_double_consonants(x) for x in load_txt('txt_files/university_terms.txt')]
-city_names = [replace_double_consonants(x) for x in load_txt('txt_files/city_names.txt')]
+remove_list = [replace_double_consonants(x) for x in load_txt('remove_list.txt')]
+stop_words = load_txt('stop_words.txt')
+university_terms = [replace_double_consonants(x) for x in load_txt('university_terms.txt')]
+city_names = [replace_double_consonants(x) for x in load_txt('city_names.txt')]
 
-categ_dicts = load_json('dictionaries/dix_categ.json')
+categ_dicts = load_json('dix_categ.json')
 
 
 def is_contained(s, w):
diff --git a/update_records.py b/update_records.py
index 04278ab..f65e5a0 100644
--- a/update_records.py
+++ b/update_records.py
@@ -4,18 +4,11 @@ from pyspark.sql import SparkSession
 from affro_cluster import *
 
 folder_path = '/user/zeppelin/miriam.baglioni/AffStringFromIISDataset2'
+hdfs_output_path = '/tmp/affro/results'
 #folder_path = 'check'
 
 json_file_names = []
 
-# Loop through all files in the directory
-for file_name in os.listdir(folder_path):
-    # Check if the file is a JSON file (you can adjust the check as needed)
-    if file_name != '_SUCCESS':
-        json_file_names.append(file_name)
-
-# json_file_names now contains the names of all JSON files in the folder
-
 # Initialize Spark session
 spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
 
@@ -90,27 +83,12 @@ def update_record(record):
         print(f"Error processing record with id {record.get('id')}: {str(e)}")
         return None
 
+df = spark.read.json(folder_path)
 
+# Apply the update_record function
+updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))
 
-for file in json_file_names:
-    print('start processing '+str(file))
-    df = spark.read.json(folder_path + '/' + file)
+# Convert updated RDD to JSON strings
+json_rdd = updated_rdd.map(lambda record: json.dumps(record))
 
-    # Apply the update_record function
-    updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))
-
-    # Convert updated RDD to JSON strings
-    json_rdd = updated_rdd.map(lambda record: json.dumps(record))
-
-    # Collect the data and write to an output file with a unique name
-    json_data = json_rdd.collect()
-
-    # Create a new filename by appending "_output.json" to the original filename (without extension)
-    output_file_name = file+'_output.json'
-    print('end processing '+str(file))
-
-    with open(output_file_name, 'w') as f:
-        for i, item in enumerate(json_data):
-            print('write '+str(i))
-
-            f.write(item + '\n')
+json_rdd.saveAsTextFile(hdfs_output_path)
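
Notes (post-patch, not part of the commit):

update_record() returns None for any record that raises during processing,
and json.dumps(None) serializes that to the literal string "null", so the
rewritten job will write one "null" line into hdfs_output_path per failed
record. The patch also leaves json_file_names = [] behind even though
nothing populates or reads it anymore. A minimal hardening sketch, assuming
the same names as the patch (df, update_record, hdfs_output_path) and not
part of this commit:

    # Sketch only: the patched pipeline plus a guard that drops records
    # update_record() could not process (it returns None on error).
    df = spark.read.json(folder_path)
    updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))
    json_rdd = (
        updated_rdd
        .filter(lambda record: record is not None)  # skip failed records
        .map(lambda record: json.dumps(record))
    )
    json_rdd.saveAsTextFile(hdfs_output_path)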
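saveAsTextFile() writes one JSON string per line across part files under
/tmp/affro/results, and it fails if that directory already exists, so a
pipeline run may need to clear it first. A hypothetical post-run sanity
check (only the output path comes from the patch; the rest is illustrative):

    # Hypothetical check: count total and failed records in the job output.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("AffroOutputCheck").getOrCreate()
    lines = spark.sparkContext.textFile('/tmp/affro/results')
    print('total records :', lines.count())
    print('failed records:', lines.filter(lambda l: l == 'null').count())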