import sys
import threading
from impala.dbapi import connect
import numpy as np
from time import perf_counter
from concurrent.futures import ThreadPoolExecutor
import json
import logging
from datetime import datetime

# Configure logging.
targets = logging.StreamHandler(sys.stdout), logging.FileHandler('pdfCoverageEvaluator.log')
logging_level = logging.INFO
logging.basicConfig(format='%(asctime)s %(threadName)s %(levelname)s %(filename)s:%(funcName)s(@%(lineno)d) %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %Z', level=logging_level, handlers=targets)
logger = logging.getLogger()

input_file_path = "pids_to_check.json"  # Default value. The user has to provide the path nevertheless (in the cmd-args).
max_num_records_to_process = -1  # The default is to process all records.
previous_results_file = None  # This is rarely used.

impala_host = "impala-gw.openaire.eu"
db = "pdfaggregation_i"

cur_time = datetime.now().strftime('%Y-%m-%d__%H-%M-%S')
results_json_file_path = "results__" + cur_time + ".json"

number_of_threads = 12  # The target machine is a 2-core, 4-thread CPU. This script runs faster with 12 threads in parallel on that machine.
# I tested 100 pids with 8, 10, 12, 16 and 20 threads. The number "12" was the best.


def getCursorForImpala():
    global impala_host
    conn = connect(host=impala_host, port=21050)
    return conn.cursor()


def testImpalaConnection():
    global db
    logging.info("Going to test the connection with Impala..")
    cursor = getCursorForImpala()
    result_data = {}
    try:
        # Test the "publication_pids" table instead of the "payload" view, in order to avoid catching the tables of that view in a "merging-state".
        cursor.execute('select * from ' + db + '.publication_pids limit 1')
        result_row = cursor.fetchone()
        logging.info("result_row: " + str(result_row))
        if result_row:
            logging.info("The testing of the connection with Impala succeeded! Test results from \"publication_pids\" table:")
            for i, col in enumerate(cursor.description):
                logging.info(str(col[0]) + " = '" + str(result_row[i]) + "'")
                result_data[col[0]] = str(result_row[i])
            logging.info("result_data: " + str(result_data))
            return True
        else:
            logging.error("No data was retrieved during testing!")
            return False
    except Exception as e:
        logging.error(e)
        return False


def load_json_records(json_file_path, max_num_records_to_process):
    json_records = []
    try:
        with open(json_file_path) as json_file:
            for counter, line in enumerate(json_file):
                line = line.strip()
                if len(line) > 0 and (max_num_records_to_process == -1 or counter < max_num_records_to_process):  # "counter" starts from 0.
                    json_records.append(json.loads(line))
    except Exception as e:
        logging.error("Problem when opening file: " + json_file_path + " : " + str(e))
        return None
    return json_records


def extract_PIDs(json_records):
    # Get only the pids, in a dictionary, having both the doi and the pmid (if available).
    pids = []
    for record in json_records:
        pids_of_record = {}
        try:
            doi = record["doi"]
            if doi:
                pids_of_record['doi'] = doi
        except KeyError:
            pass
        try:
            pmid = record["pmid"]
            if pmid:
                pids_of_record['pmid'] = pmid
        except KeyError:
            pass
        if pids_of_record:  # If not empty.
            pids.append(pids_of_record)
    return pids


def extract_PIDs_from_previous_results(json_records):
    pids = set()
    for record in json_records:
        try:
            pid = record["pid"]
            if pid:
                pids.add(pid)
        except KeyError:
            pass
    return pids


def reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records):
    unprocessed_records = []
    for record in json_records_to_be_processed:
        doi = None
        try:
            doi = record["doi"]
        except KeyError:
            pass
        pmid = None
        try:
            pmid = record["pmid"]
        except KeyError:
            pass
        # Skip if no pid exists.
        if (doi is None or not doi) and (pmid is None or not pmid):
            continue
        # Skip if this pid has given a result before.
        if (doi is not None and doi and doi in pids_of_previous_records) or (pmid is not None and pmid and pmid in pids_of_previous_records):
            continue
        unprocessed_record = {}
        if doi is not None and doi:
            unprocessed_record['doi'] = doi
        if pmid is not None and pmid:
            unprocessed_record['pmid'] = pmid
        if unprocessed_record:
            unprocessed_records.append(unprocessed_record)
    return unprocessed_records


def get_empty_result(doi, pmid):
    result_data = {'dedupid': 'None', 'id': 'None'}
    if doi is not None:
        result_data['pid'] = doi
        result_data['pid_type'] = 'doi'
    else:
        result_data['pid'] = pmid
        result_data['pid_type'] = 'pmid'
    result_data['fulltext_url'] = 'None'
    result_data['location'] = 'None'
    return result_data


def get_query_to_check_coverage_for_record(doi, pmid):
    # Check that we have at least one file-location associated with one of the pids of this record.
    predicate = '0'  # Control value to match nothing.
    if doi is not None:
        predicate = "ppid.pid = '" + doi + "'"
        if pmid is not None:
            predicate += " or ppid.pid = '" + pmid + "'"
    elif pmid is not None:
        predicate = "ppid.pid = '" + pmid + "'"
    # Else no pid is available. This is already caught by the caller method.

    # Order alphabetically, ascending. This is important, so that the pdf-results are preferred over the non-pdf-results.
    # We use "left outer join" for "payload", in order to retrieve results also for non-pdf-covered PIDs.
    return "select pu.dedupid, pu.id, ppid.pid, ppid.type as pid_type, p.actual_url as fulltext_url, p.`location` from " + db + ".publication_pids ppid\n" + \
           " join " + db + ".publication pu on pu.id = ppid.id\n" + \
           " left outer join " + db + ".payload p on p.id = ppid.id\n" + \
           "where " + predicate + \
           "\norder by coalesce(fulltext_url, 'z')" + \
           "\nlimit 1;"


thread_local_var = threading.local()

num_of_pids_covered = 0  # Integer variable shared between threads.
lock = threading.Lock()  # Lock object, used to safely increment the above variable.


def get_coverage_for_record(pids_of_record, retry_count):
    global num_of_pids_covered

    if retry_count > 10:
        logging.warning("Could not find the requested payload-type table in a non-merging state, after " + str(retry_count - 1) + " retries!")
        return None

    if not hasattr(thread_local_var, 'cursor'):
        # If this is the first time this thread accesses its "thread-local" variable, create and assign the cursor attribute.
        thread_local_var.cursor = getCursorForImpala()

    doi = None
    try:
        doi = pids_of_record['doi']
    except KeyError:
        pass  # The "doi" does not exist. Let's hope the "pmid" exists.

    pmid = None
    try:
        pmid = pids_of_record['pmid']
    except KeyError:
        pass  # The "pmid" does not exist (it's common).
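    # Illustration only (not executed): with a hypothetical doi "10.1234/abc" and no pmid,
    # get_query_to_check_coverage_for_record() would build roughly the following query:
    #   select pu.dedupid, pu.id, ppid.pid, ppid.type as pid_type, p.actual_url as fulltext_url, p.`location` from pdfaggregation_i.publication_pids ppid
    #    join pdfaggregation_i.publication pu on pu.id = ppid.id
    #    left outer join pdfaggregation_i.payload p on p.id = ppid.id
    #   where ppid.pid = '10.1234/abc'
    #   order by coalesce(fulltext_url, 'z')
    #   limit 1;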
    if (doi is None or not doi) and (pmid is None or not pmid):
        logging.warning("Current record has no pid: " + str(pids_of_record))
        return None

    result_data = {}
    try:
        thread_local_var.cursor.execute(get_query_to_check_coverage_for_record(doi, pmid))
        result_row = thread_local_var.cursor.fetchone()
        if result_row:
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("result_row: " + str(result_row))  # DEBUG!
            for i, col in enumerate(thread_local_var.cursor.description):
                if logger.isEnabledFor(logging.DEBUG):
                    logging.debug(str(col[0]) + " = '" + str(result_row[i]) + "'")  # DEBUG!
                result_data[str(col[0])] = str(result_row[i])
            if result_data['location'] != 'None':
                logging.info("PDF-coverage for record with pid: \"" + result_data['pid'] + "\" of type \"" + result_data['pid_type']
                             + "\" and location: \"" + result_data['location'] + "\".")
                with lock:
                    num_of_pids_covered += 1
            else:
                logging.info("No coverage! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("result_data: " + str(result_data))  # DEBUG!
            return result_data
        else:
            logging.warning("Not in DB! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
            return get_empty_result(doi, pmid)
    except Exception as e:
        ex_msg = str(e)
        if "Could not resolve table reference" in ex_msg or "Could not open HDFS file" in ex_msg:
            if logger.isEnabledFor(logging.DEBUG):
                logging.debug("One of the tables is in a merging-state!")  # DEBUG!
            return get_coverage_for_record(pids_of_record, retry_count + 1)  # Python has no "++" operator; increment explicitly.
        else:
            logging.error("Error when querying for 'doi': '" + str(doi) + "' and 'pmid': '" + str(pmid) + "'\n" + type(e).__name__ + ": " + str(e))
            return get_empty_result(doi, pmid)


pids_per_batch = 1_000


def check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records):
    global num_of_pids_covered

    num_of_batches = int(len(pids_of_records) / pids_per_batch)
    if num_of_batches == 0:
        num_of_batches = 1
    num_of_batches_str = str(num_of_batches)

    # Split the pids into multiple batches (sub-lists) of <pids_per_batch> elements each.
    chunked_arrays = np.array_split(np.array(pids_of_records), num_of_batches)
    pids_of_records_sublists = [list(array) for array in chunked_arrays]

    start = perf_counter()
    num_of_all_records = len(pids_of_records)
    logging.info("Submitting the " + str(num_of_all_records) + " tasks to the 'ThreadPoolExecutor', in " + num_of_batches_str
                 + " batches of " + str(len(pids_of_records_sublists[0])) + " elements each.")

    with ThreadPoolExecutor(max_workers=number_of_threads) as executor:
        for batch_counter, pids_of_records_batch in enumerate(pids_of_records_sublists):
            result_futures = []
            for count, pids_of_record in enumerate(pids_of_records_batch):
                if pids_of_record:
                    result_futures.append(executor.submit(get_coverage_for_record, pids_of_record, 0))
            batch_counter_str = str(batch_counter)
            logging.info("Finished submitting the tasks for batch_" + batch_counter_str + " (out of " + num_of_batches_str + "), will wait for them to finish executing..")

            # After all tasks for this batch have been submitted, iterate through the futures, wait for all of them to finish and use their results.
            results_data = []
            for count, result_future in enumerate(result_futures):
                try:
                    result_data = result_future.result()
                    if result_data is not None:
                        results_data.append(result_data)
                except Exception as e:
                    logging.error("Error when checking a future's result for batch_" + batch_counter_str + " : " + str(e))

            if len(results_data) > 0:
                logging.info("Writing " + str(len(results_data)) + " results (from batch_" + batch_counter_str + ") to the output file..")
                try:
                    with open(results_json_file_path, "a") as outfile:
                        for result in results_data:
                            outfile.write(json.dumps(result) + "\n")
                except Exception as e:
                    # It's very serious to lose results, but it may be a temporary error, appearing in the middle of the execution. So, we should not stop the whole execution.
                    logging.critical("Error when writing to output-file: " + results_json_file_path + " : " + str(e))
            else:
                logging.info("Batch_" + batch_counter_str + " gave no results.")

    finish = perf_counter()
    elapsed_time = finish - start
    if elapsed_time >= 3600:
        elapsed_time = elapsed_time / 3600
        time_message = "Finished in " + str(round(elapsed_time, 2)) + " hours. "
    elif elapsed_time >= 60:
        elapsed_time = elapsed_time / 60
        time_message = "Finished in " + str(round(elapsed_time, 2)) + " minutes. "
    else:
        time_message = "Finished in " + str(round(elapsed_time, 2)) + " seconds. "
    logging.info(time_message + str(num_of_pids_covered) + " out of " + str(num_of_all_records) + " pids had coverage.")


def print_records(json_records):
    records_to_print = ''
    for record in json_records:
        records_to_print += str(record) + "\n"
    logging.info(records_to_print)


if __name__ == '__main__':
    args = sys.argv
    num_args = len(args)
    if num_args < 2 or num_args > 4:  # (the 1st arg is the script's name)
        logging.critical("Wrong number of arguments given (" + str(num_args - 1)
                         + ")! Should give between 1 and 3 arguments, in this order: <input_file_path>, <max_num_records_to_process>, <previous_results_file>")
        sys.exit(1)
    elif num_args == 2:
        input_file_path = args[1]
        max_num_records_to_process = -1
    elif num_args == 3:
        input_file_path = args[1]
        max_num_records_to_process = int(args[2])
        if max_num_records_to_process == 0:
            max_num_records_to_process = -1
    elif num_args == 4:
        input_file_path = args[1]
        max_num_records_to_process = int(args[2])
        if max_num_records_to_process == 0:
            max_num_records_to_process = -1
        previous_results_file = args[3]

    if not testImpalaConnection():
        logging.info("Exiting!")
        sys.exit(2)

    if max_num_records_to_process == -1:
        logging.info("Going to process all available pid-records..")
    else:
        logging.info("Going to process at most " + str(max_num_records_to_process) + " pid-records..")

    json_records_to_be_processed = load_json_records(input_file_path, max_num_records_to_process)
    if not json_records_to_be_processed:  # Check before taking the length, since "load_json_records()" returns None on error.
        logging.critical("No json-records were found in the input! Exiting...")
        sys.exit(3)
    logging.info("Loaded " + str(len(json_records_to_be_processed)) + " json-records from input_file: " + input_file_path)

    # print_records(json_records_to_be_processed)

    pids_of_records = extract_PIDs(json_records_to_be_processed)
    num_pid_records_str = str(len(pids_of_records))
    logging.info("Extracted " + num_pid_records_str + " pid-records from input_file: " + input_file_path)
    if not pids_of_records:
        logging.critical("No pids were extracted from the input json-records! Exiting...")
Exiting...") exit(4) json_records_previously_processed = None if previous_results_file is not None: logging.info("Going to check the input-records if they were previously processed..") json_records_previously_processed = load_json_records(previous_results_file, -1) logging.info("Loaded " + str(len(json_records_previously_processed)) + " json-records from input_file: " + previous_results_file) if json_records_previously_processed: pids_of_previous_records = extract_PIDs_from_previous_results(json_records_previously_processed) logging.info("Extracted " + str(len(pids_of_previous_records)) + " pid-records from previous_results_input_file: " + previous_results_file) if pids_of_previous_records: unprocessed_records = reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records) logging.info("Reduced the " + num_pid_records_str + " pid-records to " + str(len(unprocessed_records)) + " after checking if they have been processed before.") pids_of_records = unprocessed_records else: logging.error("No pids were extracted from the previous-results-input! Continue by checking all selected pids from input..") else: logging.warning("No json-records were found in the previous-results-input! Continue by checking all selected pids from input..") # if logger.isEnabledFor(logging.DEBUG): # logging.info(pids_of_records) check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records) exit(0)