Worker now is resilient to connection's errors

This commit is contained in:
Mauro Mugnaini 2023-04-17 17:27:44 +02:00
parent a0a858d170
commit 6b0ffd349c
4 changed files with 18 additions and 7 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ __pycache__/
*.py[cod]
*$py.class
*.log
*.log.*

View File

@ -18,6 +18,7 @@ import time
import logging
from conductor.conductor import WFClientMgr
from threading import Thread
import threading
import socket
hostname = socket.gethostname()
@ -84,7 +85,7 @@ class ConductorWorker:
task['reasonForIncompletion'] = resp['reasonForIncompletion']
self.taskClient.updateTask(task)
except Exception as err:
print('Error executing task: ', task["referenceTaskName"], str(err) )
logging.getLogger("pyexec").error('Error executing task: %s\n%s', task["referenceTaskName"], str(err))
task['status'] = 'FAILED'
self.taskClient.updateTask(task)
@ -92,9 +93,14 @@ class ConductorWorker:
while True:
time.sleep(float(self.polling_interval))
start = time.time()
polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
end = time.time()
#logging.getLogger("pyexec").debug("Polled in %s sec", (end - start))
polled = None
try:
polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
end = time.time()
logging.getLogger("pyexec").debug("Polled in %s sec", (end - start))
except Exception as e:
logging.getLogger("pyexec").warning("Error during main loop: %s", str(e))
if polled is not None:
self.taskClient.ackTask(polled['taskId'], self.worker_id)
self.execute(polled, exec_function)
@ -133,8 +139,12 @@ class ConductorWorker:
logging.getLogger("pyexec").debug("Starting Thread %d for %s", x, taskType)
thread.start()
if wait:
expectedthreads = threading.active_count()
while 1:
time.sleep(1)
time.sleep(60)
activecount = threading.active_count()
if expectedthreads != activecount:
logging.getLogger("pyexec").fatal("!!! Active/expected running threads count is: %d/%d", activecount, expectedthreads)
def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):

View File

@ -201,7 +201,7 @@ class TaskClient(BaseClient):
if reRegisterFunction is not None:
reRegisterFunction()
else:
logging.getLogger("pyexec").debug('Error while polling %s', str(err))
logging.getLogger("pyexec").warning('Error while polling: %s', str(err))
return None

View File

@ -9,6 +9,6 @@ while [[ "$(curl -s -o /dev/null -L -w ''%{http_code}'' $HEALTH)" != 200 ]]; do
sleep 5
done
echo 'Starting default workers with $worker_plugins ...'
echo "Starting default workers with $worker_plugins ..."
python3 PyExec.py $worker_plugins