Compare commits

19 Commits

main...openaire-w

Author | SHA1 | Date
---|---|---
Myrto Kallipoliti | 44f0f9987f |
Miriam Baglioni | ad691c28c2 |
Miriam Baglioni | 2806511e02 |
Miriam Baglioni | 0043e4051f |
Miriam Baglioni | a59d0ce9fc |
Miriam Baglioni | e2f8007433 |
Miriam Baglioni | f8479083f2 |
Miriam Baglioni | 9440f863c9 |
Miriam Baglioni | f78456288c |
Miriam Baglioni | 997f2e492f |
Miriam Baglioni | 982a1b0b9f |
Miriam Baglioni | 4fe3d31ed5 |
Miriam Baglioni | efa4db4e52 |
Miriam Baglioni | ea2e27a9f4 |
Miriam Baglioni | e33bf4ef14 |
Miriam Baglioni | f4704aef4d |
Miriam Baglioni | 0500fc586f |
Miriam Baglioni | 5568aa92ec |
Miriam Baglioni | 600ddf8087 |
```diff
@@ -5,15 +5,15 @@ from matching_cluster import *
 from create_input_cluster import *
 import json
 
-dix_org = load_json('dictionaries/dix_acad.json')
-dix_mult = load_json('dictionaries/dix_mult.json')
-dix_city = load_json('dictionaries/dix_city.json')
-dix_country = load_json('dictionaries/dix_country.json')
-dix_org_oaire = load_json('dictionaries/dix_acad_oaire.json')
-dix_mult_oaire = load_json('dictionaries/dix_mult_oaire.json')
-dix_country_oaire = load_json('dictionaries/dix_country_oaire.json')
-dix_status = load_json('dictionaries/dix_status.json')
-dix_grids = load_json('dictionaries/dix_grids_rors.json')
+dix_org = load_json('dix_acad.json')
+dix_mult = load_json('dix_mult.json')
+dix_city = load_json('dix_city.json')
+dix_country = load_json('dix_country.json')
+dix_org_oaire = load_json('dix_acad_oaire.json')
+dix_mult_oaire = load_json('dix_mult_oaire.json')
+dix_country_oaire = load_json('dix_country_oaire.json')
+dix_status = load_json('dix_status.json')
+dix_grids = load_json('dix_grids_rors.json')
 
 
 def find_ror(input, simU, simG):
@@ -27,12 +27,12 @@ def replace_double_consonants(text):
     result = re.sub(pattern, r'\1', text, flags=re.IGNORECASE)
     return result
 
-remove_list = [replace_double_consonants(x) for x in load_txt('txt_files/remove_list.txt')]
-stop_words = load_txt('txt_files/stop_words.txt')
-university_terms = [replace_double_consonants(x) for x in load_txt('txt_files/university_terms.txt')]
-city_names = [replace_double_consonants(x) for x in load_txt('txt_files/city_names.txt')]
+remove_list = [replace_double_consonants(x) for x in load_txt('remove_list.txt')]
+stop_words = load_txt('stop_words.txt')
+university_terms = [replace_double_consonants(x) for x in load_txt('university_terms.txt')]
+city_names = [replace_double_consonants(x) for x in load_txt('city_names.txt')]
 
-categ_dicts = load_json('dictionaries/dix_categ.json')
+categ_dicts = load_json('dix_categ.json')
 
 
 def is_contained(s, w):
```
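The hunks above drop the `dictionaries/` and `txt_files/` prefixes, so `load_json` and `load_txt` now receive bare file names and resolve them against the working directory of the Spark job. Below is a minimal sketch of loaders that behave this way; it assumes the resource files are shipped next to the job (for example with `spark-submit --files`), and the helper bodies are illustrative stand-ins, not the repository's actual implementations.

```python
# Illustrative loaders (not the repository's implementations): resolve the bare
# names used after these commits ('dix_acad.json', 'stop_words.txt', ...) from the
# current working directory, falling back to the old repository layout.
import json
import os

def load_json(name):
    for candidate in (name, os.path.join('dictionaries', name)):
        if os.path.exists(candidate):
            with open(candidate, 'r') as f:
                return json.load(f)
    raise FileNotFoundError(name)

def load_txt(name):
    for candidate in (name, os.path.join('txt_files', name)):
        if os.path.exists(candidate):
            with open(candidate, 'r') as f:
                return [line.strip() for line in f if line.strip()]
    raise FileNotFoundError(name)
```

The fallback to `dictionaries/` and `txt_files/` is only there so the sketch also runs from a plain checkout.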
strings.py (97 changed lines)

```diff
@@ -1,8 +1,11 @@
-import json
+import time
 
+from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
+from threading import Thread
 from affro_cluster import *
-import os
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col
+from pyspark.sql.functions import col, explode, first, collect_list, udf, collect_set
 import sys
 
 spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
@@ -10,31 +13,85 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
 folder_path = sys.argv[1]
 hdfs_output_path = sys.argv[2]
 
-# folder_path = '/user/zeppelin/affiliations/raw_aff_string/2024-08'
-# hdfs_output_path = 'tmp/affro/results_strings'
-
-def oalex_affro(record):
-    doi = record['doi'][16:]
-    oalex = record['rors']
-    try:
-        matchings = [item for sublist in [affro(x) for x in record['raw_aff_string']] for item in (sublist if isinstance(sublist, list) else [sublist])]
-        result = {'DOI' : doi, 'OAlex' : oalex, 'Matchings': matchings}
-        return result
-    except Exception as e:
-        print(f"Error processing record with id {record.get('doi')}: {str(e)}")
-        return None
-
-df = spark.read.json(folder_path)
-filtered_df = df.filter(col("doi").isNotNull())
-updated_rdd = filtered_df.rdd.map(lambda row: oalex_affro(row.asDict()))
-
-json_rdd = updated_rdd.map(lambda record: json.dumps(record))
-
-json_rdd.saveAsTextFile(hdfs_output_path)
+matchings_schema = ArrayType(
+    StructType([
+        StructField("Provenance", StringType(), nullable=True),
+        StructField("PID", StringType(), nullable=True),
+        StructField("Value", StringType(), nullable=True),
+        StructField("Confidence", DoubleType(), nullable=True),
+        StructField("Status", StringType(), nullable=True)
+    ])
+)
+
+operation_counter = spark.sparkContext.accumulator(0)
+
+# Version of affro applied to a single raw_aff_string; returns just the Matchings set
+def oalex_affro(aff_string):
+    global operation_counter
+    try:
+        matchings = affro(aff_string)
+        operation_counter += 1
+        # Ensure matchings is a list, even if affro returns a single dict
+        if not isinstance(matchings, list):
+            matchings = [matchings]
+
+        # Create the result as a list of tuples that matches matchings_schema
+        result = []
+        for matching in matchings:
+            # Assuming 'matching' is a dictionary that contains 'Provenance', 'PID', 'Value', 'Confidence', 'Status'
+            result.append((
+                matching.get("Provenance", None),
+                matching.get("PID", None),
+                matching.get("Value", None),
+                float(matching.get("Confidence", None)),
+                matching.get("Status", None)
+            ))
+
+        return result
+
+    except Exception as e:
+        print(f"Error processing affiliation string {aff_string}: {str(e)}")
+        return ()
+
+oalex_affro_udf = udf(oalex_affro, matchings_schema)
+
+monitor_done = False
+
+def monitor_counter(interval):
+    while True:
+        print(f"Number of calls to AffRo: {operation_counter.value}")
+        time.sleep(interval)
+        if monitor_done:
+            break
+
+exploded = spark.read.json(folder_path) \
+    .filter(col("doi").isNotNull()) \
+    .select(
+        col("doi").alias("DOI"),
+        col("rors").alias("OAlex"),
+        explode(col("raw_aff_string")).alias("affiliation")  # split every raw_aff_string so the work parallelizes better
+    )
+
+monitor_thread = Thread(target=monitor_counter, args=(600,), daemon=True)
+monitor_thread.start()
+
+affs = exploded \
+    .select("affiliation") \
+    .distinct() \
+    .withColumn("Matchings", oalex_affro_udf(col("affiliation")))
+
+affs.join(exploded, on="affiliation") \
+    .select(col("DOI"),
+            col("OAlex"),
+            explode(col("Matchings")).alias("match")
+    ) \
+    .groupBy("DOI") \
+    .agg(first("OAlex").alias("OAlex"),  # OAlex is identical for every row of a DOI, so keep the first value
+         collect_set("match").alias("Matchings")  # collect the exploded matches back into one set per DOI
+    ) \
+    .write \
+    .mode("overwrite") \
+    .option("compression","gzip") \
+    .json(hdfs_output_path)
+
+monitor_done = True
+monitor_thread.join()
```
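The rewritten strings.py applies AffRo through a Spark UDF over distinct affiliation strings, counts the calls with an accumulator that a daemon thread reports on, and then reassembles the matches per DOI. The following standalone sketch reproduces that pattern on toy data; `fake_matcher`, the two-row DataFrame, and the reduced schema are illustrative stand-ins for `affro()`, the HDFS input, and `matchings_schema`.

```python
import time
from threading import Thread

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, collect_set, udf
from pyspark.sql.types import (ArrayType, DoubleType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.appName("UDFPatternSketch").getOrCreate()

# Reduced version of matchings_schema: each match is a (Value, Confidence) struct.
schema = ArrayType(StructType([
    StructField("Value", StringType(), nullable=True),
    StructField("Confidence", DoubleType(), nullable=True),
]))

calls = spark.sparkContext.accumulator(0)

def fake_matcher(aff_string):
    # Stand-in for affro(): return a list of matchings for one raw string
    # and bump the accumulator so the driver can watch progress.
    calls.add(1)
    return [(aff_string.upper(), 1.0)]

matcher_udf = udf(fake_matcher, schema)

done = False

def monitor(interval):
    # Periodically report the accumulator value from the driver while tasks run.
    while not done:
        print(f"matcher calls so far: {calls.value}")
        time.sleep(interval)

records = spark.createDataFrame(
    [("10.1/a", ["ror1"], ["Univ. of X", "Lab Y"]),
     ("10.1/b", ["ror2"], ["Univ. of X"])],
    ["doi", "rors", "raw_aff_string"])

# One row per (DOI, affiliation string), mirroring the explode in strings.py.
exploded = records.select(col("doi").alias("DOI"),
                          col("rors").alias("OAlex"),
                          explode(col("raw_aff_string")).alias("affiliation"))

monitor_thread = Thread(target=monitor, args=(5,), daemon=True)
monitor_thread.start()

# Match each distinct string once, then join the results back to every DOI.
affs = exploded.select("affiliation").distinct() \
               .withColumn("Matchings", matcher_udf(col("affiliation")))

result = affs.join(exploded, on="affiliation") \
             .select("DOI", "OAlex", explode(col("Matchings")).alias("match")) \
             .groupBy("DOI") \
             .agg(first("OAlex").alias("OAlex"),
                  collect_set("match").alias("Matchings"))

result.show(truncate=False)
done = True
```

Matching only the distinct strings and joining the results back is what lets a repeated affiliation string be processed once per value instead of once per record.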
```diff
@@ -3,9 +3,10 @@ import os
 from pyspark.sql import SparkSession
 from affro_cluster import *
+import sys
 
-folder_path ='/Users/myrto/Documents/openAIRE/7. matching/data_samples/iis_short'
-
-#folder_path = 'check'
+folder_path = sys.argv[1]
+
+hdfs_output_path = sys.argv[2]
 
 json_file_names = []
 
@@ -106,8 +107,6 @@ def update_record(record):
         print(f"Error processing record with id {record.get('id')}: {str(e)}")
         return None
 
-
-
 for file in json_file_names:
     print('start processing '+str(file))
     df = spark.read.json(folder_path + '/' + file)
@@ -122,7 +121,7 @@ for file in json_file_names:
     json_data = json_rdd.collect()
 
     # Create a new filename by appending "_output.json" to the original filename (without extension)
-    output_file_name = file+'_output.json'
+    output_file_name = hdfs_output_path + "/" + file+'_output.json'
     print('end processing '+str(file))
 
     with open(output_file_name, 'w') as f:
```
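These last hunks replace a hard-coded local folder with positional arguments and prefix each per-file output name with the output path. A small sketch of that calling convention follows, assuming the script is launched as `spark-submit <script> <input_folder> <output_path>`; the usage check and the helper name are additions for illustration, not part of the commits.

```python
import os
import sys

# Assumed invocation, mirroring the diff: spark-submit <script> <input_folder> <output_path>
if len(sys.argv) < 3:
    sys.exit("usage: <script> <input_folder> <output_path>")

folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]

def output_name_for(input_file):
    # Each input file gets its own "<name>_output.json" under the output path,
    # as in the updated loop over json_file_names.
    return os.path.join(hdfs_output_path, input_file + '_output.json')
```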