refactored to use strategies

This commit is contained in:
dcore94 2024-03-08 16:29:04 +01:00
parent 31399d110d
commit e4f56664df
1 changed files with 9 additions and 49 deletions

View File

@ -20,6 +20,7 @@ from conductor.conductor import WFClientMgr
from threading import Thread
import threading
import socket
from pollstrategies import PollStrategy
hostname = socket.gethostname()
@ -43,7 +44,7 @@ class ConductorWorker:
script can run multiple workers using the wait argument. For more
details, view the start method
"""
def __init__(self, server_url, thread_count, polling_interval, worker_id=None, auth=None):
def __init__(self, server_url, worker_id=None, auth=None):
"""
Parameters
----------
@ -68,44 +69,9 @@ class ConductorWorker:
wfcMgr = WFClientMgr(server_url, hdrs)
self.workflowClient = wfcMgr.workflowClient
self.taskClient = wfcMgr.taskClient
self.thread_count = thread_count
self.polling_interval = polling_interval
self.worker_id = worker_id or hostname
def execute(self, task, exec_function):
try:
logging.getLogger("pyexec").debug("Exec function is %s", exec_function)
resp = exec_function(task)
if type(resp) is not dict or not all(key in resp for key in ('status', 'output', 'logs')):
raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
task['status'] = resp['status']
task['outputData'] = resp['output']
task['logs'] = resp['logs']
if 'reasonForIncompletion' in resp:
task['reasonForIncompletion'] = resp['reasonForIncompletion']
self.taskClient.updateTask(task)
except Exception as err:
logging.getLogger("pyexec").error('Error executing task: %s\n%s', task["referenceTaskName"], str(err))
task['status'] = 'FAILED'
self.taskClient.updateTask(task)
def poll_and_execute(self, taskType, exec_function, domain=None):
while True:
time.sleep(float(self.polling_interval))
start = time.time()
polled = None
try:
polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
end = time.time()
logging.getLogger("pyexec").debug("Polled in %s sec", (end - start))
except Exception as e:
logging.getLogger("pyexec").warning("Error during main loop: %s", str(e))
if polled is not None:
self.taskClient.ackTask(polled['taskId'], self.worker_id)
self.execute(polled, exec_function)
def start(self, taskType, exec_function, wait, domain=None):
def start(self, taskType, exec_function, strategy, wait, domain=None):
"""
start begins the continuous polling of the conductor server
@ -132,12 +98,12 @@ class ConductorWorker:
further details refer to the conductor server documentation
By default, it is set to None
"""
logging.getLogger("pyexec").debug('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, self.polling_interval * 1000, self.thread_count, self.worker_id))
for x in range(0, int(self.thread_count)):
thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain,))
thread.daemon = True
logging.getLogger("pyexec").debug("Starting Thread %d for %s", x, taskType)
thread.start()
logging.getLogger("pyexec").debug('Polling with strategy %s for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % ( strategy["strategy"], taskType, int(strategy["pollrate"]) * 1000, int(strategy["threads"]), self.worker_id))
strategy["worker_id"] = self.worker_id
thread = Thread(target=PollStrategy.build(strategy).poll_and_execute, args=(self.taskClient, taskType, exec_function, domain,))
thread.daemon = True
logging.getLogger("pyexec").debug("Starting Thread for %s", taskType)
thread.start()
if wait:
expectedthreads = threading.active_count()
while 1:
@ -149,12 +115,6 @@ class ConductorWorker:
logging.getLogger("pyexec").fatal("!!! Exiting")
sys.exit(1)
def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):
logging.getLogger("pyexec").debug('Executing the function')
return {'status': 'COMPLETED', 'output': {}}
def main():
cc = ConductorWorker('http://localhost:8080/api', 5, 0.1)
cc.start(sys.argv[1], exc, False)