115 lines
5.1 KiB
Python
115 lines
5.1 KiB
Python
import configparser
|
|
import importlib
|
|
import logging
|
|
import logging.config
|
|
import sys
|
|
import os
|
|
import random
|
|
import argparse
|
|
from pyexecplugins.pyexecplugins import *
|
|
from conductor.ConductorWorker import ConductorWorker
|
|
|
|
class PyExec():
|
|
errors = { "info" : logging.INFO, "error" : logging.ERROR, "debug" : logging.DEBUG}
|
|
|
|
def __init__(self, args):
|
|
self.init(args.config, args.commands)
|
|
|
|
def computeDomain(self, plg):
|
|
domain = self.getDomain(plg)
|
|
commondomain = self.getDomain("common")
|
|
if domain is not None and commondomain is not None:
|
|
domain = domain + "," + commondomain
|
|
elif commondomain is not None:
|
|
domain = commondomain
|
|
return domain
|
|
|
|
def getDomain(self, plg):
|
|
if self.cfg.has_section(plg):
|
|
return self.cfg[plg].get("domain")
|
|
else:
|
|
return None
|
|
|
|
def init(self, config, pluginlist):
|
|
self.cfg = configparser.ConfigParser()
|
|
self.cfg.read(config)
|
|
self.workerid = self.cfg["common"].get("workerid", "pythonworker-" + str(random.randint(1000,10000)))
|
|
loggerconfigfile = self.cfg["common"].get("loggerconfig", "logging-docker.cfg")
|
|
print("Loading logger configuration from: " + loggerconfigfile)
|
|
logging.config.fileConfig(loggerconfigfile, disable_existing_loggers=False)
|
|
self.logger = logging.getLogger("pyexec")
|
|
self.logger.info("Initializing PyExec worker %s with config.cfg", self.workerid)
|
|
self.threads = self.cfg["common"].getint("threads", 3)
|
|
self.pollrate = self.cfg["common"].getfloat("pollrate", .1)
|
|
self.server = os.environ.get('CONDUCTOR_SERVER', self.cfg["common"].get("server", "http://localhost:8080/api"))
|
|
self.auth = os.environ.get('CONDUCTOR_WORKER_AUTH', self.cfg["common"].get("auth"))
|
|
# if self.auth is None:
|
|
# error = "Unable to find auth details neither in ENV nor in config file"
|
|
# self.logger.critical(error)
|
|
# raise Exception(error)
|
|
|
|
for plg in pluginlist:
|
|
path = "pyexecplugins." + plg
|
|
try:
|
|
importlib.import_module(path)
|
|
except Exception as err:
|
|
logging.warning("Skipping plugin %s %s", path, err)
|
|
continue
|
|
|
|
PyExecPlugin.registerTaskDefinitions(self.server, self.auth)
|
|
|
|
def start(self):
|
|
self.logger.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames())
|
|
cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid, self.auth)
|
|
|
|
# start workers for all included plugins that are available also as standalone tasks ...
|
|
pluginnames = PyExecPlugin.getPluginNames()
|
|
for i in range(len(pluginnames)):
|
|
plugin = PyExecPlugin.get(pluginnames[i])
|
|
if plugin.hasDefinition(plugin):
|
|
self.logger.debug("Starting plugin %s", plugin)
|
|
domain = self.computeDomain(plugin.taskdef["name"])
|
|
cc.start(plugin.taskdef["name"], self.pyexec, i == len(pluginnames)-1, domain)
|
|
|
|
# ... plus fallback pyexec for managing non standalone calls, Nop, Identity and Sequence
|
|
#domain = self.computeDomain("pyexec")
|
|
#cc.start('pyexec', self.pyexec, True, domain)
|
|
|
|
def pyexec(self, task):
|
|
try:
|
|
self.logger.info("Executing task {} of type {}[{}] from wkf {}[{}]".format(
|
|
task["workflowTask"]["taskReferenceName"],
|
|
task["taskDefName"],
|
|
task["taskId"],
|
|
task["workflowType"],
|
|
task["workflowInstanceId"]))
|
|
|
|
# Get operation from input or task type (in case of standalone tasks) fallback to Nop
|
|
operation = task["inputData"].get("operation") or task["taskDefName"] or "Nop"
|
|
self.logger.info("Operation is %s", operation)
|
|
|
|
# Get plugin by checking either name or alias (in case of standalone tasks the task type is aliased to the plugin name)
|
|
p = PyExecPlugin.get(operation) or PyExecPlugin.getAlias(operation)
|
|
self.logger.debug("Plugin is %s", p)
|
|
|
|
if p != None:
|
|
pi = p(task["inputData"], self.cfg)
|
|
ret = pi.execute()
|
|
else: raise Exception("Operation {} not found.".format(operation))
|
|
return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]}
|
|
|
|
except Exception as exc:
|
|
exc_type, exc_obj, exc_tb = sys.exc_info()
|
|
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
|
|
self.logger.debug("%s %s %s", exc_type, fname, exc_tb.tb_lineno)
|
|
self.logger.error("%s", str(exc))
|
|
return { "status" : "FAILED", "output" : { "message" : str(exc) }, "logs" : ["one","two"]}
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('-c', '--config', nargs="?", default='config.cfg', const='config.cfg', help='Specify a config file. Default is config.cfg')
|
|
parser.add_argument('commands', nargs='+', help='Use Ansible, Packer, Http, Eval, Mail for declaring the tasks to poll for')
|
|
args = parser.parse_args()
|
|
pyexec = PyExec(args)
|
|
pyexec.start()
|