Added significant logs at terminatio. Moved to strategies for polling

This commit is contained in:
dcore94 2024-03-08 16:25:28 +01:00
parent 62c3c35e95
commit fff03667b4
1 changed files with 23 additions and 11 deletions

View File

@ -4,6 +4,7 @@ import logging
import logging.config
import sys
import os
import time
import random
import argparse
from pyexecplugins.pyexecplugins import *
@ -40,14 +41,9 @@ class PyExec():
self.logger = logging.getLogger("pyexec")
self.logger.info("Initializing PyExec worker %s with config.cfg", self.workerid)
self.threads = self.cfg["common"].getint("threads", 3)
self.pollrate = self.cfg["common"].getfloat("pollrate", .1)
self.pollrate = self.cfg["common"].getint("pollrate", 5000)
self.server = os.environ.get('CONDUCTOR_SERVER', self.cfg["common"].get("server", "http://localhost:8080/api"))
self.auth = os.environ.get('CONDUCTOR_WORKER_AUTH', self.cfg["common"].get("auth"))
# if self.auth is None:
# error = "Unable to find auth details neither in ENV nor in config file"
# self.logger.critical(error)
# raise Exception(error)
for plg in pluginlist:
path = "pyexecplugins." + plg
try:
@ -58,18 +54,34 @@ class PyExec():
PyExecPlugin.registerTaskDefinitions(self.server, self.auth)
defaultstrategy = { "strategy" : "simple", "pollrate" : 1, "threads" : 1, "reactive" : "false" }
def computeStrategy(self, plg):
strategy = self.defaultstrategy.copy()
hassection = self.cfg.has_section(plg.name)
strategy["strategy"] = (self.cfg[plg.name].get("strategy") if hassection else None) or self.cfg["common"].get("strategy") or strategy["strategy"]
strategy["pollrate"] = (self.cfg[plg.name].get("pollrate") if hassection else None) or self.cfg["common"].get("pollrate") or strategy["pollrate"]
strategy["threads"] = (self.cfg[plg.name].get("threads") if hassection else None) or self.cfg["common"].get("threads") or strategy["threads"]
strategy["reactive"] = (self.cfg[plg.name].get("reactive") if hassection else None) or self.cfg["common"].get("reactive") or strategy["reactive"]
strategy["notification"] = self.cfg["common"].get("notification", None)
strategy["task"] = plg.name
return strategy
def start(self):
self.logger.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames())
cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid, self.auth)
cc = ConductorWorker(self.server, self.workerid, self.auth)
# start workers for all included plugins that are available also as standalone tasks ...
pluginnames = PyExecPlugin.getPluginNames()
for i in range(len(pluginnames)):
plugin = PyExecPlugin.get(pluginnames[i])
if plugin.hasDefinition(plugin):
self.logger.debug("Starting plugin %s", plugin)
strategy = self.computeStrategy(plugin)
self.logger.debug("Starting plugin %s with strategy %s", plugin, strategy)
domain = self.computeDomain(plugin.taskdef["name"])
cc.start(plugin.taskdef["name"], self.pyexec, i == len(pluginnames)-1, domain)
cc.start(plugin.taskdef["name"], self.pyexec, strategy, i == len(pluginnames)-1, domain)
# ... plus fallback pyexec for managing non standalone calls, Nop, Identity and Sequence
#domain = self.computeDomain("pyexec")
@ -96,14 +108,14 @@ class PyExec():
pi = p(task["inputData"], self.cfg)
ret = pi.execute()
else: raise Exception("Operation {} not found.".format(operation))
return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]}
return { "status" : "COMPLETED", "output" : ret, "logs" : ["Task completed at " + time.strftime("%Y-%m-%d %H:%M:%S")]}
except Exception as exc:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
self.logger.debug("%s %s %s", exc_type, fname, exc_tb.tb_lineno)
self.logger.error("%s", str(exc))
return { "status" : "FAILED", "output" : { "message" : str(exc) }, "logs" : ["one","two"]}
return { "status" : "FAILED", "output" : {}, "logs" : ["Task failed at " + time.strftime("%Y-%m-%d %H:%M:%S"), str(exc)]}
if __name__ == '__main__':
parser = argparse.ArgumentParser()