# pdfCoverageEvaluator/pdfCoverageEvaluator.py

import sys
import threading
from impala.dbapi import connect
import numpy as np
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor
import json
import logging
from datetime import datetime
# Configure logging.
targets = (logging.StreamHandler(sys.stdout), logging.FileHandler('pdfCoverageEvaluator.log'))
logging_level = logging.INFO
logging.basicConfig(format='%(asctime)s %(threadName)s %(levelname)s %(filename)s:%(funcName)s(@%(lineno)d) %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %Z', level=logging_level, handlers=targets)
logger = logging.getLogger()
input_file_path = "pids_to_check.json"  # Default value; the actual path must still be provided as a command-line argument.
max_num_records_to_process = -1 # The default is to process all records.
previous_results_file = None # This is rarely used.
impala_host = "impala-gw.openaire.eu"
db = "pdfaggregation_i"
cur_time = datetime.now().strftime('%Y-%m-%d__%H-%M-%S')
results_json_file_path = "results__" + cur_time + ".json"
number_of_threads = 12  # The target machine has a 2-core / 4-thread CPU; on that machine, this script runs fastest with 12 threads in parallel.
# (Tested 100 pids with 8, 10, 12, 16 and 20 threads; 12 performed best.)
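
# Illustrative usage (the file names below are just examples; see the argument handling in "__main__"):
#   python3 pdfCoverageEvaluator.py pids_to_check.json 1000 results__2023-01-01__00-00-00.json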


def getCursorForImpala():
    global impala_host
    conn = connect(host=impala_host, port=21050)
    return conn.cursor()


def testImpalaConnection():
    global db
    logging.info("Going to test the connection with Impala..")
    cursor = getCursorForImpala()
    result_data = {}
    try:
        # Test the "publication_pids" table instead of the "payload" view, in order to avoid hitting the tables of that view while they are in a "merging-state".
        cursor.execute('select * from ' + db + '.publication_pids limit 1')
        result_row = cursor.fetchone()
        logging.info("result_row: " + str(result_row))
        if result_row:
            logging.info("The testing of the connection with Impala succeeded! Test results from the \"publication_pids\" table:")
            for i, col in enumerate(cursor.description):
                logging.info(str(col[0]) + " = '" + str(result_row[i]) + "'")
                result_data[col[0]] = str(result_row[i])
            logging.info("result_data: " + str(result_data))
            return True
        else:
            logging.error("No data was retrieved during testing!")
            return False
    except Exception as e:
        logging.error(e)
        return False


def load_json_records(json_file_path, max_num_records_to_process):
    json_records = []
    try:
        with open(json_file_path) as json_file:
            for counter, line in enumerate(json_file):
                line = line.strip()
                if len(line) > 0 and (max_num_records_to_process == -1 or counter < max_num_records_to_process):  # "counter" starts from 0.
                    json_records.append(json.loads(line))
    except Exception as e:
        logging.error("Problem when opening file: " + json_file_path + "\n" + str(e))
        return None
    return json_records


def extract_PIDs(json_records):
    # Get only the pids, as a list of dictionaries, keeping both the doi and the pmid (when available).
    pids = []
    for record in json_records:
        pids_of_record = {}
        try:
            doi = record["doi"]
            if doi:
                pids_of_record['doi'] = doi
        except KeyError:
            pass
        try:
            pmid = record["pmid"]
            if pmid:
                pids_of_record['pmid'] = pmid
        except KeyError:
            pass
        if pids_of_record:  # If not empty.
            pids.append(pids_of_record)
    return pids
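
# Illustrative example (made-up pid values): an input line such as
#   {"doi": "10.1000/xyz123", "pmid": "12345678"}
# is extracted by "extract_PIDs()" into the pid-record {'doi': '10.1000/xyz123', 'pmid': '12345678'}.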


def extract_PIDs_from_previous_results(json_records):
    # Collect the "pid" values of the previously written result-records into a set.
    pids = set()
    for record in json_records:
        try:
            pid = record["pid"]
            if pid:
                pids.add(pid)
        except KeyError:
            pass
    return pids


def reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records):
    unprocessed_records = []
    for record in json_records_to_be_processed:
        doi = None
        try:
            doi = record["doi"]
        except KeyError:
            pass
        pmid = None
        try:
            pmid = record["pmid"]
        except KeyError:
            pass
        # Skip if no pid exists.
        if not doi and not pmid:
            continue
        # Skip if one of the pids has given a result before.
        if (doi and doi in pids_of_previous_records) or (pmid and pmid in pids_of_previous_records):
            continue
        unprocessed_record = {}
        if doi:
            unprocessed_record['doi'] = doi
        if pmid:
            unprocessed_record['pmid'] = pmid
        if unprocessed_record:
            unprocessed_records.append(unprocessed_record)
    return unprocessed_records
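
# Illustrative example (made-up pid value): if the previous-results file already contains a record with
# pid "10.1000/xyz123", then an input record holding that value as its doi (or pmid) is skipped by
# "reduce_to_unprocessed_records()".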


def get_empty_result(doi, pmid):
    # Build a result-record with no coverage-info, keeping whichever pid is available (the doi is preferred).
    result_data = {'dedupid': 'None', 'id': 'None'}
    if doi is not None:
        result_data['pid'] = doi
        result_data['pid_type'] = 'doi'
    else:
        result_data['pid'] = pmid
        result_data['pid_type'] = 'pmid'
    result_data['fulltext_url'] = 'None'
    result_data['location'] = 'None'
    return result_data


def get_query_to_check_coverage_for_record(doi, pmid):
    # Check whether we have at least one file-location associated with one of the pids of this record.
    predicate = '0'  # Control value which matches nothing.
    if doi is not None:
        predicate = "ppid.pid = '" + doi + "'"
        if pmid is not None:
            predicate += " or ppid.pid = '" + pmid + "'"
    elif pmid is not None:
        predicate = "ppid.pid = '" + pmid + "'"
    # Else no pid is available. This case is already caught by the caller method.
    # We use a "left outer join" with "payload", in order to also retrieve results for non-pdf-covered PIDs.
    # The "order by" sorts alphabetically, ascending; this is important, so that the pdf-results are preferred over the non-pdf-results.
    return "select pu.dedupid, pu.id, ppid.pid, ppid.type as pid_type, p.actual_url as fulltext_url, p.`location` from " + db + ".publication_pids ppid\n" + \
           " join " + db + ".publication pu on pu.id = ppid.id\n" + \
           " left outer join " + db + ".payload p on p.id = ppid.id\n" + \
           "where " + predicate + \
           "\norder by coalesce(fulltext_url, 'z')" + \
           "\nlimit 1;"


thread_local_var = threading.local()  # Each worker-thread keeps its own Impala cursor here.
num_of_pids_covered = 0  # Integer counter, shared between threads.
lock = threading.Lock()  # Lock object, used to safely increment the above counter.


def get_coverage_for_record(pids_of_record, retry_count):
    global num_of_pids_covered
    if retry_count > 10:
        logging.warning("Could not find the requested payload-type table in a non-merging state, after " + str(retry_count - 1) + " retries!")
        return None
    if not hasattr(thread_local_var, 'cursor'):  # If this is the first time this thread accesses its "thread-local" variable, create and assign the cursor attribute.
        thread_local_var.cursor = getCursorForImpala()
    doi = None
    try:
        doi = pids_of_record['doi']
    except KeyError:
        pass  # The "doi" does not exist. Let's hope the "pmid" exists.
    pmid = None
    try:
        pmid = pids_of_record['pmid']
    except KeyError:
        pass  # The "pmid" does not exist (it's common).
    if not doi and not pmid:
        logging.warning("Current record has no pid: " + str(pids_of_record))
        return None
    result_data = {}
    try:
        thread_local_var.cursor.execute(get_query_to_check_coverage_for_record(doi, pmid))
        result_row = thread_local_var.cursor.fetchone()
        if result_row:
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("result_row: " + str(result_row))
            for i, col in enumerate(thread_local_var.cursor.description):
                if logger.isEnabledFor(logging.DEBUG):
                    logging.debug(str(col[0]) + " = '" + str(result_row[i]) + "'")
                result_data[str(col[0])] = str(result_row[i])
            if result_data['location'] != 'None':
                logging.info("PDF-coverage for record with pid: \"" + result_data['pid'] + "\" of type \"" + result_data['pid_type'] + "\" and location: \"" + result_data['location'] + "\".")
                with lock:
                    num_of_pids_covered += 1
            else:
                logging.info("No coverage! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("result_data: " + str(result_data))
            return result_data
        else:
            logging.warning("Not in DB! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
            return get_empty_result(doi, pmid)
    except Exception as e:
        ex_msg = str(e)
        if "Could not resolve table reference" in ex_msg or "Could not open HDFS file" in ex_msg:
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("One of the tables is in a merging-state!")
            return get_coverage_for_record(pids_of_record, retry_count + 1)
        else:
            logging.error("Error when querying for 'doi': '" + str(doi) + "' and 'pmid': '" + str(pmid) + "'\n" + type(e).__name__ + ": " + str(e))
            return get_empty_result(doi, pmid)
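
# Illustrative example of a result-record, as it gets written to the output file (all values are made up):
#   {"dedupid": "<dedup-id>", "id": "<record-id>", "pid": "10.1000/xyz123", "pid_type": "doi",
#    "fulltext_url": "https://example.org/paper.pdf", "location": "<hdfs-location>"}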

pids_per_batch = 1_000  # Number of pid-records submitted to the thread-pool per batch.


def check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records):
    global num_of_pids_covered
    num_of_batches = int(len(pids_of_records) / pids_per_batch)
    if num_of_batches == 0:
        num_of_batches = 1
    num_of_batches_str = str(num_of_batches)
    # Split the pids into multiple batches (sub-lists) of roughly <pids_per_batch> elements each.
    chunked_arrays = np.array_split(np.array(pids_of_records), num_of_batches)
    pids_of_records_sublists = [list(array) for array in chunked_arrays]
    start = perf_counter()
    num_of_all_records = len(pids_of_records)
    logging.info("Submitting the " + str(num_of_all_records) + " tasks to the 'ThreadPoolExecutor', in " + num_of_batches_str
                 + " batches of " + str(len(pids_of_records_sublists[0])) + " elements each.")
    with ThreadPoolExecutor(max_workers=number_of_threads) as executor:
        for batch_counter, pids_of_records_batch in enumerate(pids_of_records_sublists):
            result_futures = []
            for pids_of_record in pids_of_records_batch:
                if pids_of_record:
                    result_futures.append(executor.submit(get_coverage_for_record, pids_of_record, 0))
            batch_counter_str = str(batch_counter)
            logging.info("Finished submitting the tasks for batch_" + batch_counter_str + " (out of " + num_of_batches_str + "), will wait for them to finish executing..")
            # After all tasks for this batch have been submitted, iterate through the futures, wait for them to finish and use their results.
            results_data = []
            for result_future in result_futures:
                try:
                    result_data = result_future.result()
                    if result_data is not None:
                        results_data.append(result_data)
                except Exception as e:
                    logging.error("Error when checking a future's result for batch_" + batch_counter_str + "\n" + str(e))
            if len(results_data) > 0:
                logging.info("Writing " + str(len(results_data)) + " results (from batch_" + batch_counter_str + ") to the output file..")
                try:
                    with open(results_json_file_path, "a") as outfile:
                        for result in results_data:
                            outfile.write(json.dumps(result) + "\n")
                except Exception as e:
                    # It's very serious to lose results, but it may be a temporary error, appearing in the middle of the execution. So, we should not stop the whole execution.
                    logging.critical("Error when writing to output-file: " + results_json_file_path + "\n" + str(e))
            else:
                logging.info("Batch_" + batch_counter_str + " gave no results.")
    finish = perf_counter()
    elapsed_time = finish - start
    if elapsed_time >= 3600:
        time_message = "Finished in " + str(round(elapsed_time / 3600, 2)) + " hours. "
    elif elapsed_time >= 60:
        time_message = "Finished in " + str(round(elapsed_time / 60, 2)) + " minutes. "
    else:
        time_message = "Finished in " + str(round(elapsed_time, 2)) + " seconds. "
    logging.info(time_message + str(num_of_pids_covered) + " out of " + str(num_of_all_records) + " pids had coverage.")


def print_records(json_records):
    records_to_print = ''
    for record in json_records:
        records_to_print += str(record) + "\n"
    logging.info(records_to_print)


if __name__ == '__main__':
    args = sys.argv
    num_args = len(args)
    if num_args < 2 or num_args > 4:  # (The 1st arg is the script's name.)
        logging.critical("Wrong number of arguments given (" + str(num_args - 1)
                         + ")! Should give between 1 and 3 arguments, in this order: <input_file> , <max_num_records_to_process> , <previous_results_file_path>")
        sys.exit(1)
    elif num_args == 2:
        input_file_path = args[1]
        max_num_records_to_process = -1
    elif num_args == 3:
        input_file_path = args[1]
        max_num_records_to_process = int(args[2])
        if max_num_records_to_process == 0:
            max_num_records_to_process = -1
    elif num_args == 4:
        input_file_path = args[1]
        max_num_records_to_process = int(args[2])
        if max_num_records_to_process == 0:
            max_num_records_to_process = -1
        previous_results_file = args[3]

    if not testImpalaConnection():
        logging.info("Exiting!")
        sys.exit(2)

    if max_num_records_to_process == -1:
        logging.info("Going to process all available pid-records..")
    else:
        logging.info("Going to process at most " + str(max_num_records_to_process) + " pid-records..")

    json_records_to_be_processed = load_json_records(input_file_path, max_num_records_to_process)
    if not json_records_to_be_processed:
        logging.critical("No json-records were found in the input! Exiting...")
        sys.exit(3)
    logging.info("Loaded " + str(len(json_records_to_be_processed)) + " json-records from input_file: " + input_file_path)

    # print_records(json_records_to_be_processed)

    pids_of_records = extract_PIDs(json_records_to_be_processed)
    num_pid_records_str = str(len(pids_of_records))
    logging.info("Extracted " + num_pid_records_str + " pid-records from input_file: " + input_file_path)
    if not pids_of_records:
        logging.critical("No pids were extracted from the input json-records! Exiting...")
        sys.exit(4)

    if previous_results_file is not None:
        logging.info("Going to check whether the input-records were previously processed..")
        json_records_previously_processed = load_json_records(previous_results_file, -1)
        if json_records_previously_processed:
            logging.info("Loaded " + str(len(json_records_previously_processed)) + " json-records from previous_results_file: " + previous_results_file)
            pids_of_previous_records = extract_PIDs_from_previous_results(json_records_previously_processed)
            logging.info("Extracted " + str(len(pids_of_previous_records)) + " pids from previous_results_file: " + previous_results_file)
            if pids_of_previous_records:
                unprocessed_records = reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records)
                logging.info("Reduced the " + num_pid_records_str + " pid-records to " + str(len(unprocessed_records)) + " after checking whether they have been processed before.")
                pids_of_records = unprocessed_records
            else:
                logging.error("No pids were extracted from the previous-results-input! Continuing by checking all selected pids from the input..")
        else:
            logging.warning("No json-records were found in the previous-results-input! Continuing by checking all selected pids from the input..")

    # if logger.isEnabledFor(logging.DEBUG):
    #     logging.info(pids_of_records)

    check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records)
    sys.exit(0)