Compare commits

...

39 Commits

Author SHA1 Message Date
Myrto Kallipoliti 44f0f9987f Merge pull request 'Oalex' (#13) from openaire-workflow-ready_2 into openaire-workflow-ready
Reviewed-on: #13
2024-12-09 18:51:22 +01:00
Miriam Baglioni ad691c28c2 [oalex] add a thread to monitor the number of operations done by affro up to a certain point 2024-12-06 10:19:53 +01:00
Miriam Baglioni 2806511e02 [oalex] change collect_list to collect_set so that the same match appears only once 2024-12-05 21:26:08 +01:00
Miriam Baglioni 0043e4051f [oalex] renaming 2024-12-05 18:44:06 +01:00
Miriam Baglioni a59d0ce9fc [oalex] avoid redefinition of explode function 2024-12-05 18:41:16 +01:00
Miriam Baglioni e2f8007433 [oalex] added fix 2024-12-05 16:50:10 +01:00
Miriam Baglioni f8479083f2 [oalex] passing the schema to avoid changes in the confidence type 2024-12-05 16:44:17 +01:00
Miriam Baglioni 9440f863c9 [oalex] changed implementation passing through rdd to avoid calling the udf function 2024-12-05 16:36:38 +01:00
Miriam Baglioni f78456288c [oalex] fix issue 2024-12-05 12:54:10 +01:00
Miriam Baglioni 997f2e492f [oalex] change the call of the function in the dataframe 2024-12-05 12:35:59 +01:00
Miriam Baglioni 982a1b0b9f [oalex] change the call of the function in the dataframe 2024-12-05 12:21:21 +01:00
Miriam Baglioni 4fe3d31ed5 [oalex] register the UDF oalex_affro and the schema of the output to be used in the dataframe by pyspark 2024-12-05 12:18:45 +01:00
Miriam Baglioni efa4db4e52 [oalex] execute affRo on distinct affiliation_strings 2024-12-05 12:02:40 +01:00
Miriam Baglioni ea2e27a9f4 [oalex] fix python syntax errors 2024-12-05 11:22:10 +01:00
Miriam Baglioni e33bf4ef14 [oalex] proposal to increase the parallelization 2024-12-05 10:39:00 +01:00
Miriam Baglioni f4704aef4d [oalex] proposal to increase the parallelization 2024-12-05 10:27:32 +01:00
Miriam Baglioni 0500fc586f Added input/output path as parameters 2024-12-04 15:14:58 +01:00
Miriam Baglioni 5568aa92ec Remove from path 2024-12-03 16:54:47 +01:00
Miriam Baglioni 600ddf8087 Remove directory name
Change to make the file discoverable on the cluster
2024-12-03 16:45:57 +01:00
mkallipo 03dc19fd3b add gitignore 2024-12-01 20:04:32 +01:00
mkallipo d9dbc679e3 updates 2024-12-01 20:00:49 +01:00
mkallipo 413ec3773e updates -datacite 2024-11-21 13:32:50 +01:00
mkallipo ba98a16bcb updates -openorgs 2024-11-21 12:39:26 +01:00
mkallipo 415b45e3ca updates 2024-10-28 11:13:55 +01:00
mkallipo 8c6f6a5a9a crossref 2024-10-24 09:32:08 +02:00
mkallipo b4f79adc56 path 2024-10-18 13:19:41 +02:00
mkallipo 90426a6d29 path 2024-10-18 13:12:00 +02:00
mkallipo ad656121ed arguments 2024-10-18 10:48:18 +02:00
mkallipo ca6e8ad3b9 . 2024-10-16 13:29:39 +02:00
mkallipo 8325c94e56 strings.py 2024-10-16 12:42:51 +02:00
mkallipo 5795ec6493 general, afiliated stopwords 2024-10-07 11:45:41 +02:00
mkallipo 57569fbb3b dix_acad, zu stopword 2024-10-07 11:39:21 +02:00
mkallipo 968ecf9680 multi 2024-10-07 11:35:15 +02:00
mkallipo 2c6e7b7a70 multi 2024-10-07 11:25:16 +02:00
mkallipo 9473c30a09 dictionaries 2024-10-06 22:09:42 +02:00
mkallipo bace694d21 updates 2024-09-19 21:37:28 +02:00
mkallipo a7b703b67d updates german terms, / 2024-09-17 12:06:29 +02:00
mkallipo b38be012a0 updates abbr 2024-09-16 12:20:37 +02:00
mkallipo fbf55b3d5d redirection of non active ror ids 2024-09-12 15:56:26 +02:00
21 changed files with 480 additions and 91 deletions

.gitignore (new file, +5 lines)

@@ -0,0 +1,5 @@
# Ignore macOS system files
.DS_Store
# Ignore Python cache files
__pycache__/

affro_cluster.py

@@ -5,27 +5,59 @@ from matching_cluster import *
from create_input_cluster import *
import json
dix_org = load_json('dictionaries/dix_acad.json')
dix_mult = load_json('dictionaries/dix_mult.json')
dix_city = load_json('dictionaries/dix_city.json')
dix_country = load_json('dictionaries/dix_country.json')
dix_org = load_json('dix_acad.json')
dix_mult = load_json('dix_mult.json')
dix_city = load_json('dix_city.json')
dix_country = load_json('dix_country.json')
dix_org_oaire = load_json('dix_acad_oaire.json')
dix_mult_oaire = load_json('dix_mult_oaire.json')
dix_country_oaire = load_json('dix_country_oaire.json')
dix_status = load_json('dix_status.json')
dix_grids = load_json('dix_grids_rors.json')
def find_ror(input, simU, simG):
result = Aff_Ids(input, dix_org, dix_mult, dix_city, dix_country, simU, simG)
result_oaire = Aff_Ids(input, dix_org_oaire, dix_mult_oaire, dix_country_oaire, dix_country_oaire, simU, simG)
results_upd = []
for r in result:
if dix_status[r[2]][0] == 'active':
results_upd.append([r[1], 'ROR', r[2], 'active'])
else:
if dix_status[r[2]][1] == '':
results_upd.append([r[1], 'ROR', r[2], dix_status[r[2]][0]])
else:
results_upd.append([r[1], 'ROR', r[2], dix_status[r[2]][0]])
results_upd.append([r[1], 'ROR', dix_status[r[2]][1], 'active'])
for r in result_oaire:
results_upd.append([r[1],'OpenOrgs', r[2], None])
if len(results_upd)>0:
result_dict = [{'Provenance': 'AffRo', 'PID':'ROR', 'Value':x[2], 'Confidence':x[0], 'Status':x[3]} if x[1] == 'ROR' else {'Provenance': 'AffRo', 'PID':'OpenOrgs', 'Value':x[2], 'Confidence':x[0], 'Status': 'active'} for x in results_upd]
else:
result_dict = []
return result_dict
def affro(raw_aff_string):
try:
result = Aff_Ids(create_df_algorithm(raw_aff_string), dix_org, dix_mult, dix_city, dix_country, 0.65, 0.82)
if len(result)>0:
result_dict = [json.dumps({'Origin': 'affRo', 'RORid':x[2], 'Confidence':x[1]}) for x in result]
else:
result_dict = []
return result_dict
result = find_ror(create_df_algorithm(raw_aff_string), 0.65, 0.82)
return result
except Exception as e:
# Return some indication of an error, or log the row
print(f"Error: {str(e)}")
print(raw_aff_string)
pass
#raw_aff = 'university of california, los angeles, university of athens, university of california, san diego, university of athens, greece'
if __name__ == "__main__":
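
For reference, after this change affro returns the list built by find_ror directly instead of JSON strings. A minimal usage sketch, assuming the dictionary files are available next to the script; the ROR id, confidence and status shown in the comments are placeholders, not real output:

from affro_cluster import affro

matchings = affro("university of california, los angeles")
# matchings is a list of dicts shaped like
# [{'Provenance': 'AffRo', 'PID': 'ROR', 'Value': '<ror id>',
#   'Confidence': 0.9, 'Status': 'active'}]
# OpenOrgs matches use 'PID': 'OpenOrgs' and 'Status': 'active'.
for m in matchings:
    print(m["PID"], m["Value"], m["Confidence"], m["Status"])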

crossref.py (new file, +66 lines)

@@ -0,0 +1,66 @@
import json
from pyspark.sql import SparkSession
from affro_cluster import *
import sys
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
# Initialize Spark session
spark = SparkSession.builder.appName("AffRo-Crossref").getOrCreate()
def remove_duplicates(list_of_dicts):
# Use a set to store tuples of dictionary items to filter out duplicates
seen = set()
unique_list_of_dicts = []
for d in list_of_dicts:
# Convert the dictionary to a tuple of items
items = tuple(d.items())
if items not in seen:
seen.add(items)
unique_list_of_dicts.append(d)
return unique_list_of_dicts
def crossref_affro(record):
doi = record['DOI']
try:
for author in record['author']:
affiliations = []
if len(author['affiliation'])>0:
for organization in author['affiliation']:
try:
if organization['name'] not in affiliations:
affiliations.append(organization['name'])
except:
pass
if len(affiliations)>0:
affiliations = list(set(affiliations))
ror_links = [affro(affil) for affil in affiliations]
matchings = [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
matchings = remove_duplicates(matchings)
if len(matchings)>0:
result = {'DOI' : doi, 'Matchings' : matchings}
return result
except Exception as e:
print(f"Error processing record with id {record['DOI']} : {str(e)}")
df = spark.read.json(folder_path)
# Apply crossref_affro to every record
updated_rdd = df.rdd.map(lambda row: crossref_affro(row.asDict()))
filtered_rdd = updated_rdd.filter(lambda record: record is not None and record != {})
# Convert updated RDD to JSON strings
json_rdd = filtered_rdd.map(lambda record: json.dumps(record))
json_rdd.saveAsTextFile(hdfs_output_path)
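
A hedged sketch of the shapes crossref_affro works with, since the script is driven only by the DOI and the author/affiliation/name fields; every concrete value below is an invented placeholder:

# Hypothetical Crossref-like input record (only the fields the function reads):
example_record = {
    "DOI": "10.1234/example",
    "author": [
        {"affiliation": [{"name": "University of Athens, Greece"}]},
        {"affiliation": []},  # authors without affiliations contribute nothing
    ],
}

# Shape of crossref_affro(example_record) when at least one affiliation matches;
# otherwise the function returns None and is removed by the filter above.
example_result = {
    "DOI": "10.1234/example",
    "Matchings": [
        {"Provenance": "AffRo", "PID": "ROR", "Value": "<ror id>",
         "Confidence": 0.9, "Status": "active"}
    ],
}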

datacite.py (new file, +111 lines)

@@ -0,0 +1,111 @@
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from affro_cluster import *
import sys
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
# Initialize Spark session
spark = SparkSession.builder.appName("AffRo-DataCite").getOrCreate()
json_schema = StructType([
StructField("doi", StringType(), True),
StructField("attributes", StructType([
StructField("doi", StringType(), True),
StructField("identifiers", ArrayType(StringType()), True),
StructField("creators", ArrayType(StructType([
StructField("name", StringType(), True),
StructField("givenName", StringType(), True),
StructField("familyName", StringType(), True),
StructField("nameType", StringType(), True),
StructField("affiliation", ArrayType(StringType()), True),
StructField("nameIdentifiers", ArrayType(StringType()), True)
])), True),
]), True),
])
def remove_duplicates(list_of_dicts):
# Use a set to store tuples of dictionary items to filter out duplicates
seen = set()
unique_list_of_dicts = []
for d in list_of_dicts:
# Convert the dictionary to a tuple of items
items = tuple(d.items())
if items not in seen:
seen.add(items)
unique_list_of_dicts.append(d)
return unique_list_of_dicts
def datacite_affro(record):
try:
doi = record['doi']
result = {}
authors = []
for creator in record['json_parsed']['attributes']['creators']:
name = {}
name['Full'] = creator['name']
corresponing = False
contributor_roles = None
matchings = []
try:
name['First'] = creator['givenName']
except Exception as e:
name['First'] = None
try:
name['Last'] = creator['familyName']
except:
name['Last'] = None
try:
name['Type'] = creator['nameType']
except:
name['Type'] = None
if 'affiliation' in creator:
affiliation = creator['affiliation']
for org in affiliation:
if 'corresponding author' in org.lower():
corresponing = True
if len(affiliation)>0:
ror_links = [affro(org) for org in affiliation]
matchings = [inner_ror for outer_ror in ror_links for inner_ror in outer_ror]
matchings = remove_duplicates(matchings)
else:
affiliation = []
matchings = []
if len(matchings)>0:
authors.append({'Name' : name, 'Corresponding' : corresponing, 'Contributor_roles' : contributor_roles, 'Raw_affiliations' : affiliation, 'Matchings':matchings})
collect_organizations = [author['Matchings'] for author in authors]
organizations = [inner_ror for outer_ror in collect_organizations for inner_ror in outer_ror]
organizations = remove_duplicates(organizations)
if len(authors)>0:
result = {'DOI' : doi, 'Authors' : authors, 'Organizations' : organizations}
return result
except Exception as e:
print(f"Error processing record with id {record['DOI']} : {str(e)}")
df = spark.read.option("mode", "PERMISSIVE").parquet(folder_path)
df_parsed = df.withColumn("json_parsed", from_json(col("json"), json_schema))
updated_rdd = df_parsed.rdd.map(lambda row: datacite_affro(row.asDict()))
filtered_rdd = updated_rdd.filter(lambda record: record is not None and record != {})
# Convert updated RDD to JSON strings
json_rdd = filtered_rdd.map(lambda record: json.dumps(record))
json_rdd.saveAsTextFile(hdfs_output_path)
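
For orientation, a hedged sketch of one record produced by datacite_affro; the field names come from the code above, while the person, DOI, ROR id and confidence are invented placeholders:

example_output = {
    "DOI": "10.1234/example",
    "Authors": [
        {
            "Name": {"Full": "Jane Doe", "First": "Jane", "Last": "Doe", "Type": "Personal"},
            "Corresponding": False,
            "Contributor_roles": None,
            "Raw_affiliations": ["University of Athens, Greece"],
            "Matchings": [
                {"Provenance": "AffRo", "PID": "ROR", "Value": "<ror id>",
                 "Confidence": 0.9, "Status": "active"}
            ],
        }
    ],
    "Organizations": [
        {"Provenance": "AffRo", "PID": "ROR", "Value": "<ror id>",
         "Confidence": 0.9, "Status": "active"}
    ],
}
# Authors with no matchings are skipped, and records with no remaining authors
# are filtered out before writing.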

dictionaries/.DS_Store (new binary file, not shown)

2 file diffs suppressed because one or more lines are too long

dix_categ.json

@@ -1 +1 @@
{"research": "Univ/Inst", "uniwersytet": "Univ/Inst", "investigacions": "Univ/Inst", "institu": "Univ/Inst", "istitut": "Univ/Inst", "univ": "Univ/Inst", "col": "Univ/Inst", "center": "Univ/Inst", "polytechnic": "Univ/Inst", "politecnico": "Univ/Inst", "centre": "Univ/Inst", "cnrs": "Univ/Inst", "faculty": "Univ/Inst", "school": "Univ/Inst", "academ": "Univ/Inst", "akadem": "Univ/Inst", "hochschule": "Univ/Inst", "ecole": "Univ/Inst", "tech": "Univ/Inst", "observ": "Univ/Inst", "escuela": "Univ/Inst", "escola": "Univ/Inst", "discovery programe": "Univ/Inst", "ku leuven": "Univ/Inst", "ucla": "Univ/Inst", "eth zurich": "Univ/Inst", "athena": "Univ/Inst", "openaire": "Univ/Inst", "erasmus": "Univ/Inst", "ist austria": "Univ/Inst", "lmu munich": "Univ/Inst", "cancer trials ireland": "Univ/Inst", "food safety authority": "Univ/Inst", "ucd": "Univ/Inst", "tcd": "Univ/Inst", "apc microbiome": "Univ/Inst", "nasa": "Univ/Inst", "ucl": "Univ/Inst", "zentrum": "Univ/Inst", "ncsr demokritos": "Univ/Inst", "panepistemio": "Univ/Inst", "forth": "Univ/Inst", "nui galway": "Univ/Inst", "nui maynooth": "Univ/Inst", "tu wien": "Univ/Inst", "tu dublin": "Univ/Inst", "lab": "Laboratory", "science": "Laboratory", "cientific": "Laboratory", "hospital": "Hospital", "clinic": "Hospital", "hopital": "Hospital", "klinik": "Hospital", "oncol": "Hospital", "medical": "Hospital", "health": "Hospital", "medicin": "Hospital", "gmbh": "Company", "company": "Company", "industr": "Company", "etaireia": "Company", "corporation": "Company", "inc": "Company", "museum": "Museum", "library": "Museum", "foundation": "Foundation", "asociation": "Foundation", "organization": "Foundation", "society": "Foundation", "group": "Foundation", "royal": "Foundation", "ofice": "Foundation", "trust": "Foundation", "district": "Government", "federation": "Government", "government": "Government", "municipal": "Government", "county": "Government", "council": "Government", "agency": "Government", "unknown": "Unknown", "google": "Specific", "yahoo": "Specific", "ebay": "Specific", "microsoft": "Specific", "teagasc": "Specific", "ibm research": "Specific", "alergan": "Specific", "analog devices": "Specific", "medtronic": "Specific", "xilinx": "Specific", "pfizer": "Specific", "glaxosmithkline": "Specific", "astrazeneca": "Specific"}
{"research": "Univ/Inst", "recherche": "Univ/Inst", "uniwersytet": "Univ/Inst", "investigacions": "Univ/Inst", "institu": "Univ/Inst", "istitut": "Univ/Inst", "univ": "Univ/Inst", "col": "Univ/Inst", "center": "Univ/Inst", "polytechnic": "Univ/Inst", "politecnico": "Univ/Inst", "polutekhneio": "Univ/Inst", "centre": "Univ/Inst", "kentro": "Univ/Inst", "cnrs": "Univ/Inst", "faculty": "Univ/Inst", "school": "Univ/Inst", "academ": "Univ/Inst", "akadem": "Univ/Inst", "hochschule": "Univ/Inst", "ecole": "Univ/Inst", "tech": "Univ/Inst", "observ": "Univ/Inst", "escuela": "Univ/Inst", "escola": "Univ/Inst", "discovery programe": "Univ/Inst", "ku leuven": "Univ/Inst", "ucla": "Univ/Inst", "eth zurich": "Univ/Inst", "athena": "Univ/Inst", "openaire": "Univ/Inst", "erasmus": "Univ/Inst", "ist austria": "Univ/Inst", "lmu munich": "Univ/Inst", "cancer trials ireland": "Univ/Inst", "food safety authority": "Univ/Inst", "ucd": "Univ/Inst", "tcd": "Univ/Inst", "apc microbiome": "Univ/Inst", "nasa": "Univ/Inst", "ucl": "Univ/Inst", "zentrum": "Univ/Inst", "ncsr demokritos": "Univ/Inst", "panepist": "Univ/Inst", "nui galway": "Univ/Inst", "nui maynooth": "Univ/Inst", "tu wien": "Univ/Inst", "tu dublin": "Univ/Inst", "lab": "Laboratory", "science": "Laboratory", "cientific": "Laboratory", "hospital": "Hospital", "clinic": "Hospital", "hopital": "Hospital", "klinik": "Hospital", "oncol": "Hospital", "medical": "Hospital", "health": "Hospital", "medicin": "Hospital", "nosokomei": "Hospital", "krankenhaus": "Hospital", "gmbh": "Company", "company": "Company", "industr": "Company", "etaireia": "Company", "corporation": "Company", "inc": "Company", "museum": "Museum", "library": "Museum", "foundation": "Foundation", "asociation": "Foundation", "organization": "Foundation", "society": "Foundation", "group": "Foundation", "royal": "Foundation", "ofice": "Foundation", "trust": "Foundation", "district": "Government", "federation": "Government", "government": "Government", "municipal": "Government", "county": "Government", "council": "Government", "agency": "Government", "unknown": "Unknown", "google": "Specific", "yahoo": "Specific", "ebay": "Specific", "microsoft": "Specific", "teagasc": "Specific", "ibm research": "Specific", "alergan": "Specific", "analog devices": "Specific", "medtronic": "Specific", "xilinx": "Specific", "pfizer": "Specific", "glaxosmithkline": "Specific", "astrazenecaboehringer ingelheim": "Specific", "demokritos": "Specific", "siemens": "Specific", "forth": "Specific", "lily": "Specific", "boeing": "Specific"}

7 file diffs suppressed because one or more lines are too long

functions_cluster.py

@@ -12,11 +12,6 @@ def load_txt(file_path):
list_ = [line.strip() for line in file]
return list_
def load_pickled_dict(file_path):
with open(file_path, 'rb') as file:
pickled_dict = pickle.load(file)
return pickled_dict
def load_json(file_path):
with open(file_path, 'r') as json_file:
@@ -32,12 +27,12 @@ def replace_double_consonants(text):
result = re.sub(pattern, r'\1', text, flags=re.IGNORECASE)
return result
remove_list = [replace_double_consonants(x) for x in load_txt('txt_files/remove_list.txt')]
stop_words = load_txt('txt_files/stop_words.txt')
university_terms = [replace_double_consonants(x) for x in load_txt('txt_files/university_terms.txt')]
city_names = [replace_double_consonants(x) for x in load_txt('txt_files/city_names.txt')]
remove_list = [replace_double_consonants(x) for x in load_txt('remove_list.txt')]
stop_words = load_txt('stop_words.txt')
university_terms = [replace_double_consonants(x) for x in load_txt('university_terms.txt')]
city_names = [replace_double_consonants(x) for x in load_txt('city_names.txt')]
categ_dicts = load_json('dictionaries/dix_categ.json')
categ_dicts = load_json('dix_categ.json')
def is_contained(s, w):
@@ -114,10 +109,10 @@ def remove_parentheses(text):
return re.sub(r'\([^()]*\)', '', text)
def replace_umlauts(text):
normalized_text = unicodedata.normalize('NFKD', text)
replaced_text = ''.join(c for c in normalized_text if not unicodedata.combining(c))
return replaced_text
# def replace_umlauts(text):
# normalized_text = unicodedata.normalize('NFKD', text)
# replaced_text = ''.join(c for c in normalized_text if not unicodedata.combining(c))
# return replaced_text
def protect_phrases(input_string, phrases):
# Replace phrases with placeholders
@@ -186,19 +181,45 @@ protected_phrases1 = [
replacements = {'uni versity':'university',
'univ ':'university ',
'univercity':'university',
'universtiy':'university',
'univeristy':'university',
'universirty':'university',
'universiti':'university',
'universitiy':'university',
'universty' :'university',
'univ col': 'university colege',
'belfield, dublin': 'dublin',
'balsbridge, dublin': 'dublin', #ballsbridge
'earlsfort terrace, dublin': 'dublin',
replacements = {'czechoslovak':'czech',
'saint' : 'st',
'aghia' : 'agia',
'universitatsklinikum' : 'universi hospital',
'universitetshospital' : 'universi hospital',
'universitatskinderklinik' : 'universi childrens hospital',
'universitatskliniken' : 'universi hospital',
'Universitätsklinik' : 'universi hospital',
'universitatsmedizin' : 'universi medicine',
'universitatsbibliothek' : 'universi library',
'nat.' : 'national',
'uni versity' : 'university',
'unive rsity' : 'university',
'univ ersity' : 'university',
'inst ' : 'institute ',
'adv ' : 'advanced ',
'univ ' : 'university ',
'stud ' : 'studies ',
'inst.' : 'institute',
'adv.' : 'advanced',
'univ.' : 'university',
'stud.' : 'studies',
'univercity' : 'university',
'univerisity' : 'university',
'universtiy' : 'university',
'univeristy' : 'university',
'universirty' : 'university',
'universiti' : 'university',
'universitiy' : 'university',
'universty' : 'university',
'techniche' : 'technological',
'univ col' : 'university colege',
'univ. col.' : 'university colege',
'univ. coll.' : 'university colege',
'col.' : 'colege',
'hipokration' : 'hipocration',
'belfield, dublin' : 'dublin',
'balsbridge, dublin' : 'dublin', #ballsbridge
'earlsfort terrace, dublin' : 'dublin',
'bon secours hospital, cork' : 'bon secours hospital cork',
'bon secours hospital, dublin' : 'bon secours hospital dublin',
'bon secours hospital, galway' : 'bon secours hospital galway',
@@ -211,7 +232,7 @@ replacements = {'uni versity':'university',
'royal holoway, university london' : 'royal holoway universi london', #holloway
'city, university london' : 'city universi london',
'city university, london' : 'city universi london',
'aeginition':'eginition',
'aeginition' : 'eginition',
'national technical university, athens' : 'national technical university athens'
# 'harvard medical school' : 'harvard university'
@@ -226,6 +247,7 @@ def substrings_dict(string):
for old, new in replacements.items():
string = string.replace(old, new)
string = string.replace('hospitalum','hospital').replace('hospitalen','hospital')
split_strings = split_string_with_protection(string, protected_phrases1)
# Define a set of university-related terms for later use
@@ -234,17 +256,19 @@ def substrings_dict(string):
dict_string = {}
index = 0
for value in split_strings:
value = value.replace('.', ' ')
# Check if the substring contains any university-related terms
if not any(term in value.lower() for term in university_terms):
# Apply regex substitutions for common patterns
modified_value = re.sub(r'universi\w*', 'universi', value, flags=re.IGNORECASE)
modified_value = re.sub(r'institu\w*', 'institu', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'centre*', 'center', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'centre\b', 'center', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'\bsaint\b', 'st', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'\btrinity col\b', 'trinity colege', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'\btechnische\b', 'technological', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'\bteknologi\b', 'technology', modified_value, flags=re.IGNORECASE)
modified_value = re.sub(r'\bpolitehnica\b', 'polytechnic', modified_value, flags=re.IGNORECASE)
@@ -272,21 +296,20 @@ def clean_string(input_string):
input_string = input_string.replace(" ", placeholder)
# Unescape HTML entities and convert to lowercase
input_string = replace_comma_spaces(replace_double_consonants(replace_umlauts(unidecode(remove_parentheses(html.unescape(input_string.replace("'", "")))))).strip())
input_string = replace_comma_spaces(replace_double_consonants(unidecode(remove_parentheses(html.unescape(input_string.replace(" ́e","e").replace("'", ""))))).strip())
# Normalize unicode characters (optional, e.g., replace umlauts)
input_string = unidecode(input_string)
# Replace `/` and `` with space (do not replace hyphen `-`)
result = re.sub(r'[/\-]', ' ', input_string)
# Replace hyphen `-` with space (slashes `/` are no longer replaced)
result = re.sub(r'[\-]', ' ', input_string)
# Replace "saint" with "st"
result = re.sub(r'\bSaint\b', 'St', result)
result = re.sub(r'\bAghia\b', 'Agia', result)
result = re.sub(r'\bAghios\b', 'Agios', result)
# Remove characters that are not from the Latin alphabet, or allowed punctuation
result = replace_comma_spaces(re.sub(r'[^a-zA-Z\s,;/]', '', result).strip())
result = replace_comma_spaces(re.sub(r'[^a-zA-Z\s,;/.]', '', result).strip())
# Restore the " - " sequence from the placeholder
result = result.replace(placeholder, " ")
@@ -303,12 +326,12 @@ def clean_string(input_string):
def clean_string_facts(input_string):
# Replace specified characters with space
input_string = remove_stop_words(replace_umlauts(unidecode(remove_parentheses(html.unescape(input_string.lower())))))
input_string = remove_stop_words(unidecode(remove_parentheses(html.unescape(input_string.lower()))))
result = re.sub(r'[/\-,]', ' ', input_string)
result = re.sub(r'\bsaint\b', 'st', result)
# Remove characters that are not from the Latin alphabet or numbers
result = re.sub(r'[^a-zA-Z0-9\s;/-]', '', result)
result = re.sub(r'[^a-zA-Z0-9\s;/.-]', '', result)
# Replace consecutive whitespace with a single space
result = re.sub(r'\s+', ' ', result)

matching_cluster.py

@@ -9,6 +9,10 @@ from sklearn.metrics.pairwise import cosine_similarity
from functions_cluster import *
from create_input_cluster import *
specific = [k for k in categ_dicts if categ_dicts[k] == 'Specific']
def best_sim_score(light_raw, candidate_num, pairs_list, m, simU, simG):
"""
Finds the best match between a 'key word' and several legal names from the OpenAIRE database.
@@ -134,7 +138,6 @@ def Aff_Ids(input, dix_org, dix_mult, dix_city_ror, dix_country_ror, simU, simG)
light_aff = input[0]
vectorizer = CountVectorizer()
lnamelist = list(dix_org.keys())
dix = {} # will store indices and legal names of organizations of the DOI { i : [legalname1, legalname2,...]}
#pairs = []
result = {}
@@ -153,7 +156,7 @@ def Aff_Ids(input, dix_org, dix_mult, dix_city_ror, dix_country_ror, simU, simG)
similar_k = []
pairs_k = []
if s in lnamelist:
if s in dix_org:
similarity = 1
similar_k.append(similarity)
@@ -167,7 +170,7 @@ def Aff_Ids(input, dix_org, dix_mult, dix_city_ror, dix_country_ror, simU, simG)
dix[k].append(s)
else:
for x in lnamelist:
for x in dix_org:
if is_contained(s, x):
x_vector = vectorizer.fit_transform([x]).toarray()
@@ -257,25 +260,35 @@ def Aff_Ids(input, dix_org, dix_mult, dix_city_ror, dix_country_ror, simU, simG)
ids = [dix_org[x[0]] for x in best]
for i,x in enumerate(matched_org):
# id_list = []
if dix_mult[x] == 'unique':
if 'institu' in x and 'univ' in x:
if dix_city_ror[x][0] not in light_aff and dix_country_ror[x][0] not in light_aff:
pass
else:
ids[i] = dix_org[x]
if dix_mult[x] != 'unique':
if x in list(dix_city_ror.keys()):
match_found0 = False
if x in dix_city_ror:
match_found = False
for city in dix_city_ror[x]:
if city[0] in light_aff:
if city[0] not in x:
ids[i] = city[1]
match_found0 = True
ids[i] = city[1]
match_found = True
break
else:
if light_aff.count(city[0]) >1:
ids[i] = city[1]
match_found = True
break
if not match_found:
for city in dix_city_ror[x]:
if city[0] in light_aff and city[0] not in x:
ids[i] = city[1]
match_found0 = True
print('ok')
break
if not match_found:
@@ -309,11 +322,30 @@ def Aff_Ids(input, dix_org, dix_mult, dix_city_ror, dix_country_ror, simU, simG)
ids[i] = country[1]
match_found2 = True
break
if not match_found2:
if 'univ' in x:
ids[i] = dix_org[x]
else:
for sp in specific:
if sp in x:
ids[i] = dix_org[x]
results = [[x[0],x[1], ids[i]] for i,x in enumerate(best)]
# results_upd = []
return results #[[result[to_check[i]] for i in ready] + [to_check[2]], best[0]]
# for r in results:
# if 'ror.org' in r[2]:
# if dix_status[r[2]][0] == 'active':
# results_upd.append([r[0],r[1], 'ROR', r[2], 'active'])
# else:
# if dix_status[r[2]][1] == '':
# results_upd.append([r[0],r[1], 'ROR', r[2], dix_status[r[2]][0]])
# else:
# results_upd.append([r[0],r[1], 'ROR', r[2], dix_status[r[2]][0]])
# results_upd.append([r[0],r[1], 'ROR', dix_status[r[2]][1], 'active'])
return results
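
The matching above relies on bag-of-words cosine similarity, thresholded by simU and simG (0.65 and 0.82 in affro). A minimal sketch of that primitive in isolation, not of the exact fitting strategy used inside Aff_Ids:

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def word_similarity(a, b):
    # Fit a shared vocabulary over both strings, then compare the count vectors.
    vectors = CountVectorizer().fit_transform([a, b]).toarray()
    return cosine_similarity([vectors[0]], [vectors[1]])[0][0]

# Candidate legal names whose score clears the relevant threshold are kept.
print(word_similarity("trinity colege dublin", "trinity colege, university of dublin"))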

strings.py (new file, +97 lines)

@@ -0,0 +1,97 @@
import time
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
from threading import Thread
from affro_cluster import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, collect_list, udf, collect_set
import sys
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
matchings_schema = ArrayType(
StructType([
StructField("Provenance", StringType(), nullable=True),
StructField("PID", StringType(), nullable=True),
StructField("Value", StringType(), nullable=True),
StructField("Confidence", DoubleType(), nullable=True),
StructField("Status", StringType(), nullable=True)
])
)
operation_counter = spark.sparkContext.accumulator(0)
# Version of affro applied to a single raw_aff_string; returns just the Matchings set
def oalex_affro(aff_string):
global operation_counter
try:
matchings = affro(aff_string)
operation_counter += 1
# Ensure matchings is a list, even if affro returns a single dict
if not isinstance(matchings, list):
matchings = [matchings]
# Create the result as a tuple that matches matchings_schema
result = []
for matching in matchings:
# Assuming 'matching' is a dictionary that contains 'Provenance', 'PID', 'Value', 'Confidence', 'Status'
result.append((
matching.get("Provenance", None),
matching.get("PID", None),
matching.get("Value", None),
float(matching.get("Confidence", None)),
matching.get("Status", None)
))
return result
except Exception as e:
print(f"Error processing affiliation string {aff_string}: {str(e)}")
return ()
oalex_affro_udf = udf(oalex_affro, matchings_schema)
monitor_done = False
def monitor_counter(interval):
while True:
print(f"Number of calls to AffRo: {operation_counter.value}")
time.sleep(interval)
if monitor_done:
break
exploded = spark.read.json(folder_path) \
.filter(col("doi").isNotNull()) \
.select(
col("doi").alias("DOI"),
col("rors").alias("OAlex"),
explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better
)
monitor_thread = Thread(target=monitor_counter, args=(600,), daemon=True)
monitor_thread.start()
affs = exploded \
.select("affiliation") \
.distinct() \
.withColumn("Matchings", oalex_affro_udf(col("affiliation")))
affs.join(exploded, on="affiliation") \
.select(col("DOI"),
col("OAlex"),
explode(col("Matchings")).alias("match")
) \
.groupBy("DOI") \
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
collect_set("match").alias("Matchings") #each exploded match is collected again
) \
.write \
.mode("overwrite") \
.option("compression","gzip") \
.json(hdfs_output_path)
monitor_done = True
monitor_thread.join()
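
For orientation, a hedged sketch of one line of the gzip-compressed JSON output written above, assuming the input's rors column is a list of ROR ids reported by OpenAlex; the DOI, ids and confidence are invented placeholders:

example_line = {
    "DOI": "10.1234/example",
    "OAlex": ["<ror id reported by OpenAlex>"],
    "Matchings": [
        {"Provenance": "AffRo", "PID": "ROR", "Value": "<ror id found by affro>",
         "Confidence": 0.9, "Status": "active"}
    ],
}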

View File

@@ -1,3 +1,4 @@
colege street
universi
research institu
laboratory

stop_words.txt

@@ -5,7 +5,6 @@ at
de
for
et
für
des
in
as
@@ -14,3 +13,6 @@ and
fur
for
und
der
aus
dem

View File

@@ -3,8 +3,10 @@ import os
from pyspark.sql import SparkSession
from affro_cluster import *
folder_path = '/user/zeppelin/miriam.baglioni/AffStringFromIISDataset2'
#folder_path = 'check'
import sys
folder_path = sys.argv[1]
hdfs_output_path = sys.argv[2]
json_file_names = []
@@ -14,7 +16,6 @@ for file_name in os.listdir(folder_path):
if file_name != '_SUCCESS':
json_file_names.append(file_name)
# json_file_names now contains the names of all JSON files in the folder
# Initialize Spark session
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
@@ -40,14 +41,14 @@ def update_record(record):
for author in record['authors']:
author_object = {}
if 'orcid.org/0' in author['fullName']:
author_object['Name'] = {'Full':author['fullName'].split(',')[1], 'First' : None, 'Last' : None}
author_object['Name'] = {'Full':author['fullName'].split(',')[1].strip(), 'First' : 'None', 'Last' : 'None', 'Type' : 'None'}
author_object['ORCID'] = author['fullName'].split(',')[0][:36]
else:
author_object['Name'] = {'Full':author['fullName'], 'First' : None, 'Last' : None}
author_object['ORCID'] = None
author_object['Name'] = {'Full':author['fullName'].strip(), 'First' : 'None', 'Last' : 'None', 'Type' : 'None'}
author_object['ORCID'] = 'None'
author_object['Raw_affiliations'] = [affiliation['raw_affiliation_string'] for affiliation in author['affiliations']]
all_affs_with_ror = []
have_ror = False
have_id = False
for affiliation in author['affiliations']:
# author_object['Raw_affiliations'] = [x for x in affiliation['raw_affiliation_string']]
if 'ORCID: 0' in affiliation['raw_affiliation_string']:
@@ -57,32 +58,48 @@ def update_record(record):
x = affiliation['raw_affiliation_string']
author_object['ORCID'] = 'https://orcid.org/'+x.split('ORCID ')[1]
if 'ror.org' in affiliation['raw_affiliation_string']:
have_ror = True
have_id = True
all_affs_with_ror.append({
'Origin': 'data',
'RORid': affiliation['raw_affiliation_string'][0:25],
'Confidence': None
})
elif 'grid.' in affiliation['raw_affiliation_string']:
have_id = True
for k in dix_grids:
if k in affiliation['raw_affiliation_string'].split(' ')[0]:
try:
all_affs_with_ror.append({
'Provenance': 'Data',
'PID' : 'ROR',
'Value' : dix_grids[k] ,
'Confidence': 1
})
except:
pass
else:
if len(affro(affiliation['raw_affiliation_string']))>0:
author_object['Organization_PIDs'] = affro(affiliation['raw_affiliation_string'])
author_object['Organization_PIDs'] = remove_duplicates([json.loads(x) for x in author_object['Organization_PIDs']])
author_object['Matchings'] = affro(affiliation['raw_affiliation_string'])
try:
author_object['Matchings'] = remove_duplicates([json.loads(x) for x in author_object['Matchings']])
except:
author_object['Matchings'] = remove_duplicates([x for x in author_object['Matchings']])
else:
author_object['Organization_PIDs'] = []
author_object['Matchings'] = []
if have_ror == True:
author_object['Organization_PIDs'] = all_affs_with_ror
order = ["Name", "Raw_affiliations", "Organization_PIDs", "ORCID"]
if have_id == True:
author_object['Matchings'] = all_affs_with_ror
order = ["Name", "Raw_affiliations", "Matchings", "ORCID"]
reordered_data = {k: author_object[k] for k in order}
authors.append(reordered_data)
organizations = remove_duplicates([x for author in authors for x in author['Organization_PIDs']])
organizations = remove_duplicates([x for author in authors for x in author['Matchings']])
updt = {'ID' : id, 'Authors' : authors, 'Organizations' : organizations}
return updt
@@ -90,8 +107,6 @@ def update_record(record):
print(f"Error processing record with id {record.get('id')}: {str(e)}")
return None
for file in json_file_names:
print('start processing '+str(file))
df = spark.read.json(folder_path + '/' + file)
@@ -106,7 +121,7 @@ for file in json_file_names:
json_data = json_rdd.collect()
# Create a new filename by appending "_output.json" to the original filename (without extension)
output_file_name = file+'_output.json'
output_file_name = hdfs_output_path + "/" + file+'_output.json'
print('end processing '+str(file))
with open(output_file_name, 'w') as f: