From 4526d0c99995e433326269f5f042aab015de1602 Mon Sep 17 00:00:00 2001 From: dcore94 Date: Wed, 14 Oct 2020 18:06:28 +0200 Subject: [PATCH] major refactoring --- Dockerfile | 2 +- PyExec.py | 100 ++++++++++ README.md | 8 +- conductor/conductor.py | 3 +- config.cfg | 12 +- entrypoint.sh | 5 +- pyexec.py | 47 ----- pyexecplugins/Ansible.py | 11 +- pyexecplugins/Eval.py | 21 +++ pyexecplugins/GatewaysToRestTask.py | 45 ----- pyexecplugins/Http.py | 72 +++++++ pyexecplugins/Packer.py | 42 +++++ pyexecplugins/Shell.py | 35 ++++ pyexecplugins/pyexecplugins.py | 279 ++++++++++++++++------------ pyrest.py | 43 ----- requirements.txt | 1 - setup.py | 4 +- 17 files changed, 456 insertions(+), 274 deletions(-) create mode 100644 PyExec.py delete mode 100644 pyexec.py create mode 100644 pyexecplugins/Eval.py delete mode 100644 pyexecplugins/GatewaysToRestTask.py create mode 100644 pyexecplugins/Http.py create mode 100644 pyexecplugins/Packer.py create mode 100644 pyexecplugins/Shell.py delete mode 100644 pyrest.py diff --git a/Dockerfile b/Dockerfile index 3cf7ecc..c9085e1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ FROM python:3.7 COPY conductor/ /app/conductor COPY pyexecplugins/ /app/pyexecplugins -COPY requirements.txt pyrest.py pyexec.py config.cfg entrypoint.sh /app/ +COPY requirements.txt PyExec.py config.cfg entrypoint.sh /app/ RUN pip install -r /app/requirements.txt WORKDIR /app ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/PyExec.py b/PyExec.py new file mode 100644 index 0000000..55263f0 --- /dev/null +++ b/PyExec.py @@ -0,0 +1,100 @@ +import configparser +import importlib +import logging +import sys +import os +import random +from pyexecplugins.pyexecplugins import * +from conductor.ConductorWorker import ConductorWorker + +class PyExec(): + errors = { "info" : logging.INFO, "error" : logging.ERROR, "debug" : logging.DEBUG} + + def __init__(self, pluginlist): + self.init(pluginlist) + + def computeDomain(self, plg): + domain = self.getDomain(plg) + commondomain = self.getDomain("common") + if domain is not None and commondomain is not None: + domain = domain + "," + commondomain + elif commondomain is not None: + domain = commondomain + return domain + + def getDomain(self, plg): + if self.cfg.has_section(plg): + return self.cfg[plg].get("domain") + else: + return None + + def init(self, pluginlist): + self.cfg = configparser.ConfigParser() + self.cfg.read("config.cfg") + self.workerid = self.cfg["common"].get("workerid", "pythonworker-" + str(random.randint(1000,10000))) + + logging.basicConfig(level=self.errors[self.cfg["common"]["loglevel"]]) + logging.info("Initializing PyExec worker %s with config.cfg", self.workerid) + + self.threads = self.cfg["common"].getint("threads", 3) + self.pollrate = self.cfg["common"].getfloat("pollrate", .1) + self.server = os.environ.get('CONDUCTOR_SERVER', self.cfg["common"].get("server", "http://localhost:8080/api")) + for plg in pluginlist: + path = "pyexecplugins." + plg + try: + importlib.import_module(path) + except Exception as err: + logging.warning("Skipping plugin %s %s", path, err) + continue + + PyExecPlugin.registerTaskDefinitions(self.server) + + def start(self): + logging.info("Started PyExec with plugins: %s", PyExecPlugin.getPluginNames()) + cc = ConductorWorker(self.server, self.threads, self.pollrate, self.workerid) + + # start workers for all included plugins that are available also as standalone tasks ... + for plg in PyExecPlugin.getPlugins(): + plugin = PyExecPlugin.get(plg) + if plugin.supportsStandalone(plugin): + domain = self.computeDomain(plugin.taskdef["name"]) + cc.start(plugin.taskdef["name"], self.pyexec, False, domain) + + # ... plus fallback pyexec for managing non standalone calls, Nop, Identity and Sequence + domain = self.computeDomain("pyexec") + cc.start('pyexec', self.pyexec, True, domain) + + def pyexec(self, task): + try: + logging.info("Executing task {} of type {}[{}] from wkf {}[{}]".format( + task["workflowTask"]["taskReferenceName"], + task["taskDefName"], + task["taskId"], + task["workflowType"], + task["workflowInstanceId"])) + + # Get operation from input or task type (in case of standalone tasks) fallback to Nop + operation = task["inputData"].get("operation") or task["taskDefName"] or "Nop" + logging.debug("Operation is %s", operation) + + # Get plugin by checking either name or alias (in case of standalone tasks the task type is aliased to the plugin name) + p = PyExecPlugin.get(operation) or PyExecPlugin.getAlias(operation) + logging.debug("Plugin is %s", p) + + if p != None: + pi = p(task["inputData"]) + ret = pi.execute() + + else: raise Exception("Operation {} not found.".format(operation)) + return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]} + + except Exception as exc: + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + logging.debug(exc_type, fname, exc_tb.tb_lineno) + logging.error(str(exc)) + return { "status" : "FAILED", "output" : { "message" : str(exc) }, "logs" : ["one","two"]} + +if __name__ == '__main__': + pyexec = PyExec(sys.argv[1:]) + pyexec.start() diff --git a/README.md b/README.md index 58ae02d..bad1990 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,14 @@ # Python workers for Conductor Python worker for Conductor provides : -1. PYREST Comprehensive REST client -2. PYEXEC Execute code from available modules +1. PyExec: a plugin based set of workers for Netflix OSS Conductor written in Python3 for ## Install ### Example +``` +python3 PyExec # Runs a bare PyExec supporting only Nop, Identity and Sequence plugins +python3 PyExec Http Ansible Packer #this runs PyExec with pyrest, pyansible and pypacker plugins enabled +python3 PyExec Shell Eval # runs a a PyExec with only pyshell and py eval enabled. pyshell executes shell commands, pyeval executes arbitrary Python3 code +``` diff --git a/conductor/conductor.py b/conductor/conductor.py index 47ed3b4..ae84943 100644 --- a/conductor/conductor.py +++ b/conductor/conductor.py @@ -62,7 +62,7 @@ class BaseClient(object): theHeader = self.mergeTwoDicts(self.headers, headers) if body is not None: - jsonBody = json.dumps(body, ensure_ascii=False) + jsonBody = json.dumps(body, ensure_ascii=True) resp = requests.put(theUrl, params=queryParams, data=jsonBody, headers=theHeader) else: resp = requests.put(theUrl, params=queryParams, headers=theHeader) @@ -190,7 +190,6 @@ class TaskClient(BaseClient): params['workerid'] = workerid if domain is not None: params['domain'] = domain - try: return self.get(url, params) except Exception as err: diff --git a/config.cfg b/config.cfg index 3515b5d..2374e47 100644 --- a/config.cfg +++ b/config.cfg @@ -1,13 +1,11 @@ [common] -server = http://nubis2.int.d4science.net:8080/api -#domain = comma separated list of task domains to be applied to all task types as fallback - -[pyrest] +loglevel = info +server = http://conductorserver:8080/api +#server = http://localhost:8080/api threads = 3 pollrate = .1 -#domain = comma separated list of task domains to be applied to pyrest task type +#domain = comma separated list of task domains to be applied to all task types as fallback +domain = marco [pyexec] -threads = 3 -pollrate = .1 #domain = comma separated list of task domains to be applied to pyexec task type diff --git a/entrypoint.sh b/entrypoint.sh index e04591c..2683a8f 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -9,7 +9,6 @@ while [[ "$(curl -s -o /dev/null -L -w ''%{http_code}'' $BASE/health)" != 200 ]] sleep 5 done -echo 'Starting workers...' +echo 'Starting default workers...' -python3 pyrest.py & -python3 pyexec.py +python3 PyExec.py Http Shell Eval diff --git a/pyexec.py b/pyexec.py deleted file mode 100644 index 7f07520..0000000 --- a/pyexec.py +++ /dev/null @@ -1,47 +0,0 @@ -from conductor.ConductorWorker import ConductorWorker -import configparser -import sys -import os -from pyexecplugins.pyexecplugins import * -from pyexecplugins.Ansible import Ansible - -def pyexec(task): - try: - print("Executing task {} of type {}[{}] from wkf {}[{}]".format(task["workflowTask"]["taskReferenceName"], task["taskDefName"], task["taskId"], task["workflowType"], task["workflowInstanceId"])) - - operation = task["inputData"].get("operation") or "Nop" - p = PyExecPlugin.get(operation) - if p != None: - pi = p(task["inputData"]) - ret = pi.execute() - else: raise Exception("Operation {} not found.".format(operation)) - return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]} - - except Exception as exc: - exc_type, exc_obj, exc_tb = sys.exc_info() - fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] - print(exc_type, fname, exc_tb.tb_lineno) - return { "status" : "FAILED", "output" : { "message" : "Internal error: {}".format(exc)}, "logs" : ["one","two"]} - -def main(): - - print("Started PyExec with plugins:", PyExecPlugin.getPlugins()) - - cfg = configparser.ConfigParser() - cfg.read("config.cfg") - - threads = cfg["pyexec"].getint("threads") - pollrate = cfg["pyexec"].getfloat("pollrate") - server = os.environ.get('CONDUCTOR_SERVER', cfg["common"]["server"]) - - domain = cfg["pyexec"].get("domain") - if domain is None: - domain = cfg["common"].get("domain") - elif cfg["common"].get("domain") is not None: - domain = domain + "," + cfg["common"].get("domain") - - cc = ConductorWorker(server, threads, pollrate, "pyexec") - cc.start('pyexec', pyexec, True, domain) - -if __name__ == '__main__': - main() diff --git a/pyexecplugins/Ansible.py b/pyexecplugins/Ansible.py index ba70d73..4157bf0 100644 --- a/pyexecplugins/Ansible.py +++ b/pyexecplugins/Ansible.py @@ -1,6 +1,7 @@ from pyexecplugins.pyexecplugins import PyExecPlugin import json import shutil +import logging import ansible.constants as C from ansible.executor.task_queue_manager import TaskQueueManager from ansible.module_utils.common.collections import ImmutableDict @@ -41,6 +42,14 @@ class ResultsCollectorJSONCallback(CallbackBase): class Ansible(PyExecPlugin): name = "Ansible" + taskdef = { + "name" : "pyansible", + "retryCount" : 0, + "description" : "Execute ansible playbook", + "inputKeys" : ["playbook", "hosts", "connection", "verbosity", "extra_vars", "gather_facts"], + "outputKeys" : ["ok", "failed", "unreachable"], + "ownerEmail" : "m.lettere@gmail.com" + } def __init__(self, data=None): super().__init__(data) @@ -93,7 +102,7 @@ class Ansible(PyExecPlugin): # create data structure that represents our play play_sources = loader.load(self.playbook) for play_source in play_sources: - print(">>> Executing playsource:",play_source) + logging.debug("Executing playsource:",play_source) play_source["gather_facts"] = self.gather_facts # Create play object, playbook objects use .load instead of init or new methods, # this will also automatically create the task objects from the info provided in play_source diff --git a/pyexecplugins/Eval.py b/pyexecplugins/Eval.py new file mode 100644 index 0000000..16e6664 --- /dev/null +++ b/pyexecplugins/Eval.py @@ -0,0 +1,21 @@ +from pyexecplugins.pyexecplugins import PyExecPlugin + +class Plugin(PyExecPlugin): + name = "Eval" + taskdef = { + "name" : "pyeval", + "description" : "Execute arbitrary python code", + "inputKeys" : ["code"], + "outputKeys" : ["result"], + "ownerEmail" : "m.lettere@gmail.com" + } + + def __init__(self, data=None): + super().__init__(data) + + def execute(self): + code = self.data.get("code") + if code != None: + ret = eval(code, { "data" : self.data}) + return ret + diff --git a/pyexecplugins/GatewaysToRestTask.py b/pyexecplugins/GatewaysToRestTask.py deleted file mode 100644 index aa205f8..0000000 --- a/pyexecplugins/GatewaysToRestTask.py +++ /dev/null @@ -1,45 +0,0 @@ -from pyexecplugins.pyexecplugins import * - -class GatewayToRestTask(PyExecPlugin): - name = "GatewayToRestTask" - - def __init__(self, data=None): - super().__init__(data) - self.urls = data["gateway_list"] - - def execute(self): - output = { "tasks" : [] , "urls": {}} - i = 0 - for url in self.urls: - i += 1 - trefname = "dt{}".format(i) - output["urls"][trefname] = {} - output["tasks"].append({ - "name" : "pyrest", - "taskReferenceName" : trefname, - "type" : "SIMPLE", - "inputParameters" : { - "url" : "${workflow.input.keycloak_admin_url}", - "method" : "POST", - "expect" : [201, 409], - "headers" : { - "Authorization" : "Bearer ${authorize.output.body.access_token}", - "Content-Type" : "application/json" - }, - "body" : { - "clientId": url.split("/")[2], - "baseUrl": url, - "enabled": True, - "clientAuthenticatorType": "client-secret", - "redirectUris": [url + "/*"], - "webOrigins": ["/*"], - "standardFlowEnabled": True, - "directAccessGrantsEnabled": True, - "publicClient": True, - "protocol": "openid-connect", - "attributes": {"login_theme": "d4science" }, - "fullScopeAllowed": True - } - } - }) - return output diff --git a/pyexecplugins/Http.py b/pyexecplugins/Http.py new file mode 100644 index 0000000..a507a0a --- /dev/null +++ b/pyexecplugins/Http.py @@ -0,0 +1,72 @@ +import requests +import json +import logging +from pyexecplugins.pyexecplugins import PyExecPlugin + +class Plugin(PyExecPlugin): + name = "Http" + + taskdef = { + "name" : "pyrest", + "description" : "Execute an HTTP request with pyrest worker", + "inputKeys" : ["url", "body", "contentType", "method", "accept", "headers", "connectionTimeout", "readTimeout"], + "outputKeys" : ["body", "status", "reason", "headers"], + "ownerEmail" : "m.lettere@gmail.com" + } + + def __init__(self, data): + super().__init__(data) + self.method = data.get("method") or "get" + self.url = data.get("url") + self.headers = data.get("headers") or {} + self.contenttype = self.headers.get("Content-Type") + self.accept = self.headers.get("Accept") + self.body = data.get("body") + self.params = data.get("params") + self.expect = data.get("expect") + self.fail = data.get("fail") + + def doRequest(self): + logging.debug(self.method, self.url, self.contenttype, self.accept) + if self.contenttype != None and self.contenttype.find("json") != -1: + self.response = requests.request(self.method, self.url, headers=self.headers, json = self.body) + else: + self.response = requests.request(self.method, self.url, headers=self.headers, data = self.body, params = self.params) + return self.response + + def computeStatus(self): + if self.fail == False: + return "COMPLETED" + elif self.expect == None: + return "COMPLETED" if self.response.ok else "FAILED" + else: + if type(self.expect) == list: + return "COMPLETED" if (self.response.status_code in self.expect) else "FAILED" + else: + return "COMPLETED" if (self.response.status_code == self.expect) else "FAILED" + + def buildOutput(self, status): + hdrs = {} + for k in self.response.headers.keys(): hdrs[k] = self.response.headers[k] + + logging.info("Response: {} {}".format(self.response.status_code, self.response.reason)) + + if hdrs.get("Content-Type") != None and hdrs["Content-Type"].find("json") != -1 or self.accept != None and self.accept.find("json") != -1: + outbody = self.response.json() if len(self.response.content) != 0 else None + else: + outbody = self.response.text + + logging.debug(outbody) + if status == "FAILED": + raise("HTTP call failed with status {} ({}) - {}".format(self.response.status_code, self.response.reson, str(outbody))) + return { + "body" : outbody, + "headers" : hdrs, + "status" : self.response.status_code, + "reason" : self.response.reason + } + + def execute(self): + self.doRequest() + status = self.computeStatus() + return self.buildOutput(status) diff --git a/pyexecplugins/Packer.py b/pyexecplugins/Packer.py new file mode 100644 index 0000000..76df738 --- /dev/null +++ b/pyexecplugins/Packer.py @@ -0,0 +1,42 @@ +from pyexecplugins.pyexecplugins import PyExecPlugin +import tempfile +import json +import subprocess + +class Plugin(PyExecPlugin): + name = "Packer" + + taskdef = { + "name" : "pypacker", + "retryCount" : 0, + "description" : "Executes packer.io command line for build and validate. It has been isolated in order to be able to start the worker only where OS dependencies are matched.", + "inputKeys" : ["command", "template"], + "outputKeys" : ["results"], + "ownerEmail" : "m.lettere@gmail.com" + } + + def __init__(self, data=None): + super().__init__(data) + self.template = data["template"] + self.command = data.get("command", "build") + + def execute(self): + fp = tempfile.NamedTemporaryFile(mode="w", delete=False) + fp.write(json.dumps(self.template)) + fp.close() + completed = subprocess.run(["packer", self.command, fp.name], capture_output=True, text=True) + if(completed.returncode != 0): + raise Exception("packer.io failed: {} - out:{} - err:{}".format( + completed.returncode, + completed.stdout if completed.stdout != None else "", + completed.stderr if completed.stderr != None else "") + ) + return { + 'results' : [ + { + 'returncode' : completed.returncode, + 'stdout' : completed.stdout, + 'stderr': completed.stderr + } + ] + } diff --git a/pyexecplugins/Shell.py b/pyexecplugins/Shell.py new file mode 100644 index 0000000..8ad73d6 --- /dev/null +++ b/pyexecplugins/Shell.py @@ -0,0 +1,35 @@ +import subprocess +import logging +from pyexecplugins.pyexecplugins import PyExecPlugin + +class Plugin(PyExecPlugin): + name = "Shell" + + taskdef = { + "name" : "pyshell", + "description" : "Execute an Shell commands on target machine. Commands are in the form of an array of objects like { 'line': 'ls -l', 'expect' : 0, 'withshell' : False}. For expect 0 is the default, for withshell False is the default thus they can be omitted.", + "inputKeys" : ["commands"], + "outputKeys" : ["results"], + "ownerEmail" : "m.lettere@gmail.com" + } + + def __init__(self, data=None): + super().__init__(data) + + def execute(self): + results = [] + for cmdentry in self.data.get("commands"): + expect = cmdentry.get("expect", 0) + cmd = cmdentry.get("line") + #Use shell=True for very complex cmdlines that do have quotes and other amenities + withshell = cmdentry.get("withshell", self.data.get("withshell", False)) + if cmd == None: + continue + else: + cmd = cmd.split() if not withshell else cmd + logging.debug("Going to execute (withshell=%b) %s", withshell, cmd) + completed = subprocess.run(cmd, shell=withshell, capture_output=True, text=True) + results.append({ 'returncode' : completed.returncode, 'stdout' : completed.stdout, 'stderr': completed.stderr}) + if completed.returncode != expect: + raise Exception("Intermediate command failed with status code " + str(completed.returncode) ) + return { "results": results } diff --git a/pyexecplugins/pyexecplugins.py b/pyexecplugins/pyexecplugins.py index 3c60945..a652999 100644 --- a/pyexecplugins/pyexecplugins.py +++ b/pyexecplugins/pyexecplugins.py @@ -1,28 +1,65 @@ -import requests -import subprocess +import requests; import json class PyExecPlugins(type): def __init__(cls, name, bases, attrs): if not hasattr(cls, "plugins"): - print("Initializing plugins") + #print("Initializing plugins") cls.plugins = {} + cls.alias = {} else: - print("Appending a plugin ", cls.name, cls) + #print("Appending a plugin ", cls.name, cls) cls.plugins[cls.name] = cls - + #Alias plugin also with taskdefinition so it will be lookup-able with poth Operation name and taskdefinition + if cls.taskdef is not None: + cls.alias[cls.taskdef["name"]] = cls + def getPlugins(cls): return cls.plugins + def getPluginNames(cls): + return [*cls.plugins.keys()] + def get(cls, name): return cls.plugins.get(name) + + def getAlias(cls, name): + return cls.alias.get(name) + + def registerTaskDefinitions(cls, server): + url = server + "/metadata/taskdefs" + #pyexec generic taskdefinition + taskdefs = [ + { + "name" : "pyexec", + "description" : "Execute PyExec operations", + "inputKeys" : ["operation"], + "outputKeys" : ["ret"], + "ownerEmail" : "m.lettere@gmail.com" + } + ] + for plg in cls.getPluginNames(): + if cls.plugins[plg].taskdef is not None: + taskdef = cls.plugins[plg].taskdef + taskdefs.append(taskdef) + + #Post all new (or not) task definitions to orchestrator + headers = {'Content-Type': 'application/json'} + try: + response = requests.request("POST", url, headers=headers, data=json.dumps(taskdefs)) + except Exception as e: + print("Unable to register task defs", e) class PyExecPlugin(object, metaclass=PyExecPlugins): def __init__(self, data=None): self.data = data + def supportsStandalone(self): + return (self.taskdef is not None) + class Nop(PyExecPlugin): name = "Nop" + taskdef = None def __init__(self, data=None): super().__init__(data) @@ -32,6 +69,7 @@ class Nop(PyExecPlugin): class Identity(PyExecPlugin): name = "Identity" + taskdef = None def __init__(self, data=None): super().__init__(data) @@ -42,6 +80,7 @@ class Identity(PyExecPlugin): class Sequence(PyExecPlugin): name = "Sequence" + taskdef = None def __init__(self, data=None): super().__init__(data) @@ -66,121 +105,121 @@ class Sequence(PyExecPlugin): self.handleSequence(self.data.get("tasks",[]),self.result) return {"result": self.result} -class Http(PyExecPlugin): - name = "Http" - - def __init__(self, data): - super().__init__(data) - self.method = data.get("method") or "get" - self.url = data.get("url") - self.headers = data.get("headers") or {} - self.contenttype = self.headers.get("Content-Type") - self.accept = self.headers.get("Accept") - self.body = data.get("body") - self.params = data.get("params") - self.expect = data.get("expect") - self.fail = data.get("fail") - - def doRequest(self): - #print(self.method, self.url, self.contenttype, self.accept) - if self.contenttype != None and self.contenttype.find("json") != -1: - self.response = requests.request(self.method, self.url, headers=self.headers, json = self.body) - else: - self.response = requests.request(self.method, self.url, headers=self.headers, data = self.body, params = self.params) - return self.response - - def computeStatus(self): - if self.fail == False: - return "COMPLETED" - elif self.expect == None: - return "COMPLETED" if self.response.ok else "FAILED" - else: - if type(self.expect) == list: - return "COMPLETED" if (self.response.status_code in self.expect) else "FAILED" - else: - return "COMPLETED" if (self.response.status_code == self.expect) else "FAILED" - - def buildOutput(self, status): - hdrs = {} - for k in self.response.headers.keys(): hdrs[k] = self.response.headers[k] - - print("Response: {} {}".format(self.response.status_code, self.response.reason)) - - if hdrs.get("Content-Type") != None and hdrs["Content-Type"].find("json") != -1 or self.accept != None and self.accept.find("json") != -1: - outbody = self.response.json() if len(self.response.content) != 0 else None - else: - outbody = self.response.text - #print(outbody) - return { - 'status': status, - 'output': { - "body" : outbody, - "headers" : hdrs, - "status" : self.response.status_code, - "reason" : self.response.reason - }, - 'logs': ['one', 'two'] - } - - def execute(self): - self.doRequest() - status = self.computeStatus() - return self.buildOutput(status) +#class Http(PyExecPlugin): +# name = "Http" +# +# def __init__(self, data): +# super().__init__(data) +# self.method = data.get("method") or "get" +# self.url = data.get("url") +# self.headers = data.get("headers") or {} +# self.contenttype = self.headers.get("Content-Type") +# self.accept = self.headers.get("Accept") +# self.body = data.get("body") +# self.params = data.get("params") +# self.expect = data.get("expect") +# self.fail = data.get("fail") +# +# def doRequest(self): +# #print(self.method, self.url, self.contenttype, self.accept) +# if self.contenttype != None and self.contenttype.find("json") != -1: +# self.response = requests.request(self.method, self.url, headers=self.headers, json = self.body) +# else: +# self.response = requests.request(self.method, self.url, headers=self.headers, data = self.body, params = self.params) +# return self.response +# +# def computeStatus(self): +# if self.fail == False: +# return "COMPLETED" +# elif self.expect == None: +# return "COMPLETED" if self.response.ok else "FAILED" +# else: +# if type(self.expect) == list: +# return "COMPLETED" if (self.response.status_code in self.expect) else "FAILED" +# else: +# return "COMPLETED" if (self.response.status_code == self.expect) else "FAILED" +# +# def buildOutput(self, status): +# hdrs = {} +# for k in self.response.headers.keys(): hdrs[k] = self.response.headers[k] +# +# print("Response: {} {}".format(self.response.status_code, self.response.reason)) +# +# if hdrs.get("Content-Type") != None and hdrs["Content-Type"].find("json") != -1 or self.accept != None and #self.accept.find("json") != -1: +# outbody = self.response.json() if len(self.response.content) != 0 else None +# else: +# outbody = self.response.text +# #print(outbody) +# return { +# 'status': status, +# 'output': { +# "body" : outbody, +# "headers" : hdrs, +# "status" : self.response.status_code, +# "reason" : self.response.reason +# }, +# 'logs': ['one', 'two'] +# } +# +# def execute(self): +# self.doRequest() +# status = self.computeStatus() +# return self.buildOutput(status) -class Eval(PyExecPlugin): - name = "Eval" - - def __init__(self, data=None): - super().__init__(data) - - def execute(self): - code = self.data.get("code") - if code != None: - ret = eval(code, { "data" : self.data}) - return { "result" : ret } +#class Eval(PyExecPlugin): +# name = "Eval" +# +# def __init__(self, data=None): +# super().__init__(data) +# +# def execute(self): +# code = self.data.get("code") +# if code != None: +# ret = eval(code, { "data" : self.data}) +# return { "result" : ret } -class Write(PyExecPlugin): - name = "Write" - - def __init__(self, data=None): - super().__init__(data) - - def execute(self): - print("File opening at", self.data["path"]) - fo = open(self.data["path"], "w") - print("File opened", fo) - typ = self.data.get("type", "text") - if typ == "json": - fo.write(json.dumps(self.data.get("content","{}"))) - else: - fo.write(self.data.get("content", "")) - fo.close() - return {} +#class Write(PyExecPlugin): +# name = "Write" +# +# def __init__(self, data=None): +# super().__init__(data) +# +# def execute(self): +# print("File opening at", self.data["path"]) +# fo = open(self.data["path"], "w") +# print("File opened", fo) +# typ = self.data.get("type", "text") +# if typ == "json": +# fo.write(json.dumps(self.data.get("content","{}"))) +# else: +# fo.write(self.data.get("content", "")) +# fo.close() +# return {} -class Shell(PyExecPlugin): - name = "Shell" - - def __init__(self, data=None): - super().__init__(data) - - def execute(self): - output = { - 'results' : [], - 'status' : "COMPLETED" - } - for cmdentry in self.data.get("commands"): - expect = cmdentry.get("expect", 0) - cmd = cmdentry.get("line") - #Use shell=True for very complex cmdlines that do have quotes and ther amenities - withshell = cmdentry.get("withshell", self.data.get("withshell", False)) - if cmd == None: - continue - else: - cmd = cmd.split() if not withshell else cmd - print("Going to execute", withshell, cmd) - completed = subprocess.run(cmd, shell=withshell, capture_output=True, text=True) - output["results"].append({ 'returncode' : completed.returncode, 'stdout' : completed.stdout, 'stderr': completed.stderr}) - if completed.returncode != expect: - output["status"] = "FAILED" - break - return output +#class Shell(PyExecPlugin): +# name = "Shell" +# +# def __init__(self, data=None): +# super().__init__(data) +# +# def execute(self): +# output = { +# 'results' : [], +# 'status' : "COMPLETED" +# } +# for cmdentry in self.data.get("commands"): +# expect = cmdentry.get("expect", 0) +# cmd = cmdentry.get("line") +# #Use shell=True for very complex cmdlines that do have quotes and ther amenities +# withshell = cmdentry.get("withshell", self.data.get("withshell", False)) +# if cmd == None: +# continue +# else: +# cmd = cmd.split() if not withshell else cmd +# print("Going to execute", withshell, cmd) +# completed = subprocess.run(cmd, shell=withshell, capture_output=True, text=True) +# output["results"].append({ 'returncode' : completed.returncode, 'stdout' : completed.stdout, 'stderr': completed.stderr}) +# if completed.returncode != expect: +# output["status"] = "FAILED" +# break +# return output diff --git a/pyrest.py b/pyrest.py deleted file mode 100644 index 6edc410..0000000 --- a/pyrest.py +++ /dev/null @@ -1,43 +0,0 @@ -from conductor.ConductorWorker import ConductorWorker -from pyexecplugins.pyexecplugins import Http -import configparser -import sys -import os - -def log(task, message): - print("[{}:{}][{}:{}] - {}".format(task["workflowType"], task["workflowInstanceId"], task["taskDefName"], task["workflowTask"]["taskReferenceName"], message)) - -def pyrest(task): - log(task, "Starting execution ...") - log(task, "Data is {} {} {}".format(task["inputData"].get("method"), task["inputData"]["url"], task["inputData"].get("body"))) - http = Http(task["inputData"]) - try: - ret = http.execute() - return ret - except Exception as exc: - exc_type, exc_obj, exc_tb = sys.exc_info() - fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] - return { - "status" : "FAILED", - "output" : { "message" : "Internal error: {}".format(exc)}, "logs" : ["one","two"] - } - -def main(): - cfg = configparser.ConfigParser() - cfg.read("config.cfg") - - threads = cfg["pyrest"].getint("threads") - pollrate = cfg["pyrest"].getfloat("pollrate") - server = os.environ.get('CONDUCTOR_SERVER', cfg["common"]["server"]) - - domain = cfg["pyrest"].get("domain") - if domain is None: - domain = cfg["common"].get("domain") - elif cfg["common"].get("domain") is not None: - domain = domain + "," + cfg["common"].get("domain") - - cc = ConductorWorker(server, threads, pollrate, "pyrest") - cc.start('pyrest', pyrest, True, domain) - -if __name__ == '__main__': - main() diff --git a/requirements.txt b/requirements.txt index 17cb9c3..a3751e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ -ansible >= 2.8.3 requests >= 2.19.1 diff --git a/setup.py b/setup.py index f10a2fd..f41a7f8 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ from setuptools import setup setup( name = 'nwconductorworkers', packages = ['nwconductorworkers'], # this must be the same as the name above - version = '1.0.0', + version = '2.0.0', description = 'Python based Conductor Workers', author = 'Marco Lettere', author_email = 'marco.lettere@nubisware.com', @@ -42,4 +42,4 @@ setup( 'Topic :: Software Development :: Libraries :: Python Modules', 'Topic :: System :: Networking' ], -) + )