Update scripts to be integrated as a workflow in the provision pipeline

This commit is contained in:
Serafeim Chatzopoulos 2024-09-09 14:21:57 +03:00
parent 0c98ba76a6
commit eae8412a16
3 changed files with 16 additions and 38 deletions

View File

@ -5,10 +5,10 @@ from matching_cluster import *
from create_input_cluster import * from create_input_cluster import *
import json import json
dix_org = load_json('dictionaries/dix_acad.json') dix_org = load_json('dix_acad.json')
dix_mult = load_json('dictionaries/dix_mult.json') dix_mult = load_json('dix_mult.json')
dix_city = load_json('dictionaries/dix_city.json') dix_city = load_json('dix_city.json')
dix_country = load_json('dictionaries/dix_country.json') dix_country = load_json('dix_country.json')
def affro(raw_aff_string): def affro(raw_aff_string):

View File

@ -32,12 +32,12 @@ def replace_double_consonants(text):
result = re.sub(pattern, r'\1', text, flags=re.IGNORECASE) result = re.sub(pattern, r'\1', text, flags=re.IGNORECASE)
return result return result
remove_list = [replace_double_consonants(x) for x in load_txt('txt_files/remove_list.txt')] remove_list = [replace_double_consonants(x) for x in load_txt('remove_list.txt')]
stop_words = load_txt('txt_files/stop_words.txt') stop_words = load_txt('stop_words.txt')
university_terms = [replace_double_consonants(x) for x in load_txt('txt_files/university_terms.txt')] university_terms = [replace_double_consonants(x) for x in load_txt('university_terms.txt')]
city_names = [replace_double_consonants(x) for x in load_txt('txt_files/city_names.txt')] city_names = [replace_double_consonants(x) for x in load_txt('city_names.txt')]
categ_dicts = load_json('dictionaries/dix_categ.json') categ_dicts = load_json('dix_categ.json')
def is_contained(s, w): def is_contained(s, w):

View File

@ -4,18 +4,11 @@ from pyspark.sql import SparkSession
from affro_cluster import * from affro_cluster import *
folder_path = '/user/zeppelin/miriam.baglioni/AffStringFromIISDataset2' folder_path = '/user/zeppelin/miriam.baglioni/AffStringFromIISDataset2'
hdfs_output_path = '/tmp/affro/results'
#folder_path = 'check' #folder_path = 'check'
json_file_names = [] json_file_names = []
# Loop through all files in the directory
for file_name in os.listdir(folder_path):
# Check if the file is a JSON file (you can adjust the check as needed)
if file_name != '_SUCCESS':
json_file_names.append(file_name)
# json_file_names now contains the names of all JSON files in the folder
# Initialize Spark session # Initialize Spark session
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate() spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
@ -90,27 +83,12 @@ def update_record(record):
print(f"Error processing record with id {record.get('id')}: {str(e)}") print(f"Error processing record with id {record.get('id')}: {str(e)}")
return None return None
df = spark.read.json(folder_path)
# Apply the update_record function
updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))
for file in json_file_names: # Convert updated RDD to JSON strings
print('start processing '+str(file)) json_rdd = updated_rdd.map(lambda record: json.dumps(record))
df = spark.read.json(folder_path + '/' + file)
# Apply the update_record function json_rdd.saveAsTextFile(hdfs_output_path)
updated_rdd = df.rdd.map(lambda row: update_record(row.asDict()))
# Convert updated RDD to JSON strings
json_rdd = updated_rdd.map(lambda record: json.dumps(record))
# Collect the data and write to an output file with a unique name
json_data = json_rdd.collect()
# Create a new filename by appending "_output.json" to the original filename (without extension)
output_file_name = file+'_output.json'
print('end processing '+str(file))
with open(output_file_name, 'w') as f:
for i, item in enumerate(json_data):
print('write '+str(i))
f.write(item + '\n')