conductor-worker-python/PyExec.py

115 lines
5.1 KiB
Python
Raw Permalink Normal View History

2020-10-14 18:06:28 +02:00
import configparser
import importlib
import logging
import logging.config
2020-10-14 18:06:28 +02:00
import sys
import os
import random
2021-02-12 10:35:22 +01:00
import argparse
2020-10-14 18:06:28 +02:00
from pyexecplugins.pyexecplugins import *
from conductor.ConductorWorker import ConductorWorker
class PyExec():
errors = { "info" : logging.INFO, "error" : logging.ERROR, "debug" : logging.DEBUG}
2021-02-12 10:35:22 +01:00
def __init__(self, args):
self.init(args.config, args.commands)
2020-10-14 18:06:28 +02:00
def computeDomain(self, plg):
domain = self.getDomain(plg)
commondomain = self.getDomain("common")
if domain is not None and commondomain is not None:
domain = domain + "," + commondomain
elif commondomain is not None:
domain = commondomain
return domain
def getDomain(self, plg):
if self.cfg.has_section(plg):
return self.cfg[plg].get("domain")
else:
return None
2021-02-12 10:35:22 +01:00
def init(self, config, pluginlist):
2020-10-14 18:06:28 +02:00
self.cfg = configparser.ConfigParser()
2021-02-12 10:35:22 +01:00
self.cfg.read(config)
2020-10-14 18:06:28 +02:00
self.workerid = self.cfg["common"].get("workerid", "pythonworker-" + str(random.randint(1000,10000)))
loggerconfigfile = self.cfg["common"].get("loggerconfig", "logging-docker.cfg")
print("Loading logger configuration from: " + loggerconfigfile)
logging.config.fileConfig(loggerconfigfile, disable_existing_loggers=False)
self.logger = logging.getLogger("pyexec")
self.logger.info("Initializing PyExec worker %s with config.cfg", self.workerid)
2020-10-14 18:06:28 +02:00
self.threads = self.cfg["common"].getint("threads", 3)
self.pollrate = self.cfg["common"].getfloat("pollrate", .1)
self.server = os.environ.get('CONDUCTOR_SERVER', self.cfg["common"].get("server", "http://localhost:8080/api"))
2021-07-22 17:42:43 +02:00
self.auth = os.environ.get('CONDUCTOR_WORKER_AUTH', self.cfg["common"].get("auth"))
# if self.auth is None:
# error = "Unable to find auth details neither in ENV nor in config file"
# self.logger.critical(error)
# raise Exception(error)
2020-10-14 18:06:28 +02:00
for plg in pluginlist:
path = "pyexecplugins." + plg
try:
importlib.import_module(path)
except Exception as err:
logging.warning("Skipping plugin %s %s", path, err)
continue
PyExecPlugin.registerTaskDefinitions(self.server, self.auth)
2020-10-14 18:06:28 +02:00
def start(self):
self.logger.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames())
2021-07-22 17:42:43 +02:00
cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid, self.auth)
2020-10-14 18:06:28 +02:00
# start workers for all included plugins that are available also as standalone tasks ...
pluginnames = PyExecPlugin.getPluginNames()
for i in range(len(pluginnames)):
plugin = PyExecPlugin.get(pluginnames[i])
if plugin.hasDefinition(plugin):
self.logger.debug("Starting plugin %s", plugin)
2020-10-14 18:06:28 +02:00
domain = self.computeDomain(plugin.taskdef["name"])
cc.start(plugin.taskdef["name"], self.pyexec, i == len(pluginnames)-1, domain)
2020-10-14 18:06:28 +02:00
# ... plus fallback pyexec for managing non standalone calls, Nop, Identity and Sequence
#domain = self.computeDomain("pyexec")
#cc.start('pyexec', self.pyexec, True, domain)
2020-10-14 18:06:28 +02:00
def pyexec(self, task):
try:
self.logger.info("Executing task {} of type {}[{}] from wkf {}[{}]".format(
2020-10-14 18:06:28 +02:00
task["workflowTask"]["taskReferenceName"],
task["taskDefName"],
task["taskId"],
task["workflowType"],
task["workflowInstanceId"]))
# Get operation from input or task type (in case of standalone tasks) fallback to Nop
operation = task["inputData"].get("operation") or task["taskDefName"] or "Nop"
self.logger.info("Operation is %s", operation)
2020-10-14 18:06:28 +02:00
# Get plugin by checking either name or alias (in case of standalone tasks the task type is aliased to the plugin name)
p = PyExecPlugin.get(operation) or PyExecPlugin.getAlias(operation)
self.logger.debug("Plugin is %s", p)
2020-10-14 18:06:28 +02:00
if p != None:
pi = p(task["inputData"], self.cfg)
2020-10-14 18:06:28 +02:00
ret = pi.execute()
else: raise Exception("Operation {} not found.".format(operation))
return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]}
except Exception as exc:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
2020-11-12 18:38:54 +01:00
self.logger.debug("%s %s %s", exc_type, fname, exc_tb.tb_lineno)
self.logger.error("%s", str(exc))
2020-10-14 18:06:28 +02:00
return { "status" : "FAILED", "output" : { "message" : str(exc) }, "logs" : ["one","two"]}
if __name__ == '__main__':
2021-02-12 10:35:22 +01:00
parser = argparse.ArgumentParser()
2021-03-22 15:30:31 +01:00
parser.add_argument('-c', '--config', nargs="?", default='config.cfg', const='config.cfg', help='Specify a config file. Default is config.cfg')
parser.add_argument('commands', nargs='+', help='Use Ansible, Packer, Http, Eval, Mail for declaring the tasks to poll for')
2021-02-12 10:35:22 +01:00
args = parser.parse_args()
pyexec = PyExec(args)
2020-10-14 18:06:28 +02:00
pyexec.start()