improved logging and optimized default threads and pollrate

This commit is contained in:
Marco Lettere 2020-11-06 18:26:44 +01:00
parent 03557f6efb
commit 66355c1f60
6 changed files with 90 additions and 84 deletions

View File

@ -32,9 +32,10 @@ class PyExec():
self.cfg = configparser.ConfigParser()
self.cfg.read("config.cfg")
self.workerid = self.cfg["common"].get("workerid", "pythonworker-" + str(random.randint(1000,10000)))
logging.basicConfig(level=self.errors[self.cfg["common"]["loglevel"]])
logging.info("Initializing PyExec worker %s with config.cfg", self.workerid)
logging.basicConfig()
self.logger = logging.getLogger("pyexec")
self.logger.setLevel(self.errors[self.cfg["common"]["loglevel"]])
self.logger.info("Initializing PyExec worker %s with config.cfg", self.workerid)
self.threads = self.cfg["common"].getint("threads", 3)
self.pollrate = self.cfg["common"].getfloat("pollrate", .1)
@ -50,23 +51,25 @@ class PyExec():
PyExecPlugin.registerTaskDefinitions(self.server)
def start(self):
logging.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames())
self.logger.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames())
cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid)
# start workers for all included plugins that are available also as standalone tasks ...
for plg in PyExecPlugin.getPlugins():
plugin = PyExecPlugin.get(plg)
if plugin.supportsStandalone(plugin):
pluginnames = PyExecPlugin.getPluginNames()
for i in range(len(pluginnames)):
plugin = PyExecPlugin.get(pluginnames[i])
if plugin.hasDefinition(plugin):
self.logger.debug("Starting plugin %s", plugin)
domain = self.computeDomain(plugin.taskdef["name"])
cc.start(plugin.taskdef["name"], self.pyexec, False, domain)
cc.start(plugin.taskdef["name"], self.pyexec, i == len(pluginnames)-1, domain)
# ... plus fallback pyexec for managing non standalone calls, Nop, Identity and Sequence
domain = self.computeDomain("pyexec")
cc.start('pyexec', self.pyexec, True, domain)
#domain = self.computeDomain("pyexec")
#cc.start('pyexec', self.pyexec, True, domain)
def pyexec(self, task):
try:
logging.info("Executing task {} of type {}[{}] from wkf {}[{}]".format(
self.logger.info("Executing task {} of type {}[{}] from wkf {}[{}]".format(
task["workflowTask"]["taskReferenceName"],
task["taskDefName"],
task["taskId"],
@ -75,11 +78,11 @@ class PyExec():
# Get operation from input or task type (in case of standalone tasks) fallback to Nop
operation = task["inputData"].get("operation") or task["taskDefName"] or "Nop"
logging.debug("Operation is %s", operation)
self.logger.debug("Operation is %s", operation)
# Get plugin by checking either name or alias (in case of standalone tasks the task type is aliased to the plugin name)
p = PyExecPlugin.get(operation) or PyExecPlugin.getAlias(operation)
logging.debug("Plugin is %s", p)
self.logger.debug("Plugin is %s", p)
if p != None:
pi = p(task["inputData"])
@ -91,8 +94,8 @@ class PyExec():
except Exception as exc:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
logging.debug(exc_type, fname, exc_tb.tb_lineno)
logging.error(str(exc))
self.logger.debug(exc_type, fname, exc_tb.tb_lineno)
self.logger.error(str(exc))
return { "status" : "FAILED", "output" : { "message" : str(exc) }, "logs" : ["one","two"]}
if __name__ == '__main__':

View File

@ -15,6 +15,7 @@
from __future__ import print_function, absolute_import
import sys
import time
import logging
from conductor.conductor import WFClientMgr
from threading import Thread
import socket
@ -68,7 +69,7 @@ class ConductorWorker:
def execute(self, task, exec_function):
try:
#print("Exec function is", exec_function)
logging.getLogger("pyexec").debug("Exec function is %s", exec_function)
resp = exec_function(task)
if type(resp) is not dict or not all(key in resp for key in ('status', 'output', 'logs')):
raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
@ -118,10 +119,11 @@ class ConductorWorker:
further details refer to the conductor server documentation
By default, it is set to None
"""
print('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, self.polling_interval * 1000, self.thread_count, self.worker_id))
logging.getLogger("pyexec").debug('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, self.polling_interval * 1000, self.thread_count, self.worker_id))
for x in range(0, int(self.thread_count)):
thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain,))
thread.daemon = True
logging.getLogger("pyexec").debug("Starting Thread %d for %s", x, taskType)
thread.start()
if wait:
while 1:
@ -129,7 +131,7 @@ class ConductorWorker:
def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):
print('Executing the function')
logging.getLogger("pyexec").debug('Executing the function')
return {'status': 'COMPLETED', 'output': {}}

View File

@ -16,6 +16,7 @@
from __future__ import print_function
import requests
import json
import logging
import sys
import socket
import warnings
@ -191,9 +192,10 @@ class TaskClient(BaseClient):
if domain is not None:
params['domain'] = domain
try:
logging.getLogger("pyexec").debug("Polling at %s with %s", url, params)
return self.get(url, params)
except Exception as err:
print('Error while polling ' + str(err))
logging.getLogger("pyexec").debug('Error while polling %s', str(err))
return None
def pollForBatch(self, taskType, count, timeout, workerid, domain=None):

View File

@ -2,9 +2,5 @@
loglevel = info
server = http://conductor-dev.int.d4science.net/api
#server = http://localhost:8080/api
threads = 3
pollrate = .1
#domain = comma separated list of task domains to be applied to all task types as fallback
[pyexec]
#domain = comma separated list of task domains to be applied to pyexec task type
threads = 1
pollrate = 1

View File

@ -1,4 +1,5 @@
from pyexecplugins.pyexecplugins import PyExecPlugin
import logging;
class Plugin(PyExecPlugin):
name = "Eval"
@ -17,5 +18,5 @@ class Plugin(PyExecPlugin):
code = self.data.get("code")
if code != None:
ret = eval(code, { "data" : self.data})
return ret
return { "result" : ret }

View File

@ -1,14 +1,15 @@
import requests;
import json
import json;
import logging;
class PyExecPlugins(type):
def __init__(cls, name, bases, attrs):
if not hasattr(cls, "plugins"):
#print("Initializing plugins")
logging.getLogger("pyexec").debug("Initializing plugins")
cls.plugins = {}
cls.alias = {}
else:
#print("Appending a plugin ", cls.name, cls)
logging.getLogger("pyexec").debug("Appending a plugin %s - %s ", cls.name, cls)
cls.plugins[cls.name] = cls
#Alias plugin also with taskdefinition so it will be lookup-able with poth Operation name and taskdefinition
if cls.taskdef is not None:
@ -30,13 +31,13 @@ class PyExecPlugins(type):
url = server + "/metadata/taskdefs"
#pyexec generic taskdefinition
taskdefs = [
{
"name" : "pyexec",
"description" : "Execute PyExec operations",
"inputKeys" : ["operation"],
"outputKeys" : ["ret"],
"ownerEmail" : "m.lettere@gmail.com"
}
#{
# "name" : "pyexec",
# "description" : "Execute PyExec operations",
# "inputKeys" : ["operation"],
# "outputKeys" : ["ret"],
# "ownerEmail" : "m.lettere@gmail.com"
#}
]
for plg in cls.getPluginNames():
if cls.plugins[plg].taskdef is not None:
@ -44,66 +45,67 @@ class PyExecPlugins(type):
taskdefs.append(taskdef)
#Post all new (or not) task definitions to orchestrator
logging.getLogger("pyexec").debug("Recording task definitions %s", taskdefs)
headers = {'Content-Type': 'application/json'}
try:
response = requests.request("POST", url, headers=headers, data=json.dumps(taskdefs))
except Exception as e:
print("Unable to register task defs", e)
logging.getLogger("pyexec").warning("Unable to register task defs %s", e)
class PyExecPlugin(object, metaclass=PyExecPlugins):
def __init__(self, data=None):
self.data = data
def supportsStandalone(self):
def hasDefinition(self):
return (self.taskdef is not None)
class Nop(PyExecPlugin):
name = "Nop"
taskdef = None
def __init__(self, data=None):
super().__init__(data)
def execute(self):
return None
#class Nop(PyExecPlugin):
# name = "Nop"
# taskdef = None
#
# def __init__(self, data=None):
# super().__init__(data)
#
# def execute(self):
# return None
class Identity(PyExecPlugin):
name = "Identity"
taskdef = None
def __init__(self, data=None):
super().__init__(data)
def execute(self):
self.data.pop("_result", None)
return self.data
#class Identity(PyExecPlugin):
# name = "Identity"
# taskdef = None
#
# def __init__(self, data=None):
# super().__init__(data)
#
# def execute(self):
# self.data.pop("_result", None)
# return self.data
class Sequence(PyExecPlugin):
name = "Sequence"
taskdef = None
def __init__(self, data=None):
super().__init__(data)
def handleSequence(self, sequence, currentresults):
for t in sequence:
operation = t.get("operation", "Nop")
if operation == "Sequence":
self.handleSequence(t.tasks, self.results)
else:
p = PyExecPlugin.get(operation)
if p != None:
t["_result"] = currentresults
pi = p(t)
ret = pi.execute()
self.result.append(ret)
else: raise Exception("Operation {} not found.".format(operation))
def execute(self):
self.result = []
self.handleSequence(self.data.get("tasks",[]),self.result)
return {"result": self.result}
#class Sequence(PyExecPlugin):
# name = "Sequence"
# taskdef = None
#
# def __init__(self, data=None):
# super().__init__(data)
#
# def handleSequence(self, sequence, currentresults):
# for t in sequence:
# operation = t.get("operation", "Nop")
# if operation == "Sequence":
# self.handleSequence(t.tasks, self.results)
# else:
# p = PyExecPlugin.get(operation)
# if p != None:
# t["_result"] = currentresults
# pi = p(t)
# ret = pi.execute()
# self.result.append(ret)
#
# else: raise Exception("Operation {} not found.".format(operation))
#
# def execute(self):
# self.result = []
# self.handleSequence(self.data.get("tasks",[]),self.result)
# return {"result": self.result}
#class Http(PyExecPlugin):
# name = "Http"