From 6b0ffd349c9ad9b1a36f5854bb1085072e1cce24 Mon Sep 17 00:00:00 2001 From: Mauro Mugnaini Date: Mon, 17 Apr 2023 17:27:44 +0200 Subject: [PATCH] Worker now is resilient to connection's errors --- .gitignore | 1 + conductor/ConductorWorker.py | 20 +++++++++++++++----- conductor/conductor.py | 2 +- docker/d4science/entrypoint.sh | 2 +- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 8f6a23c..cafab3c 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ __pycache__/ *.py[cod] *$py.class *.log +*.log.* diff --git a/conductor/ConductorWorker.py b/conductor/ConductorWorker.py index 6e6bef5..872fc36 100644 --- a/conductor/ConductorWorker.py +++ b/conductor/ConductorWorker.py @@ -18,6 +18,7 @@ import time import logging from conductor.conductor import WFClientMgr from threading import Thread +import threading import socket hostname = socket.gethostname() @@ -84,7 +85,7 @@ class ConductorWorker: task['reasonForIncompletion'] = resp['reasonForIncompletion'] self.taskClient.updateTask(task) except Exception as err: - print('Error executing task: ', task["referenceTaskName"], str(err) ) + logging.getLogger("pyexec").error('Error executing task: %s\n%s', task["referenceTaskName"], str(err)) task['status'] = 'FAILED' self.taskClient.updateTask(task) @@ -92,9 +93,14 @@ class ConductorWorker: while True: time.sleep(float(self.polling_interval)) start = time.time() - polled = self.taskClient.pollForTask(taskType, self.worker_id, domain) - end = time.time() - #logging.getLogger("pyexec").debug("Polled in %s sec", (end - start)) + polled = None + try: + polled = self.taskClient.pollForTask(taskType, self.worker_id, domain) + end = time.time() + logging.getLogger("pyexec").debug("Polled in %s sec", (end - start)) + except Exception as e: + logging.getLogger("pyexec").warning("Error during main loop: %s", str(e)) + if polled is not None: self.taskClient.ackTask(polled['taskId'], self.worker_id) self.execute(polled, exec_function) @@ -133,8 +139,12 @@ class ConductorWorker: logging.getLogger("pyexec").debug("Starting Thread %d for %s", x, taskType) thread.start() if wait: + expectedthreads = threading.active_count() while 1: - time.sleep(1) + time.sleep(60) + activecount = threading.active_count() + if expectedthreads != activecount: + logging.getLogger("pyexec").fatal("!!! Active/expected running threads count is: %d/%d", activecount, expectedthreads) def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount): diff --git a/conductor/conductor.py b/conductor/conductor.py index e19afd1..1ca975a 100644 --- a/conductor/conductor.py +++ b/conductor/conductor.py @@ -201,7 +201,7 @@ class TaskClient(BaseClient): if reRegisterFunction is not None: reRegisterFunction() else: - logging.getLogger("pyexec").debug('Error while polling %s', str(err)) + logging.getLogger("pyexec").warning('Error while polling: %s', str(err)) return None diff --git a/docker/d4science/entrypoint.sh b/docker/d4science/entrypoint.sh index 4e4d391..941c598 100755 --- a/docker/d4science/entrypoint.sh +++ b/docker/d4science/entrypoint.sh @@ -9,6 +9,6 @@ while [[ "$(curl -s -o /dev/null -L -w ''%{http_code}'' $HEALTH)" != 200 ]]; do sleep 5 done -echo 'Starting default workers with $worker_plugins ...' +echo "Starting default workers with $worker_plugins ..." python3 PyExec.py $worker_plugins