Add the pdfCoverageEvaluator software.
This commit is contained in:
commit
e6d6382bd0
|
@ -0,0 +1,201 @@
|
|||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright 2024 OpenAIRE AMKE
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -0,0 +1,26 @@
|
|||
# PDF-coverage-evaluator
|
||||
|
||||
This python-3 script checks each PID from a given collection, against the pdf-aggregation-service-DB,
|
||||
in order to find whether that PID exists in the aggregator's DB and has full-text coverage.
|
||||
<br>
|
||||
|
||||
In detail, it does the following:
|
||||
- extracts the pids from a json-file (DOIs and PMIDs)
|
||||
- if a "previous-results" file is provided, extracts the pid from there as well and reduced the original input to the pids which have not been processed before.
|
||||
- splits them in batches and for each batch it submits each pid-evaluation-job to a "ThreadPoolExecutor", which uses 12 threads.
|
||||
- for each one of the PID-pairs, makes a query with Impala, to quickly acquire the following: "dedupid", "id", "pid", "pid_type", "fulltext_url", "location"
|
||||
- saves the results in a json-file, including the pid for which it made the check (for example in case a record has both "doi" and "pmid" and a fulltext was detected for the "doi" (at least), then the output-record has the "doi" as its PID)
|
||||
|
||||
|
||||
## Install & Run:
|
||||
```python3 --version; sudo apt install -y python3 python3-pip; sudo pip3 install --upgrade pip; cd pdfCoverageEvaluator; sudo pip3 install -r requirements.txt; python3 pdfCoverageEvaluator.py ${input_file_path} ${max_num_to_process} ${previous_results_file_path}```
|
||||
|
||||
|
||||
### Install & run, using the provided scripts
|
||||
1) **transferToRemoteMachineAndExecute.sh**: this script transfers the project to the defined location on a remote machine, replaces the project-files there and executes the software it inside a screen,
|
||||
in order to not lose the execution in case the session is closed before the software finishes.
|
||||
2) **transferToRemoteMachine.sh**: this script just transfers the project to the defined location on a remote machine and replaces the project-files there.
|
||||
|
||||
|
||||
### Checking the logs
|
||||
The log file is located inside the project's directory and has this name: "pdfCoverageEvaluator.log"
|
|
@ -0,0 +1,409 @@
|
|||
import sys
|
||||
import threading
|
||||
|
||||
from impala.dbapi import connect
|
||||
import numpy as np
|
||||
from time import perf_counter
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import json
|
||||
import logging
|
||||
from _datetime import datetime
|
||||
|
||||
# Configure logging.
|
||||
targets = logging.StreamHandler(sys.stdout), logging.FileHandler('pdfCoverageEvaluator.log')
|
||||
logging_level = logging.INFO
|
||||
logging.basicConfig(format='%(asctime)s %(threadName)s %(levelname)s %(filename)s:%(funcName)s(@%(lineno)d) %(message)s', datefmt='%Y-%m-%d %H:%M:%S %Z', level=logging_level,
|
||||
handlers=targets)
|
||||
logger = logging.getLogger()
|
||||
|
||||
input_file_path = "pids_to_check.json" # Default value. The user has to provide the path nevertheless (in the cmd-args).
|
||||
max_num_records_to_process = -1 # The default is to process all records.
|
||||
previous_results_file = None # This is rarely used.
|
||||
impala_host = "impala-gw.openaire.eu"
|
||||
db = "pdfaggregation_i"
|
||||
|
||||
cur_time = datetime.now().strftime('%Y-%m-%d__%H-%M-%S')
|
||||
|
||||
results_json_file_path = "results__" + cur_time + ".json"
|
||||
number_of_threads = 12 # The target machine is a 2-core, 4-threads CPU. This script runs faster with 12 threads in parallel, in that machine.
|
||||
|
||||
|
||||
# I tested 100 pids with 8, 10, 12, 16, and 20 threads. The number "12" was the best.
|
||||
|
||||
|
||||
def getCursorForImpala():
|
||||
global impala_host
|
||||
conn = connect(host=impala_host, port=21050)
|
||||
return conn.cursor()
|
||||
|
||||
|
||||
def testImpalaConnection():
|
||||
global db
|
||||
|
||||
logging.info("Going to test the connections with Impala..")
|
||||
cursor = getCursorForImpala()
|
||||
|
||||
result_data = {}
|
||||
try:
|
||||
cursor.execute(
|
||||
'select * from ' + db + '.publication_pids limit 1') # Test the "publication" table instead of the "payload" view, in order to avoid catching the tables of that view in a "merging-state".
|
||||
result_row = cursor.fetchone()
|
||||
logging.info("result_row: " + str(result_row))
|
||||
if result_row:
|
||||
logging.info("The testing of the connection with Impala succeeded! Test results from \"publication_pids\" table:")
|
||||
for i, col in enumerate(cursor.description):
|
||||
logging.info(str(col[0]) + " = '" + str(result_row[i]) + "'")
|
||||
result_data[col[0]] = str(result_row[i])
|
||||
logging.info("result_data: " + str(result_data))
|
||||
return True
|
||||
else:
|
||||
logging.error("No data was retrieved during testing!")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
return False
|
||||
|
||||
|
||||
def load_json_records(json_file_path, max_num_records_to_process):
|
||||
json_records = []
|
||||
try:
|
||||
with open(json_file_path) as json_file:
|
||||
for counter, line in enumerate(json_file):
|
||||
line = line.strip()
|
||||
if len(line) > 0 and (max_num_records_to_process == -1 or counter < max_num_records_to_process): # "counter" starts from 0
|
||||
json_records.append(json.loads(line))
|
||||
except Exception as e:
|
||||
logging.error("Problem when opening file: " + json_file_path, e)
|
||||
return None
|
||||
return json_records
|
||||
|
||||
|
||||
def extract_PIDs(json_records):
|
||||
# Get only the pids, in a dictionary, having both the doi and the pmid (if available)
|
||||
pids = []
|
||||
for record in json_records:
|
||||
pids_or_record = {}
|
||||
try:
|
||||
doi = record["doi"]
|
||||
if doi:
|
||||
pids_or_record['doi'] = doi
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
pmid = record["pmid"]
|
||||
if pmid:
|
||||
pids_or_record['pmid'] = pmid
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if pids_or_record: # If not empty.
|
||||
pids.append(pids_or_record)
|
||||
|
||||
return pids
|
||||
|
||||
|
||||
def extract_PIDs_from_previous_results(json_records):
|
||||
pids = set()
|
||||
for record in json_records:
|
||||
try:
|
||||
pid = record["pid"]
|
||||
if pid:
|
||||
pids.add(pid)
|
||||
except KeyError:
|
||||
pass
|
||||
return pids
|
||||
|
||||
|
||||
def reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records):
|
||||
unprocessed_records = []
|
||||
for record in json_records_to_be_processed:
|
||||
doi = None
|
||||
try:
|
||||
doi = record["doi"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
pmid = None
|
||||
try:
|
||||
pmid = record["pmid"]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Skip if no pid exists
|
||||
if (doi is None or not doi) and (pmid is None or not pmid):
|
||||
continue
|
||||
|
||||
# Skip if pid has given a result before.
|
||||
if (doi is not None and doi and doi in pids_of_previous_records) or (pmid is not None and pmid and pmid in pids_of_previous_records):
|
||||
continue
|
||||
|
||||
unprocessed_record = {}
|
||||
if doi is not None and doi:
|
||||
unprocessed_record['doi'] = doi
|
||||
if pmid is not None and pmid:
|
||||
unprocessed_record['pmid'] = pmid
|
||||
|
||||
if unprocessed_record:
|
||||
unprocessed_records.append(unprocessed_record)
|
||||
|
||||
return unprocessed_records
|
||||
|
||||
|
||||
def get_empty_result(doi, pmid):
|
||||
result_data = {'dedupid': 'None', 'id': 'None'}
|
||||
if doi is not None:
|
||||
result_data['pid'] = doi
|
||||
result_data['pid_type'] = 'doi'
|
||||
else:
|
||||
result_data['pid'] = pmid
|
||||
result_data['pid_type'] = 'pmid'
|
||||
result_data['fulltext_url'] = 'None'
|
||||
result_data['location'] = 'None'
|
||||
return result_data
|
||||
|
||||
|
||||
def get_query_to_check_coverage_for_record(doi, pmid):
|
||||
# Check that we have at least one file-location associated with one of the pids of this record.
|
||||
|
||||
predicate = '0' # Control value to match nothing.
|
||||
if doi is not None:
|
||||
predicate = "ppid.pid = '" + doi + "'"
|
||||
if pmid is not None:
|
||||
predicate += " or ppid.pid = '" + pmid + "'"
|
||||
elif pmid is not None:
|
||||
predicate = "ppid.pid = '" + pmid + "'"
|
||||
# Else no pid is available. This is already caught by the caller method.
|
||||
|
||||
return "select pu.dedupid, pu.id, ppid.pid, ppid.type as pid_type, p.actual_url as fulltext_url, p.`location` from " + db + ".publication_pids ppid\n" + \
|
||||
" join " + db + ".publication pu on pu.id = ppid.id\n" + \
|
||||
" left outer join " + db + ".payload p on p.id = ppid.id\n" + \
|
||||
"where " + predicate + \
|
||||
"\norder by coalesce(fulltext_url, 'z')" + \
|
||||
"\nlimit 1;" # Order alphabetically, ascending. This is important, so that the pdf-results are preferred over the non-pdf-results.
|
||||
# We use "left outer join" for "payload", in order to retrieve results also for non-pdf-covered PIDs.
|
||||
|
||||
|
||||
thread_local_var = threading.local()
|
||||
|
||||
num_of_pids_covered = 0 # integer variable shared between threads
|
||||
lock = threading.Lock() # lock object, used to safely increment the above variable
|
||||
|
||||
|
||||
def get_coverage_for_record(pids_of_record, retry_count):
|
||||
global num_of_pids_covered
|
||||
|
||||
if retry_count > 10:
|
||||
logging.warning("Could not find the requested payload-type table in an non-merging state, after " + (retry_count - 1) + " retries!")
|
||||
return None
|
||||
|
||||
if not hasattr(thread_local_var, 'cursor'): # If this is the first time this thread accesses its "thread-local" variable, introduce and assign the cersor attribute."
|
||||
thread_local_var.__setattr__('cursor', getCursorForImpala())
|
||||
|
||||
doi = None
|
||||
try:
|
||||
doi = pids_of_record['doi']
|
||||
except KeyError:
|
||||
pass # The "doi" does not exist. Let's hope the "pmid" exists.
|
||||
|
||||
pmid = None
|
||||
try:
|
||||
pmid = pids_of_record['pmid']
|
||||
except KeyError:
|
||||
pass # The "pmid" does not exist (it's common).
|
||||
|
||||
if (doi is None or not doi) and (pmid is None or not pmid):
|
||||
logging.warning("Current record has no pid: " + str(pids_of_record))
|
||||
return None
|
||||
|
||||
result_data = {}
|
||||
try:
|
||||
thread_local_var.cursor.execute(get_query_to_check_coverage_for_record(doi, pmid))
|
||||
result_row = thread_local_var.cursor.fetchone()
|
||||
if result_row:
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logging.debug("result_row: " + str(result_row)) # DEBUG!
|
||||
for i, col in enumerate(thread_local_var.cursor.description):
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logging.debug(str(col[0]) + " = '" + str(result_row[i]) + "'") # DEBUG!
|
||||
result_data[str(col[0])] = str(result_row[i])
|
||||
|
||||
if result_data['location'] != 'None':
|
||||
logging.info("PDF-coverage for record with pid: \"" + result_data['pid'] + "\" of type \"" + result_data['pid_type'] + "\" and location: \"" + result_data[
|
||||
'location'] + "\".")
|
||||
with lock:
|
||||
num_of_pids_covered += 1
|
||||
else:
|
||||
logging.info("No coverage! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
|
||||
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logging.debug("result_data: " + str(result_data)) # DEBUG!
|
||||
|
||||
return result_data
|
||||
else:
|
||||
logging.warning("Not in DB! Record with doi: \"" + str(doi) + "\" and pmid: \"" + str(pmid) + "\"")
|
||||
return get_empty_result(doi, pmid)
|
||||
except Exception as e:
|
||||
ex_msg = str(e)
|
||||
if "Could not resolve table reference" in ex_msg or "Could not open HDFS file" in ex_msg:
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logging.debug("One of the tables is in a merging-state!") # DEBUG!
|
||||
return get_coverage_for_record(pids_of_record, (++retry_count))
|
||||
else:
|
||||
logging.error("Error when querying for 'doi': '" + str(doi) + "' and 'pmid': '" + str(pmid) + "'\n", type(e).__name__, e)
|
||||
return get_empty_result(doi, pmid)
|
||||
|
||||
|
||||
pids_per_batch = 1_000
|
||||
|
||||
|
||||
def check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records):
|
||||
global num_of_pids_covered
|
||||
|
||||
num_of_batches = int(len(pids_of_records) / pids_per_batch)
|
||||
if num_of_batches == 0:
|
||||
num_of_batches = 1
|
||||
|
||||
num_of_batches_str = str(num_of_batches)
|
||||
|
||||
# Split the pids into multiple batches (sub-lists) of < pids_per_batch > elements each,
|
||||
chunked_arrays = np.array_split(np.array(pids_of_records), num_of_batches)
|
||||
pids_of_records_sublists = [list(array) for array in chunked_arrays]
|
||||
|
||||
start = perf_counter()
|
||||
|
||||
num_of_all_records = len(pids_of_records)
|
||||
|
||||
logging.info(
|
||||
"Submitting the " + str(num_of_all_records) + " tasks to the 'ThreadPoolExecutor', in " + num_of_batches_str + " batches of " + str(len(pids_of_records_sublists[0]))
|
||||
+ " elements each.")
|
||||
|
||||
with ThreadPoolExecutor(max_workers=number_of_threads) as executor:
|
||||
|
||||
for batch_counter, pids_of_records_batch in enumerate(pids_of_records_sublists):
|
||||
result_futures = []
|
||||
for count, pids_of_record in enumerate(pids_of_records_batch):
|
||||
if pids_of_record:
|
||||
result_futures.append(executor.submit(get_coverage_for_record, pids_of_record, 0))
|
||||
|
||||
batch_counter_str = str(batch_counter)
|
||||
logging.info("Finished submitting the tasks for batch_" + batch_counter_str + " (out of " + num_of_batches_str + "), will wait for them to finish executing..")
|
||||
# After all tasks for this batch have been submitted. Iterate through the futures and wait for all of them to finish and use their results.
|
||||
results_data = []
|
||||
for count, result_future in enumerate(result_futures):
|
||||
try:
|
||||
result_data = result_future.result()
|
||||
if result_data is not None:
|
||||
results_data.append(result_data)
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Error when checking a future's result for batch_" + batch_counter_str, e)
|
||||
|
||||
if len(results_data) > 0:
|
||||
logging.info("Writing " + str(len(results_data)) + " results (from batch_" + batch_counter_str + ") to the output file..")
|
||||
try:
|
||||
with open(results_json_file_path, "a") as outfile:
|
||||
for result in results_data:
|
||||
outfile.write(json.dumps(result) + "\n")
|
||||
except Exception as e:
|
||||
logging.critical("Error when writing to output-file: " + results_json_file_path, e)
|
||||
# It's very serious to lose results, but it may be a temporal error, appearing in the middle of the execution. So, we should not stop the whole execution.
|
||||
else:
|
||||
logging.info("Batch_" + batch_counter_str + " gave no results.")
|
||||
|
||||
finish = perf_counter()
|
||||
|
||||
elapsed_time = finish - start
|
||||
if elapsed_time >= 3600:
|
||||
elapsed_time = elapsed_time / 3600
|
||||
time_message = "Finished in " + str(round(elapsed_time, 2)) + " hours. "
|
||||
elif elapsed_time >= 60:
|
||||
elapsed_time = elapsed_time / 60
|
||||
time_message = "Finished in " + str(round(elapsed_time, 2)) + " minutes. "
|
||||
else:
|
||||
time_message = "Finished in " + str(round(elapsed_time, 2)) + " seconds. "
|
||||
|
||||
logging.info(time_message + str(num_of_pids_covered) + " out of " + str(num_of_all_records) + " pids had coverage.")
|
||||
|
||||
|
||||
def print_records(json_records):
|
||||
records_to_print = ''
|
||||
for record in json_records:
|
||||
records_to_print += str(record) + "\n"
|
||||
logging.info(records_to_print)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = sys.argv
|
||||
num_args = len(args)
|
||||
if num_args < 2 or num_args > 4: # (the 1st arg is the script's name)
|
||||
logging.critical("Wrong number of argument given (" + str(
|
||||
num_args - 1) + ")! Should give between 1 and 3 arguments, in that order: <input_file> , <max_num_records_to_process> , <previous_results_file_path>")
|
||||
sys.exit(1)
|
||||
elif num_args == 2:
|
||||
input_file_path = args[1]
|
||||
max_num_records_to_process = -1
|
||||
elif num_args == 3:
|
||||
input_file_path = args[1]
|
||||
max_num_records_to_process = int(args[2])
|
||||
if max_num_records_to_process == 0:
|
||||
max_num_records_to_process = -1
|
||||
elif num_args == 4:
|
||||
input_file_path = args[1]
|
||||
max_num_records_to_process = int(args[2])
|
||||
if max_num_records_to_process == 0:
|
||||
max_num_records_to_process = -1
|
||||
previous_results_file = args[3]
|
||||
|
||||
if not testImpalaConnection():
|
||||
logging.info("Exiting!")
|
||||
exit(2)
|
||||
|
||||
if max_num_records_to_process == -1:
|
||||
logging.info("Going to process all available pid-records..")
|
||||
else:
|
||||
logging.info("Going to process at most " + str(max_num_records_to_process) + " pid-records..")
|
||||
|
||||
json_records_to_be_processed = load_json_records(input_file_path, max_num_records_to_process)
|
||||
logging.info("Loaded " + str(len(json_records_to_be_processed)) + " json-records from input_file: " + input_file_path)
|
||||
|
||||
if not json_records_to_be_processed:
|
||||
logging.critical("No json-records were found in the input! Exiting...")
|
||||
exit(3)
|
||||
|
||||
# print_records(json_records_to_be_processed)
|
||||
|
||||
pids_of_records = extract_PIDs(json_records_to_be_processed)
|
||||
num_pid_records_str = str(len(pids_of_records))
|
||||
logging.info("Extracted " + num_pid_records_str + " pid-records from input_file: " + input_file_path)
|
||||
|
||||
if not pids_of_records:
|
||||
logging.critical("No pids were extracted from the input json-records! Exiting...")
|
||||
exit(4)
|
||||
|
||||
json_records_previously_processed = None
|
||||
if previous_results_file is not None:
|
||||
logging.info("Going to check the input-records if they were previously processed..")
|
||||
|
||||
json_records_previously_processed = load_json_records(previous_results_file, -1)
|
||||
logging.info("Loaded " + str(len(json_records_previously_processed)) + " json-records from input_file: " + previous_results_file)
|
||||
|
||||
if json_records_previously_processed:
|
||||
pids_of_previous_records = extract_PIDs_from_previous_results(json_records_previously_processed)
|
||||
logging.info("Extracted " + str(len(pids_of_previous_records)) + " pid-records from previous_results_input_file: " + previous_results_file)
|
||||
|
||||
if pids_of_previous_records:
|
||||
unprocessed_records = reduce_to_unprocessed_records(json_records_to_be_processed, pids_of_previous_records)
|
||||
logging.info("Reduced the " + num_pid_records_str + " pid-records to " + str(len(unprocessed_records)) + " after checking if they have been processed before.")
|
||||
pids_of_records = unprocessed_records
|
||||
else:
|
||||
logging.error("No pids were extracted from the previous-results-input! Continue by checking all selected pids from input..")
|
||||
else:
|
||||
logging.warning("No json-records were found in the previous-results-input! Continue by checking all selected pids from input..")
|
||||
|
||||
# if logger.isEnabledFor(logging.DEBUG):
|
||||
# logging.info(pids_of_records)
|
||||
|
||||
check_pids_against_pdf_aggregation_db_and_write_results(pids_of_records)
|
||||
|
||||
exit(0)
|
|
@ -0,0 +1,2 @@
|
|||
impyla~=0.19.0
|
||||
numpy~=1.19.5
|
|
@ -0,0 +1,11 @@
|
|||
# We can only connect to Impala from authorized IPs, so we have to transfer this python-project-directory to such a machine.
|
||||
|
||||
remote_user_with_ip='<USER>@<IP>' # Fill this information as needed.
|
||||
|
||||
echo -e "Removing remote directory.."
|
||||
ssh -t ${remote_user_with_ip} "rm -rf pdfCoverageEvaluator"
|
||||
|
||||
cd ../
|
||||
scp -r -C pdfCoverageEvaluator ${remote_user_with_ip}:.
|
||||
|
||||
ssh -t ${remote_user_with_ip} "python3 --version; sudo apt install -y python3-pip; sudo pip3 install -r ./pdfCoverageEvaluator/requirements.txt"
|
|
@ -0,0 +1,18 @@
|
|||
# We can only connect to Impala from authorized IPs, so we have to transfer this python-project-directory to such a machine.
|
||||
|
||||
remote_user_with_ip='<USER>@<IP>' # Fill this information as needed.
|
||||
|
||||
echo -e "Removing remote directory.."
|
||||
ssh -t ${remote_user_with_ip} "rm -rf pdfCoverageEvaluator"
|
||||
|
||||
cd ../
|
||||
scp -r -C pdfCoverageEvaluator ${remote_user_with_ip}:.
|
||||
|
||||
input_file_path='pids_to_check.json'
|
||||
max_num_to_process=-1 # -1 == All records
|
||||
#previous_results_file_path='previous_results.json' # Uncomment this, if you want to exclude the pids from the input which have already been processed and gave a result in a previous run.
|
||||
|
||||
ssh -t ${remote_user_with_ip} "python3 --version; sudo apt install -y python3 python3-pip; sudo pip3 install --upgrade pip; cd pdfCoverageEvaluator; sudo pip3 install -r requirements.txt; (screen -d -m python3 pdfCoverageEvaluator.py ${input_file_path} ${max_num_to_process} ${previous_results_file_path} && screen -r); tail -n 100 pdfCoverageEvaluator.log"
|
||||
|
||||
# In case the program fails and the screen terminates without logs, use the following command to debug (without the screen):
|
||||
#ssh -t ${remote_user_with_ip} "python3 --version; sudo apt install -y python3 python3-pip; sudo pip3 install --upgrade pip; cd pdfCoverageEvaluator; sudo pip3 install -r requirements.txt; python3 pdfCoverageEvaluator.py ${input_file_path} ${max_num_to_process}" # ${previous_results_file_path}"
|
Loading…
Reference in New Issue