From 028ed9e93493a6922910f6ca1652f9a6aa87fecb Mon Sep 17 00:00:00 2001 From: dcore94 Date: Fri, 19 Jun 2020 10:50:02 +0200 Subject: [PATCH] First share --- .gitignore | 3 + README.md | 10 + conductor/ConductorWorker.py | 143 +++++++++++ conductor/__init__.py | 19 ++ conductor/conductor.py | 381 ++++++++++++++++++++++++++++ config.cfg | 11 + defaultmodule.py | 9 + keycloak_auth.py | 25 ++ pyexec.py | 39 +++ pyexecplugins/GatewaysToRestTask.py | 45 ++++ pyexecplugins/pyexecplugins.py | 104 ++++++++ pyrest.py | 20 ++ setup.cfg | 2 + setup.py | 45 ++++ 14 files changed, 856 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 conductor/ConductorWorker.py create mode 100644 conductor/__init__.py create mode 100644 conductor/conductor.py create mode 100644 config.cfg create mode 100644 defaultmodule.py create mode 100644 keycloak_auth.py create mode 100644 pyexec.py create mode 100644 pyexecplugins/GatewaysToRestTask.py create mode 100644 pyexecplugins/pyexecplugins.py create mode 100644 pyrest.py create mode 100644 setup.cfg create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6e4266f --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.py[cod] +*$py.class diff --git a/README.md b/README.md new file mode 100644 index 0000000..58ae02d --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +# Python workers for Conductor +Python worker for Conductor provides : + +1. PYREST Comprehensive REST client +2. PYEXEC Execute code from available modules + +## Install + +### Example + diff --git a/conductor/ConductorWorker.py b/conductor/ConductorWorker.py new file mode 100644 index 0000000..90300be --- /dev/null +++ b/conductor/ConductorWorker.py @@ -0,0 +1,143 @@ +# +# Copyright 2017 Netflix, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function, absolute_import +import sys +import time +from conductor.conductor import WFClientMgr +from threading import Thread +import socket + +hostname = socket.gethostname() + + +class ConductorWorker: + """ + Main class for implementing Conductor Workers + + A conductor worker is a separate system that executes the various + tasks that the conductor server queues up for execution. The worker + can run on the same instance as the server or on a remote instance. + + The worker generally provides a wrapper around some function that + performs the actual execution of the task. The function that is + being executed must return a `dict` with the `status`, `output` and + `log` keys. If these keys are not present, the worker will raise an + Exception after completion of the task. + + The start method is used to begin continous polling and execution + of the tasks that the conductor server makes available. The same + script can run multiple workers using the wait argument. For more + details, view the start method + """ + def __init__(self, server_url, thread_count, polling_interval, worker_id=None): + """ + Parameters + ---------- + server_url: str + The url to the server hosting the conductor api. + Ex: 'http://localhost:8080/api' + thread_count: int + The number of threads that will be polling for and + executing tasks in case of using the start method. + polling_interval: float + The number of seconds that each worker thread will wait + between polls to the conductor server. + worker_id: str, optional + The worker_id of the worker that is going to execute the + task. For further details, refer to the documentation + By default, it is set to hostname of the machine + """ + wfcMgr = WFClientMgr(server_url) + self.workflowClient = wfcMgr.workflowClient + self.taskClient = wfcMgr.taskClient + self.thread_count = thread_count + self.polling_interval = polling_interval + self.worker_id = worker_id or hostname + + def execute(self, task, exec_function): + try: + #print("Exec function is", exec_function) + resp = exec_function(task) + if type(resp) is not dict or not all(key in resp for key in ('status', 'output', 'logs')): + raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields') + task['status'] = resp['status'] + task['outputData'] = resp['output'] + task['logs'] = resp['logs'] + if 'reasonForIncompletion' in resp: + task['reasonForIncompletion'] = resp['reasonForIncompletion'] + self.taskClient.updateTask(task) + except Exception as err: + print('Error executing task: ' + str(err)) + task['status'] = 'FAILED' + self.taskClient.updateTask(task) + + def poll_and_execute(self, taskType, exec_function, domain=None): + while True: + time.sleep(float(self.polling_interval)) + polled = self.taskClient.pollForTask(taskType, self.worker_id, domain) + if polled is not None: + self.taskClient.ackTask(polled['taskId'], self.worker_id) + self.execute(polled, exec_function) + + def start(self, taskType, exec_function, wait, domain=None): + """ + start begins the continuous polling of the conductor server + + Parameters + ---------- + taskType: str + The name of the task that the worker is looking to execute + exec_function: function + The function that the worker will execute. The function + must return a dict with the `status`, `output` and `logs` + keys present. If this is not present, an Exception will be + raised + wait: bool + Whether the worker will block execution of further code. + Since the workers are being run in daemon threads, when the + program completes execution, all the threads are destroyed. + Setting wait to True prevents the program from ending. + If multiple workers are being called from the same program, + all but the last start call but have wait set to False. + The last start call must always set wait to True. If a + single worker is being called, set wait to True. + domain: str, optional + The domain of the task under which the worker will run. For + further details refer to the conductor server documentation + By default, it is set to None + """ + print('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, self.polling_interval * 1000, self.thread_count, self.worker_id)) + for x in range(0, int(self.thread_count)): + thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain,)) + thread.daemon = True + thread.start() + if wait: + while 1: + time.sleep(1) + + +def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount): + print('Executing the function') + return {'status': 'COMPLETED', 'output': {}} + + +def main(): + cc = ConductorWorker('http://localhost:8080/api', 5, 0.1) + cc.start(sys.argv[1], exc, False) + cc.start(sys.argv[2], exc, True) + + +if __name__ == '__main__': + main() diff --git a/conductor/__init__.py b/conductor/__init__.py new file mode 100644 index 0000000..137d70b --- /dev/null +++ b/conductor/__init__.py @@ -0,0 +1,19 @@ +# +# Copyright 2017 Netflix, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +__version__ = '1.0.0' +VERSION = tuple(map(int, __version__.split('.'))) + +__all__ = ['conductor','ConductorWorker'] diff --git a/conductor/conductor.py b/conductor/conductor.py new file mode 100644 index 0000000..47ed3b4 --- /dev/null +++ b/conductor/conductor.py @@ -0,0 +1,381 @@ +# +# Copyright 2017 Netflix, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import print_function +import requests +import json +import sys +import socket +import warnings + + +hostname = socket.gethostname() + + +class BaseClient(object): + printUrl = False + headers = {'Content-Type': 'application/json', 'Accept': 'application/json'} + + def __init__(self, baseURL, baseResource): + self.baseURL = baseURL + self.baseResource = baseResource + + def get(self, resPath, queryParams=None): + theUrl = "{}/{}".format(self.baseURL, resPath) + resp = requests.get(theUrl, params=queryParams) + self.__checkForSuccess(resp) + if(resp.content == b''): + return None + else: + return resp.json() + + def post(self, resPath, queryParams, body, headers=None): + theUrl = "{}/{}".format(self.baseURL, resPath) + theHeader = self.headers + if headers is not None: + theHeader = self.mergeTwoDicts(self.headers, headers) + if body is not None: + jsonBody = json.dumps(body, ensure_ascii=True) + resp = requests.post(theUrl, params=queryParams, data=jsonBody, headers=theHeader) + else: + resp = requests.post(theUrl, params=queryParams, headers=theHeader) + + self.__checkForSuccess(resp) + return self.__return(resp, theHeader) + + def put(self, resPath, queryParams=None, body=None, headers=None): + theUrl = "{}/{}".format(self.baseURL, resPath) + theHeader = self.headers + if headers is not None: + theHeader = self.mergeTwoDicts(self.headers, headers) + + if body is not None: + jsonBody = json.dumps(body, ensure_ascii=False) + resp = requests.put(theUrl, params=queryParams, data=jsonBody, headers=theHeader) + else: + resp = requests.put(theUrl, params=queryParams, headers=theHeader) + + self.__print(resp) + self.__checkForSuccess(resp) + + def delete(self, resPath, queryParams): + theUrl = "{}/{}".format(self.baseURL, resPath) + resp = requests.delete(theUrl, params=queryParams) + self.__print(resp) + self.__checkForSuccess(resp) + + def makeUrl(self, urlformat=None, *argv): + url = self.baseResource + '/' + if urlformat: + url += urlformat.format(*argv) + return url + + def makeParams(self, **kwargs): + return dict((k, v) for k, v in kwargs.items() if v is not None) or None + + def mergeTwoDicts(self, x, y): + z = x.copy() + z.update(y) + return z + + def __print(self, resp): + if self.printUrl: + print(resp.url) + + def __return(self, resp, header): + retval = '' + if len(resp.text) > 0: + if header['Accept'] == 'text/plain': + retval = resp.text + elif header['Accept'] == 'application/json': + retval = resp.json() + else: + retval = resp.text + return retval + + def __checkForSuccess(self, resp): + try: + resp.raise_for_status() + except requests.HTTPError: + print("ERROR: " + resp.text) + raise + + +class MetadataClient(BaseClient): + BASE_RESOURCE = 'metadata' + + def __init__(self, baseURL): + BaseClient.__init__(self, baseURL, self.BASE_RESOURCE) + + def getWorkflowDef(self, wfname, version=None): + url = self.makeUrl('workflow/{}', wfname) + return self.get(url, self.makeParams(version=version)) + + def createWorkflowDef(self, wfdObj): + url = self.makeUrl('workflow') + return self.post(url, None, wfdObj) + + def updateWorkflowDefs(self, listOfWfdObj): + url = self.makeUrl('workflow') + self.put(url, None, listOfWfdObj) + + def getAllWorkflowDefs(self): + url = self.makeUrl('workflow') + return self.get(url) + + def unRegisterWorkflowDef(self, wfname, version): + url = self.makeUrl("workflow/{name}/{version}".format(name=wfname, version=version)) + self.delete(url, None) + + def getTaskDef(self, tdName): + url = self.makeUrl('taskdefs/{}', tdName) + return self.get(url) + + def registerTaskDefs(self, listOfTaskDefObj): + url = self.makeUrl('taskdefs') + return self.post(url, None, listOfTaskDefObj) + + def registerTaskDef(self, taskDefObj): + """registerTaskDef is deprecated since PUT /metadata/taskdefs does not + register but updates a task definition. Use updateTaskDef function + instead. + """ + warnings.warn(self.registerTaskDef.__doc__, DeprecationWarning) + url = self.makeUrl('taskdefs') + self.put(url, None, taskDefObj) + + def updateTaskDef(self, taskDefObj): + url = self.makeUrl('taskdefs') + self.put(url, None, taskDefObj) + + def unRegisterTaskDef(self, tdName, reason=None): + url = self.makeUrl('taskdefs/{}', tdName) + self.delete(url, self.makeParams(reason=reason)) + + def getAllTaskDefs(self): + url = self.makeUrl('taskdefs') + return self.get(url) + + +class TaskClient(BaseClient): + BASE_RESOURCE = 'tasks' + + def __init__(self, baseURL): + BaseClient.__init__(self, baseURL, self.BASE_RESOURCE) + + def getTask(self, taskId): + url = self.makeUrl('{}', taskId) + return self.get(url) + + def updateTask(self, taskObj): + url = self.makeUrl('') + headers = {'Accept': 'text/plain; encodingcharset=utf-8'} + self.post(url, None, taskObj, headers) + + def pollForTask(self, taskType, workerid, domain=None): + url = self.makeUrl('poll/{}', taskType) + params = {} + params['workerid'] = workerid + if domain is not None: + params['domain'] = domain + + try: + return self.get(url, params) + except Exception as err: + print('Error while polling ' + str(err)) + return None + + def pollForBatch(self, taskType, count, timeout, workerid, domain=None): + url = self.makeUrl('poll/batch/{}', taskType) + params = {} + params['workerid'] = workerid + params['count'] = count + params['timeout'] = timeout + + if domain is not None: + params['domain'] = domain + + try: + return self.get(url, params) + except Exception as err: + print('Error while polling ' + str(err)) + return None + + def ackTask(self, taskId, workerid): + url = self.makeUrl('{}/ack', taskId) + params = {} + params['workerid'] = workerid + headers = {'Accept': 'application/json'} + value = self.post(url, params, None, headers) + return value in ['true', True] + + def getTasksInQueue(self, taskName): + url = self.makeUrl('queue/{}', taskName) + return self.get(url) + + def removeTaskFromQueue(self, taskId, reason=None): + url = self.makeUrl('queue/{}', taskId) + params = {} + params['reason'] = reason + self.delete(url, params) + + def getTaskQueueSizes(self, listOfTaskName): + url = self.makeUrl('queue/sizes') + return self.post(url, None, listOfTaskName) + + +class WorkflowClient(BaseClient): + BASE_RESOURCE = 'workflow' + + def __init__(self, baseURL): + BaseClient.__init__(self, baseURL, self.BASE_RESOURCE) + + def getWorkflow(self, wfId, includeTasks=True): + url = self.makeUrl('{}', wfId) + params = {} + params['includeTasks'] = includeTasks + return self.get(url, params) + + def getRunningWorkflows(self, wfName, version=None, startTime=None, endTime=None): + url = self.makeUrl('running/{}', wfName) + params = {} + params['version'] = version + params['startTime'] = startTime + params['endTime'] = endTime + return self.get(url, params) + + def startWorkflow(self, wfName, inputjson, version=None, correlationId=None): + url = self.makeUrl('{}', wfName) + params = {} + params['version'] = version + params['correlationId'] = correlationId + headers = {'Accept': 'text/plain'} + return self.post(url, params, inputjson, headers) + + def terminateWorkflow(self, wfId, reason=None): + url = self.makeUrl('{}', wfId) + params = {} + params['reason'] = reason + self.delete(url, params) + + def removeWorkflow(self, wfId, archiveWorkflow, reason=None): + url = self.makeUrl('{}/remove', wfId) + self.delete(url, self.makeParams(archiveWorkflow=archiveWorkflow, reason=reason)) + + def pauseWorkflow(self, wfId): + url = self.makeUrl('{}/pause', wfId) + self.put(url) + + def resumeWorkflow(self, wfId): + url = self.makeUrl('{}/resume', wfId) + self.put(url) + + def skipTaskFromWorkflow(self, wfId, taskRefName, skipTaskRequest): + url = self.makeUrl('{}/skiptask/{}', wfId, taskRefName) + self.post(url, None, skipTaskRequest) + + def rerunWorkflow(self, wfId, taskRefName, rerunWorkflowRequest): + url = self.makeUrl('{}/rerun', wfId) + return self.post(url, None, rerunWorkflowRequest) + + def restartWorkflow(self, wfId, taskRefName, fromTaskRef): + url = self.makeUrl('{}/restart', wfId) + params = {} + params['from'] = fromTaskRef + self.post(url, params, None) + +class EventServicesClient(BaseClient): + BASE_RESOURCE = 'event' + + def __init__(self, baseURL): + BaseClient.__init__(self, baseURL, self.BASE_RESOURCE) + + def getEventHandlerDef(self, event, activeOnly=True): + url = self.makeUrl('{}', event) + params = {} + params['activeOnly'] = activeOnly + return self.get(url, params) + + def getEventHandlerDefs(self): + url = self.makeUrl() + return self.get(url) + + def createEventHandlerDef(self, ehObj): + url = self.makeUrl() + return self.post(url, None, ehObj) + + def updateEventHandlerDef(self, ehObj): + url = self.makeUrl() + return self.put(url, None, ehObj) + + def removeEventHandler(self, ehName): + url = self.makeUrl('{}', ehName) + self.delete(url, {}) + + def getEventHandlerQueues(self): + url = self.makeUrl('queues') + return self.get(url) + + def getEventHandlerQueuesProviders(self): + url = self.makeUrl('queues/providers') + return self.get(url) + +class WFClientMgr: + def __init__(self, server_url='http://localhost:8080/api/'): + self.workflowClient = WorkflowClient(server_url) + self.taskClient = TaskClient(server_url) + self.metadataClient = MetadataClient(server_url) + + +def main(): + if len(sys.argv) < 3: + print("Usage - python conductor server_url command parameters...") + return None + + server_url = sys.argv[1] + command = sys.argv[2] + wfcMgr = WFClientMgr(server_url) + wfc = wfcMgr.workflowClient + if command == 'start': + if len(sys.argv) < 7: + print('python conductor server_url start workflow_name input_json [version] [correlationId]') + return None + wfName = sys.argv[3] + input = json.loads(sys.argv[5]) + correlationId = sys.argv[6] + workflowId = wfc.startWorkflow(wfName, input, 1, correlationId) + print(workflowId) + return workflowId + elif command == 'get': + if len(sys.argv) < 4: + print('python conductor server_url get workflow_id') + return None + wfId = sys.argv[3] + wfjson = wfc.getWorkflow(wfId) + print(json.dumps(wfjson, indent=True, separators=(',', ': '))) + return wfjson + elif command == 'terminate': + if len(sys.argv) < 4: + print('python conductor server_url terminate workflow_id') + return None + wfId = sys.argv[3] + wfc.terminateWorkflow(wfId) + print('OK') + return wfId + + +if __name__ == '__main__': + main() + diff --git a/config.cfg b/config.cfg new file mode 100644 index 0000000..bac39ba --- /dev/null +++ b/config.cfg @@ -0,0 +1,11 @@ +[common] +server = http://localhost:8080/api + +[pyrest] +threads = 5 +pollrate = .1 + +[pyexec] +threads = 3 +pollrate = .1 + diff --git a/defaultmodule.py b/defaultmodule.py new file mode 100644 index 0000000..04283fa --- /dev/null +++ b/defaultmodule.py @@ -0,0 +1,9 @@ +def identity(data): + print("default:identity") + print(data) + return data + +def nop(data): + print("default:nop") + print(data) + return 1 diff --git a/keycloak_auth.py b/keycloak_auth.py new file mode 100644 index 0000000..33bdc8a --- /dev/null +++ b/keycloak_auth.py @@ -0,0 +1,25 @@ +from conductor.ConductorWorker import ConductorWorker +import requests +import json + +def execute_keycloak_auth(task): + print("Inside execution...") + print(json.dumps(task)) + + response = requests.post(task["inputData"]["uri"], + {"grant_type" : "client_credentials", "client_id" : task["inputData"]["client_id"], "client_secret" : task["inputData"]["client_secret"]}) + + if response.status_code == 200: + print("Executed") + return {'status': 'COMPLETED', 'output': { "token" : response.json()}, 'logs': ['one', 'two']} + else: + print(response.text) + return {'status': 'FAILED', 'output' : {}, 'logs': ['one', 'two']} + +def main(): + print('Starting Keycloak authenticator') + cc = ConductorWorker('http://localhost:8080/api', 1, 0.1) + cc.start('kc_auth', execute_keycloak_auth, True) + +if __name__ == '__main__': + main() diff --git a/pyexec.py b/pyexec.py new file mode 100644 index 0000000..471e5d2 --- /dev/null +++ b/pyexec.py @@ -0,0 +1,39 @@ +from conductor.ConductorWorker import ConductorWorker +import configparser +from pyexecplugins.pyexecplugins import * +from pyexecplugins.GatewaysToRestTask import * + +def pyexec(task): + try: + operation = task["inputData"].get("operation") or "Nop" + + p = PyExecPlugin.get(operation) + + if p != None: + pi = p(task["inputData"]) + ret = pi.execute() + else: raise Exception("Operation {} not found.".format(operation)) + + return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]} + + except Exception as err: + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + print(exc_type, fname, exc_tb.tb_lineno) + return { "status" : "FAILED", "output" : { "message" : "Internal error: {}".format(err)}, "logs" : ["one","two"]} + + +def main(): + + print("Started PyExec with plugins:", PyExecPlugin.getPlugins()) + + cfg = configparser.ConfigParser() + cfg.read("config.cfg") + + threads = cfg["pyexec"].getint("threads") + pollrate = cfg["pyexec"].getfloat("pollrate") + cc = ConductorWorker(cfg["common"]["server"], threads, pollrate, "pyexec") + cc.start('pyexec', pyexec, True) + +if __name__ == '__main__': + main() diff --git a/pyexecplugins/GatewaysToRestTask.py b/pyexecplugins/GatewaysToRestTask.py new file mode 100644 index 0000000..aa205f8 --- /dev/null +++ b/pyexecplugins/GatewaysToRestTask.py @@ -0,0 +1,45 @@ +from pyexecplugins.pyexecplugins import * + +class GatewayToRestTask(PyExecPlugin): + name = "GatewayToRestTask" + + def __init__(self, data=None): + super().__init__(data) + self.urls = data["gateway_list"] + + def execute(self): + output = { "tasks" : [] , "urls": {}} + i = 0 + for url in self.urls: + i += 1 + trefname = "dt{}".format(i) + output["urls"][trefname] = {} + output["tasks"].append({ + "name" : "pyrest", + "taskReferenceName" : trefname, + "type" : "SIMPLE", + "inputParameters" : { + "url" : "${workflow.input.keycloak_admin_url}", + "method" : "POST", + "expect" : [201, 409], + "headers" : { + "Authorization" : "Bearer ${authorize.output.body.access_token}", + "Content-Type" : "application/json" + }, + "body" : { + "clientId": url.split("/")[2], + "baseUrl": url, + "enabled": True, + "clientAuthenticatorType": "client-secret", + "redirectUris": [url + "/*"], + "webOrigins": ["/*"], + "standardFlowEnabled": True, + "directAccessGrantsEnabled": True, + "publicClient": True, + "protocol": "openid-connect", + "attributes": {"login_theme": "d4science" }, + "fullScopeAllowed": True + } + } + }) + return output diff --git a/pyexecplugins/pyexecplugins.py b/pyexecplugins/pyexecplugins.py new file mode 100644 index 0000000..e929d14 --- /dev/null +++ b/pyexecplugins/pyexecplugins.py @@ -0,0 +1,104 @@ +import requests +import json + +class PyExecPlugins(type): + def __init__(cls, name, bases, attrs): + if not hasattr(cls, "plugins"): + print("Initializing plugins") + cls.plugins = {} + else: + print("Appending a plugin ", cls.name, cls) + cls.plugins[cls.name] = cls + + def getPlugins(cls): + return cls.plugins + + def get(cls, name): + return cls.plugins.get(name) + +class PyExecPlugin(object, metaclass=PyExecPlugins): + def __init__(self, data=None): + self.data = data + +class Nop(PyExecPlugin): + name = "Nop" + + def __init__(self, data=None): + super().__init__(data) + + def execute(self): + return None + +class Identity(PyExecPlugin): + name = "Identity" + + def __init__(self, data=None): + super().__init__(data) + + def execute(self): + return self.data + +class Http(PyExecPlugin): + name = "Http" + + def __init__(self, data): + super().__init__(data) + self.method = data.get("method") or "get" + self.url = data.get("url") + self.headers = data.get("headers") or {} + self.contenttype = self.headers.get("Content-Type") + self.accept = self.headers.get("Accept") + self.body = data.get("body") + self.expect = data.get("expect") + self.fail = data.get("fail") + + def doRequest(self): + #print(self.method, self.url, self.contenttype, self.accept) + if self.contenttype != None and self.contenttype.find("json") != -1: + self.response = requests.request(self.method, self.url, headers=self.headers, json = self.body) + else: + self.response = requests.request(self.method, self.url, headers=self.headers, data = self.body) + return self.response + + def computeStatus(self): + if self.fail == False: + return "COMPLETED" + elif self.expect == None: + return "COMPLETED" if self.response.ok else "FAILED" + else: + if type(self.expect) == list: + return "COMPLETED" if (self.response.status_code in self.expect) else "FAILED" + else: + return "COMPLETED" if (self.response.status_code == self.expect) else "FAILED" + + def buildOutput(self, status): + hdrs = {} + for k in self.response.headers.keys(): hdrs[k] = self.response.headers[k] + + print("Response: {} {}".format(self.response.status_code, self.response.reason)) + + if hdrs.get("Content-Type") != None and hdrs["Content-Type"].find("json") != -1 or self.accept != None and self.accept.find("json") != -1: + outbody = self.response.json() + else: + outbody = self.response.text + + return { + 'status': status, + 'output': { + "body" : outbody, + "headers" : hdrs, + "status" : self.response.status_code, + "reason" : self.response.reason}, + 'logs': ['one', 'two']} + + def execute(self): + try: + self.doRequest() + status = self.computeStatus() + return self.buildOutput(status) + + except Exception as err: + return { + "status" : "FAILED", + "output" : { "message" : "Internal error: {}".format(err)}, "logs" : ["one","two"] + } diff --git a/pyrest.py b/pyrest.py new file mode 100644 index 0000000..35b754f --- /dev/null +++ b/pyrest.py @@ -0,0 +1,20 @@ +from conductor.ConductorWorker import ConductorWorker +from pyexecplugins.pyexecplugins import Http +import configparser +import sys + +def pyrest(task): + http = Http(task["inputData"]) + return http.execute() + +def main(): + cfg = configparser.ConfigParser() + cfg.read("config.cfg") + + threads = cfg["pyrest"].getint("threads") + pollrate = cfg["pyrest"].getfloat("pollrate") + cc = ConductorWorker(cfg["common"]["server"], threads, pollrate, "pyrest") + cc.start('pyrest', pyrest, True) + +if __name__ == '__main__': + main() diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..b88034e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[metadata] +description-file = README.md diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..f10a2fd --- /dev/null +++ b/setup.py @@ -0,0 +1,45 @@ +# +# Copyright 2020 Nubisware S.r.l +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from setuptools import setup + +setup( + name = 'nwconductorworkers', + packages = ['nwconductorworkers'], # this must be the same as the name above + version = '1.0.0', + description = 'Python based Conductor Workers', + author = 'Marco Lettere', + author_email = 'marco.lettere@nubisware.com', + url = 'https://github.com/nubisware/nvconductorworkers', + download_url = 'https://github.com/nubisware/nvconductorworkers/releases', + keywords = ['conductor', 'python'], + license = 'Apache 2.0', + install_requires = [ + 'requests', + ], + classifiers = [ + 'Development Status :: 4 - Beta', + 'Intended Audience :: Developers', + 'Intended Audience :: System Administrators', + 'License :: OSI Approved :: Apache Software License', + 'Programming Language :: Python :: 3', + 'Topic :: Workflow', + 'Topic :: Microservices', + 'Topic :: Orchestration', + 'Topic :: Internet', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Topic :: System :: Networking' + ], +)