# conductor-worker-python/PyExec.py
#
# 115 lines
# 5.1 KiB
# Python

import configparser
import importlib
import logging
import logging.config
import sys
import os
import random
import argparse
from pyexecplugins.pyexecplugins import *
from conductor.ConductorWorker import ConductorWorker
class PyExec():
    """Conductor worker that dispatches polled tasks to PyExec plugins.

    The worker reads its settings from an INI-style configuration file,
    imports the requested plugin modules from the ``pyexecplugins`` package,
    registers their task definitions and then polls for tasks, routing each
    one to the matching plugin through :meth:`pyexec`.
    """

    # Mapping from textual level names to ``logging`` level constants.
    errors = {"info": logging.INFO, "error": logging.ERROR, "debug": logging.DEBUG}

    def __init__(self, args):
        """Create the worker from parsed command-line arguments.

        Args:
            args: Object exposing ``config`` (path of the configuration file)
                and ``commands`` (names of the plugin modules to load).
        """
        self.init(args.config, args.commands)

    def computeDomain(self, plg):
        """Return the domain string to use for section ``plg``.

        The section's own ``domain`` (if any) is followed by the ``domain``
        of the ``common`` section (if any), joined with a comma.

        Args:
            plg: Name of the configuration section to look up.

        Returns:
            The combined domain string, or ``None`` when neither section
            defines a domain.
        """
        candidates = (self.getDomain(plg), self.getDomain("common"))
        domains = [d for d in candidates if d is not None]
        return ",".join(domains) if domains else None

    def getDomain(self, plg):
        """Return the ``domain`` option of section ``plg``.

        Args:
            plg: Name of the configuration section to look up.

        Returns:
            The option value, or ``None`` when the section or the option
            does not exist.
        """
        if not self.cfg.has_section(plg):
            return None
        return self.cfg[plg].get("domain")

    def init(self, config, pluginlist):
        """Load configuration, set up logging and import the plugins.

        Args:
            config: Path of the INI configuration file. It must contain a
                ``common`` section; a missing one raises ``KeyError``.
            pluginlist: Iterable of module names, each imported as
                ``pyexecplugins.<name>``. Modules that fail to import are
                skipped with a warning.
        """
        self.cfg = configparser.ConfigParser()
        self.cfg.read(config)
        common = self.cfg["common"]

        self.workerid = common.get("workerid", "pythonworker-" + str(random.randint(1000, 10000)))

        # The logger itself is not configured yet, hence the plain print.
        loggerconfigfile = common.get("loggerconfig", "logging-docker.cfg")
        print("Loading logger configuration from: " + loggerconfigfile)
        logging.config.fileConfig(loggerconfigfile, disable_existing_loggers=False)
        self.logger = logging.getLogger("pyexec")
        # Report the configuration file actually in use rather than a fixed name.
        self.logger.info("Initializing PyExec worker %s with config %s", self.workerid, config)

        self.threads = common.getint("threads", 3)
        self.pollrate = common.getfloat("pollrate", .1)
        # Environment variables take precedence over the configuration file.
        self.server = os.environ.get('CONDUCTOR_SERVER', common.get("server", "http://localhost:8080/api"))
        # Auth is optional: it stays None when neither source provides it.
        self.auth = os.environ.get('CONDUCTOR_WORKER_AUTH', common.get("auth"))

        for plg in pluginlist:
            path = "pyexecplugins." + plg
            try:
                importlib.import_module(path)
            except Exception as err:
                # Best effort: one broken plugin must not prevent the others
                # from loading.
                self.logger.warning("Skipping plugin %s %s", path, err)
                continue
        PyExecPlugin.registerTaskDefinitions(self.server, self.auth)

    def start(self):
        """Start polling for every registered plugin that has a task definition.

        Plugins for which ``hasDefinition`` is false are not started.
        """
        pluginnames = PyExecPlugin.getPluginNames()
        self.logger.info("Started PyExec with plugins: %s", pluginnames)
        cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid, self.auth)

        # Select the plugins that are available as standalone tasks first, so
        # that "last" below refers to the last plugin that is really started.
        standalone = []
        for name in pluginnames:
            plugin = PyExecPlugin.get(name)
            if plugin.hasDefinition(plugin):
                standalone.append(plugin)

        if not standalone:
            self.logger.warning("No plugin with a task definition found, nothing to poll for")
            return

        last = len(standalone) - 1
        for idx, plugin in enumerate(standalone):
            self.logger.debug("Starting plugin %s", plugin)
            taskname = plugin.taskdef["name"]
            # NOTE(review): the third argument is presumably the blocking
            # "wait" flag of ConductorWorker.start -- confirm. It is set only
            # for the final started plugin.
            cc.start(taskname, self.pyexec, idx == last, self.computeDomain(taskname))
        # NOTE: the fallback 'pyexec' worker for non standalone calls (Nop,
        # Identity and Sequence) is currently disabled:
        # domain = self.computeDomain("pyexec")
        # cc.start('pyexec', self.pyexec, True, domain)

    def pyexec(self, task):
        """Execute one polled task through the matching plugin.

        Args:
            task: Task dictionary. The keys read here are ``workflowTask``
                (with ``taskReferenceName``), ``taskDefName``, ``taskId``,
                ``workflowType``, ``workflowInstanceId`` and ``inputData``.

        Returns:
            A dictionary with ``status`` (``"COMPLETED"`` or ``"FAILED"``),
            ``output`` and ``logs``. Any exception raised while handling the
            task is caught and reported as a ``FAILED`` result whose output
            carries the exception message.
        """
        try:
            self.logger.info(
                "Executing task %s of type %s[%s] from wkf %s[%s]",
                task["workflowTask"]["taskReferenceName"],
                task["taskDefName"],
                task["taskId"],
                task["workflowType"],
                task["workflowInstanceId"])
            # Get operation from input or task type (in case of standalone
            # tasks), falling back to Nop.
            operation = task["inputData"].get("operation") or task["taskDefName"] or "Nop"
            self.logger.info("Operation is %s", operation)
            # Look the plugin up by name first, then by alias (for standalone
            # tasks the task type is aliased to the plugin name).
            p = PyExecPlugin.get(operation) or PyExecPlugin.getAlias(operation)
            self.logger.debug("Plugin is %s", p)
            if p is None:
                raise Exception("Operation {} not found.".format(operation))
            ret = p(task["inputData"], self.cfg).execute()
            # NOTE(review): the "logs" entries look like placeholders -- confirm.
            return {"status": "COMPLETED", "output": ret, "logs": ["one", "two"]}
        except Exception as exc:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            self.logger.debug("%s %s %s", exc_type, fname, exc_tb.tb_lineno)
            self.logger.error("%s", str(exc))
            return {"status": "FAILED", "output": {"message": str(exc)}, "logs": ["one", "two"]}
if __name__ == '__main__':
    # Command-line interface: an optional configuration file plus one or more
    # plugin names to load.
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '-c', '--config',
        nargs="?",
        default='config.cfg',
        const='config.cfg',
        help='Specify a config file. Default is config.cfg',
    )
    cli.add_argument(
        'commands',
        nargs='+',
        help='Use Ansible, Packer, Http, Eval, Mail for declaring the tasks to poll for',
    )

    # Build the worker from the parsed arguments and start polling.
    worker = PyExec(cli.parse_args())
    worker.start()