added strategies for polling

This commit is contained in:
dcore94 2024-03-08 16:25:55 +01:00
parent fff03667b4
commit 63aa34cb06
1 changed files with 176 additions and 0 deletions

176
pollstrategies/__init__.py Normal file
View File

@ -0,0 +1,176 @@
import logging
import time
import websocket
from threading import Thread
class PollStrategy():
config = None
logger = None
emptypollcount = None
currentemptypollcount = None
reactive = False
# Build a factory given a configuration
@staticmethod
def build(strategy):
if strategy["strategy"].lower() == "simple":
return SimplePollStrategy(strategy)
elif strategy["strategy"].lower() == "batch":
return BatchPollStrategy(strategy)
elif strategy["strategy"].lower() == "adaptive":
return AdaptivePollStrategy(strategy)
else:
raise Exception("Unknown poll strategy: " + strategy["srtategy"])
def __init__(self, config):
self.config = config
self.emptypollcount = self.currentemptypollcount = 20
self.logger = logging.getLogger("pyexec")
self.reactive = self.config["reactive"].lower() == "true"
def __str__(self) -> str:
return "{} strategy for {} ".format("reactive " + self.config["strategy"] if self.isReactive() else self.config["strategy"], self.getTask())
def poll_and_execute(self, taskType, exec_function, domain=None):
pass
def getTask(self):
return self.config["task"]
def getThreadCount(self):
return int(self.config["threads"])
def getPollRate(self):
return int(self.config["pollrate"])
def getPollRateMS(self):
return self.getPollRate() * 1000
def getNotificationEndpoint(self):
return self.config["notification"]
def getWorkerID(self):
return self.config["worker_id"]
def executeWrapper(self, taskClient, task, exec_function):
try:
resp = exec_function(task)
if type(resp) is not dict or not all(key in resp for key in ('status', 'output', 'logs')):
raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
task['status'] = resp['status']
task['outputData'] = resp['output']
task['logs'] = resp['logs']
if 'reasonForIncompletion' in resp:
task['reasonForIncompletion'] = resp['reasonForIncompletion']
taskClient.updateTask(task)
except Exception as err:
self.logger.error('Error executing task: %s\n%s', task["referenceTaskName"], str(err))
task['status'] = 'FAILED'
taskClient.updateTask(task)
def isReactive(self):
return self.reactive
def resetEmptyPollCount(self):
if not self.isReactive():
pass
else:
self.currentemptypollcount = self.emptypollcount
def updateEmptyPollCount(self):
if not self.isReactive():
pass
else:
self.currentemptypollcount -= 1
self.logger.debug("{} has empty poll count of {}".format(self, self.currentemptypollcount))
def waitNotificationIfRequired(self):
if not self.isReactive() and not self.getNotificationEndpoint():
pass
elif self.currentemptypollcount >= 0:
pass
else:
self.resetEmptyPollCount()
try:
ws = websocket.create_connection(self.getNotificationEndpoint(), timetout=300)
except:
self.logger.warn("{} Error while connecting to notification endpoint. Won't wait....".format(self))
return
try:
self.logger.debug("{} wating for notification".format(str(self)))
data = ws.recv()
self.logger.debug("{} received notification {}".format(str(self), data))
except:
self.logger.warn("{} Error while wating for notification.", str(self))
class SimplePollStrategy(PollStrategy):
def __init__(self, config):
super().__init__(config)
def poll_and_execute(self, taskClient, taskType, exec_function, domain=None):
self.resetEmptyPollCount()
start = end = None
while True:
# After first iteration sleep for remaining time to accomodate pollrate
if start != None:
self.logger.debug("Sleeping for {}".format(self.getPollRate()))
time.sleep(self.getPollRate())
start = time.time()
polled = None
try:
self.logger.debug("Polling")
polled = taskClient.pollForTask(taskType, self.getWorkerID(), domain)
end = time.time()
self.logger.debug("{} Polled in {} seconds".format(str(self), (end - start)))
except Exception as e:
self.logger.warning("{} error during main loop: {}", str(self), str(e))
if polled is not None:
self.resetEmptyPollCount()
taskClient.ackTask(polled['taskId'], self.getWorkerID())
self.executeWrapper(taskClient, polled, exec_function)
else:
self.updateEmptyPollCount()
self.waitNotificationIfRequired()
class BatchPollStrategy(PollStrategy):
def __init__(self, config):
super().__init__(config)
def poll_and_execute(self, taskClient, taskType, exec_function, domain=None):
self.resetEmptyPollCount()
while True:
start = time.time()
polled = None
try:
polled = taskClient.pollForBatch(taskType, self.getThreadCount(), self.getPollRateMS(), self.getWorkerID(), domain)
end = time.time()
self.logger.debug("{} Polled in {} seconds".format(str(self), (end - start)))
except Exception as e:
self.logger.warning("{} Error during main loop: {}", str(self), str(e))
if polled and len(polled) > 0:
self.resetEmptyPollCount()
threads = []
for task in polled:
taskClient.ackTask(task['taskId'], self.getWorkerID())
thread = Thread(target=self.executeWrapper, args=(taskClient, task, exec_function,))
threads.append(thread)
thread.start()
for t in threads:
t.join()
else:
self.updateEmptyPollCount()
self.waitNotificationIfRequired()
class AdaptivePollStrategy(PollStrategy):
def __init__(self, config):
super().__init__(config)
def poll_and_execute(self, taskClient, askType, exec_function, domain=None):
pass