First share
This commit is contained in:
commit
028ed9e934
|
@ -0,0 +1,3 @@
|
|||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
|
@ -0,0 +1,10 @@
|
|||
# Python workers for Conductor
|
||||
Python worker for Conductor provides :
|
||||
|
||||
1. PYREST Comprehensive REST client
|
||||
2. PYEXEC Execute code from available modules
|
||||
|
||||
## Install
|
||||
|
||||
### Example
|
||||
|
|
@ -0,0 +1,143 @@
|
|||
#
|
||||
# Copyright 2017 Netflix, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import print_function, absolute_import
|
||||
import sys
|
||||
import time
|
||||
from conductor.conductor import WFClientMgr
|
||||
from threading import Thread
|
||||
import socket
|
||||
|
||||
hostname = socket.gethostname()
|
||||
|
||||
|
||||
class ConductorWorker:
|
||||
"""
|
||||
Main class for implementing Conductor Workers
|
||||
|
||||
A conductor worker is a separate system that executes the various
|
||||
tasks that the conductor server queues up for execution. The worker
|
||||
can run on the same instance as the server or on a remote instance.
|
||||
|
||||
The worker generally provides a wrapper around some function that
|
||||
performs the actual execution of the task. The function that is
|
||||
being executed must return a `dict` with the `status`, `output` and
|
||||
`log` keys. If these keys are not present, the worker will raise an
|
||||
Exception after completion of the task.
|
||||
|
||||
The start method is used to begin continous polling and execution
|
||||
of the tasks that the conductor server makes available. The same
|
||||
script can run multiple workers using the wait argument. For more
|
||||
details, view the start method
|
||||
"""
|
||||
def __init__(self, server_url, thread_count, polling_interval, worker_id=None):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
server_url: str
|
||||
The url to the server hosting the conductor api.
|
||||
Ex: 'http://localhost:8080/api'
|
||||
thread_count: int
|
||||
The number of threads that will be polling for and
|
||||
executing tasks in case of using the start method.
|
||||
polling_interval: float
|
||||
The number of seconds that each worker thread will wait
|
||||
between polls to the conductor server.
|
||||
worker_id: str, optional
|
||||
The worker_id of the worker that is going to execute the
|
||||
task. For further details, refer to the documentation
|
||||
By default, it is set to hostname of the machine
|
||||
"""
|
||||
wfcMgr = WFClientMgr(server_url)
|
||||
self.workflowClient = wfcMgr.workflowClient
|
||||
self.taskClient = wfcMgr.taskClient
|
||||
self.thread_count = thread_count
|
||||
self.polling_interval = polling_interval
|
||||
self.worker_id = worker_id or hostname
|
||||
|
||||
def execute(self, task, exec_function):
|
||||
try:
|
||||
#print("Exec function is", exec_function)
|
||||
resp = exec_function(task)
|
||||
if type(resp) is not dict or not all(key in resp for key in ('status', 'output', 'logs')):
|
||||
raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
|
||||
task['status'] = resp['status']
|
||||
task['outputData'] = resp['output']
|
||||
task['logs'] = resp['logs']
|
||||
if 'reasonForIncompletion' in resp:
|
||||
task['reasonForIncompletion'] = resp['reasonForIncompletion']
|
||||
self.taskClient.updateTask(task)
|
||||
except Exception as err:
|
||||
print('Error executing task: ' + str(err))
|
||||
task['status'] = 'FAILED'
|
||||
self.taskClient.updateTask(task)
|
||||
|
||||
def poll_and_execute(self, taskType, exec_function, domain=None):
|
||||
while True:
|
||||
time.sleep(float(self.polling_interval))
|
||||
polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
|
||||
if polled is not None:
|
||||
self.taskClient.ackTask(polled['taskId'], self.worker_id)
|
||||
self.execute(polled, exec_function)
|
||||
|
||||
def start(self, taskType, exec_function, wait, domain=None):
|
||||
"""
|
||||
start begins the continuous polling of the conductor server
|
||||
|
||||
Parameters
|
||||
----------
|
||||
taskType: str
|
||||
The name of the task that the worker is looking to execute
|
||||
exec_function: function
|
||||
The function that the worker will execute. The function
|
||||
must return a dict with the `status`, `output` and `logs`
|
||||
keys present. If this is not present, an Exception will be
|
||||
raised
|
||||
wait: bool
|
||||
Whether the worker will block execution of further code.
|
||||
Since the workers are being run in daemon threads, when the
|
||||
program completes execution, all the threads are destroyed.
|
||||
Setting wait to True prevents the program from ending.
|
||||
If multiple workers are being called from the same program,
|
||||
all but the last start call but have wait set to False.
|
||||
The last start call must always set wait to True. If a
|
||||
single worker is being called, set wait to True.
|
||||
domain: str, optional
|
||||
The domain of the task under which the worker will run. For
|
||||
further details refer to the conductor server documentation
|
||||
By default, it is set to None
|
||||
"""
|
||||
print('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, self.polling_interval * 1000, self.thread_count, self.worker_id))
|
||||
for x in range(0, int(self.thread_count)):
|
||||
thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain,))
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
if wait:
|
||||
while 1:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount):
|
||||
print('Executing the function')
|
||||
return {'status': 'COMPLETED', 'output': {}}
|
||||
|
||||
|
||||
def main():
|
||||
cc = ConductorWorker('http://localhost:8080/api', 5, 0.1)
|
||||
cc.start(sys.argv[1], exc, False)
|
||||
cc.start(sys.argv[2], exc, True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,19 @@
|
|||
#
|
||||
# Copyright 2017 Netflix, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
__version__ = '1.0.0'
|
||||
VERSION = tuple(map(int, __version__.split('.')))
|
||||
|
||||
__all__ = ['conductor','ConductorWorker']
|
|
@ -0,0 +1,381 @@
|
|||
#
|
||||
# Copyright 2017 Netflix, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from __future__ import print_function
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
import socket
|
||||
import warnings
|
||||
|
||||
|
||||
hostname = socket.gethostname()
|
||||
|
||||
|
||||
class BaseClient(object):
|
||||
printUrl = False
|
||||
headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
|
||||
|
||||
def __init__(self, baseURL, baseResource):
|
||||
self.baseURL = baseURL
|
||||
self.baseResource = baseResource
|
||||
|
||||
def get(self, resPath, queryParams=None):
|
||||
theUrl = "{}/{}".format(self.baseURL, resPath)
|
||||
resp = requests.get(theUrl, params=queryParams)
|
||||
self.__checkForSuccess(resp)
|
||||
if(resp.content == b''):
|
||||
return None
|
||||
else:
|
||||
return resp.json()
|
||||
|
||||
def post(self, resPath, queryParams, body, headers=None):
|
||||
theUrl = "{}/{}".format(self.baseURL, resPath)
|
||||
theHeader = self.headers
|
||||
if headers is not None:
|
||||
theHeader = self.mergeTwoDicts(self.headers, headers)
|
||||
if body is not None:
|
||||
jsonBody = json.dumps(body, ensure_ascii=True)
|
||||
resp = requests.post(theUrl, params=queryParams, data=jsonBody, headers=theHeader)
|
||||
else:
|
||||
resp = requests.post(theUrl, params=queryParams, headers=theHeader)
|
||||
|
||||
self.__checkForSuccess(resp)
|
||||
return self.__return(resp, theHeader)
|
||||
|
||||
def put(self, resPath, queryParams=None, body=None, headers=None):
|
||||
theUrl = "{}/{}".format(self.baseURL, resPath)
|
||||
theHeader = self.headers
|
||||
if headers is not None:
|
||||
theHeader = self.mergeTwoDicts(self.headers, headers)
|
||||
|
||||
if body is not None:
|
||||
jsonBody = json.dumps(body, ensure_ascii=False)
|
||||
resp = requests.put(theUrl, params=queryParams, data=jsonBody, headers=theHeader)
|
||||
else:
|
||||
resp = requests.put(theUrl, params=queryParams, headers=theHeader)
|
||||
|
||||
self.__print(resp)
|
||||
self.__checkForSuccess(resp)
|
||||
|
||||
def delete(self, resPath, queryParams):
|
||||
theUrl = "{}/{}".format(self.baseURL, resPath)
|
||||
resp = requests.delete(theUrl, params=queryParams)
|
||||
self.__print(resp)
|
||||
self.__checkForSuccess(resp)
|
||||
|
||||
def makeUrl(self, urlformat=None, *argv):
|
||||
url = self.baseResource + '/'
|
||||
if urlformat:
|
||||
url += urlformat.format(*argv)
|
||||
return url
|
||||
|
||||
def makeParams(self, **kwargs):
|
||||
return dict((k, v) for k, v in kwargs.items() if v is not None) or None
|
||||
|
||||
def mergeTwoDicts(self, x, y):
|
||||
z = x.copy()
|
||||
z.update(y)
|
||||
return z
|
||||
|
||||
def __print(self, resp):
|
||||
if self.printUrl:
|
||||
print(resp.url)
|
||||
|
||||
def __return(self, resp, header):
|
||||
retval = ''
|
||||
if len(resp.text) > 0:
|
||||
if header['Accept'] == 'text/plain':
|
||||
retval = resp.text
|
||||
elif header['Accept'] == 'application/json':
|
||||
retval = resp.json()
|
||||
else:
|
||||
retval = resp.text
|
||||
return retval
|
||||
|
||||
def __checkForSuccess(self, resp):
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except requests.HTTPError:
|
||||
print("ERROR: " + resp.text)
|
||||
raise
|
||||
|
||||
|
||||
class MetadataClient(BaseClient):
|
||||
BASE_RESOURCE = 'metadata'
|
||||
|
||||
def __init__(self, baseURL):
|
||||
BaseClient.__init__(self, baseURL, self.BASE_RESOURCE)
|
||||
|
||||
def getWorkflowDef(self, wfname, version=None):
|
||||
url = self.makeUrl('workflow/{}', wfname)
|
||||
return self.get(url, self.makeParams(version=version))
|
||||
|
||||
def createWorkflowDef(self, wfdObj):
|
||||
url = self.makeUrl('workflow')
|
||||
return self.post(url, None, wfdObj)
|
||||
|
||||
def updateWorkflowDefs(self, listOfWfdObj):
|
||||
url = self.makeUrl('workflow')
|
||||
self.put(url, None, listOfWfdObj)
|
||||
|
||||
def getAllWorkflowDefs(self):
|
||||
url = self.makeUrl('workflow')
|
||||
return self.get(url)
|
||||
|
||||
def unRegisterWorkflowDef(self, wfname, version):
|
||||
url = self.makeUrl("workflow/{name}/{version}".format(name=wfname, version=version))
|
||||
self.delete(url, None)
|
||||
|
||||
def getTaskDef(self, tdName):
|
||||
url = self.makeUrl('taskdefs/{}', tdName)
|
||||
return self.get(url)
|
||||
|
||||
def registerTaskDefs(self, listOfTaskDefObj):
|
||||
url = self.makeUrl('taskdefs')
|
||||
return self.post(url, None, listOfTaskDefObj)
|
||||
|
||||
def registerTaskDef(self, taskDefObj):
|
||||
"""registerTaskDef is deprecated since PUT /metadata/taskdefs does not
|
||||
register but updates a task definition. Use updateTaskDef function
|
||||
instead.
|
||||
"""
|
||||
warnings.warn(self.registerTaskDef.__doc__, DeprecationWarning)
|
||||
url = self.makeUrl('taskdefs')
|
||||
self.put(url, None, taskDefObj)
|
||||
|
||||
def updateTaskDef(self, taskDefObj):
|
||||
url = self.makeUrl('taskdefs')
|
||||
self.put(url, None, taskDefObj)
|
||||
|
||||
def unRegisterTaskDef(self, tdName, reason=None):
|
||||
url = self.makeUrl('taskdefs/{}', tdName)
|
||||
self.delete(url, self.makeParams(reason=reason))
|
||||
|
||||
def getAllTaskDefs(self):
|
||||
url = self.makeUrl('taskdefs')
|
||||
return self.get(url)
|
||||
|
||||
|
||||
class TaskClient(BaseClient):
|
||||
BASE_RESOURCE = 'tasks'
|
||||
|
||||
def __init__(self, baseURL):
|
||||
BaseClient.__init__(self, baseURL, self.BASE_RESOURCE)
|
||||
|
||||
def getTask(self, taskId):
|
||||
url = self.makeUrl('{}', taskId)
|
||||
return self.get(url)
|
||||
|
||||
def updateTask(self, taskObj):
|
||||
url = self.makeUrl('')
|
||||
headers = {'Accept': 'text/plain; encodingcharset=utf-8'}
|
||||
self.post(url, None, taskObj, headers)
|
||||
|
||||
def pollForTask(self, taskType, workerid, domain=None):
|
||||
url = self.makeUrl('poll/{}', taskType)
|
||||
params = {}
|
||||
params['workerid'] = workerid
|
||||
if domain is not None:
|
||||
params['domain'] = domain
|
||||
|
||||
try:
|
||||
return self.get(url, params)
|
||||
except Exception as err:
|
||||
print('Error while polling ' + str(err))
|
||||
return None
|
||||
|
||||
def pollForBatch(self, taskType, count, timeout, workerid, domain=None):
|
||||
url = self.makeUrl('poll/batch/{}', taskType)
|
||||
params = {}
|
||||
params['workerid'] = workerid
|
||||
params['count'] = count
|
||||
params['timeout'] = timeout
|
||||
|
||||
if domain is not None:
|
||||
params['domain'] = domain
|
||||
|
||||
try:
|
||||
return self.get(url, params)
|
||||
except Exception as err:
|
||||
print('Error while polling ' + str(err))
|
||||
return None
|
||||
|
||||
def ackTask(self, taskId, workerid):
|
||||
url = self.makeUrl('{}/ack', taskId)
|
||||
params = {}
|
||||
params['workerid'] = workerid
|
||||
headers = {'Accept': 'application/json'}
|
||||
value = self.post(url, params, None, headers)
|
||||
return value in ['true', True]
|
||||
|
||||
def getTasksInQueue(self, taskName):
|
||||
url = self.makeUrl('queue/{}', taskName)
|
||||
return self.get(url)
|
||||
|
||||
def removeTaskFromQueue(self, taskId, reason=None):
|
||||
url = self.makeUrl('queue/{}', taskId)
|
||||
params = {}
|
||||
params['reason'] = reason
|
||||
self.delete(url, params)
|
||||
|
||||
def getTaskQueueSizes(self, listOfTaskName):
|
||||
url = self.makeUrl('queue/sizes')
|
||||
return self.post(url, None, listOfTaskName)
|
||||
|
||||
|
||||
class WorkflowClient(BaseClient):
|
||||
BASE_RESOURCE = 'workflow'
|
||||
|
||||
def __init__(self, baseURL):
|
||||
BaseClient.__init__(self, baseURL, self.BASE_RESOURCE)
|
||||
|
||||
def getWorkflow(self, wfId, includeTasks=True):
|
||||
url = self.makeUrl('{}', wfId)
|
||||
params = {}
|
||||
params['includeTasks'] = includeTasks
|
||||
return self.get(url, params)
|
||||
|
||||
def getRunningWorkflows(self, wfName, version=None, startTime=None, endTime=None):
|
||||
url = self.makeUrl('running/{}', wfName)
|
||||
params = {}
|
||||
params['version'] = version
|
||||
params['startTime'] = startTime
|
||||
params['endTime'] = endTime
|
||||
return self.get(url, params)
|
||||
|
||||
def startWorkflow(self, wfName, inputjson, version=None, correlationId=None):
|
||||
url = self.makeUrl('{}', wfName)
|
||||
params = {}
|
||||
params['version'] = version
|
||||
params['correlationId'] = correlationId
|
||||
headers = {'Accept': 'text/plain'}
|
||||
return self.post(url, params, inputjson, headers)
|
||||
|
||||
def terminateWorkflow(self, wfId, reason=None):
|
||||
url = self.makeUrl('{}', wfId)
|
||||
params = {}
|
||||
params['reason'] = reason
|
||||
self.delete(url, params)
|
||||
|
||||
def removeWorkflow(self, wfId, archiveWorkflow, reason=None):
|
||||
url = self.makeUrl('{}/remove', wfId)
|
||||
self.delete(url, self.makeParams(archiveWorkflow=archiveWorkflow, reason=reason))
|
||||
|
||||
def pauseWorkflow(self, wfId):
|
||||
url = self.makeUrl('{}/pause', wfId)
|
||||
self.put(url)
|
||||
|
||||
def resumeWorkflow(self, wfId):
|
||||
url = self.makeUrl('{}/resume', wfId)
|
||||
self.put(url)
|
||||
|
||||
def skipTaskFromWorkflow(self, wfId, taskRefName, skipTaskRequest):
|
||||
url = self.makeUrl('{}/skiptask/{}', wfId, taskRefName)
|
||||
self.post(url, None, skipTaskRequest)
|
||||
|
||||
def rerunWorkflow(self, wfId, taskRefName, rerunWorkflowRequest):
|
||||
url = self.makeUrl('{}/rerun', wfId)
|
||||
return self.post(url, None, rerunWorkflowRequest)
|
||||
|
||||
def restartWorkflow(self, wfId, taskRefName, fromTaskRef):
|
||||
url = self.makeUrl('{}/restart', wfId)
|
||||
params = {}
|
||||
params['from'] = fromTaskRef
|
||||
self.post(url, params, None)
|
||||
|
||||
class EventServicesClient(BaseClient):
|
||||
BASE_RESOURCE = 'event'
|
||||
|
||||
def __init__(self, baseURL):
|
||||
BaseClient.__init__(self, baseURL, self.BASE_RESOURCE)
|
||||
|
||||
def getEventHandlerDef(self, event, activeOnly=True):
|
||||
url = self.makeUrl('{}', event)
|
||||
params = {}
|
||||
params['activeOnly'] = activeOnly
|
||||
return self.get(url, params)
|
||||
|
||||
def getEventHandlerDefs(self):
|
||||
url = self.makeUrl()
|
||||
return self.get(url)
|
||||
|
||||
def createEventHandlerDef(self, ehObj):
|
||||
url = self.makeUrl()
|
||||
return self.post(url, None, ehObj)
|
||||
|
||||
def updateEventHandlerDef(self, ehObj):
|
||||
url = self.makeUrl()
|
||||
return self.put(url, None, ehObj)
|
||||
|
||||
def removeEventHandler(self, ehName):
|
||||
url = self.makeUrl('{}', ehName)
|
||||
self.delete(url, {})
|
||||
|
||||
def getEventHandlerQueues(self):
|
||||
url = self.makeUrl('queues')
|
||||
return self.get(url)
|
||||
|
||||
def getEventHandlerQueuesProviders(self):
|
||||
url = self.makeUrl('queues/providers')
|
||||
return self.get(url)
|
||||
|
||||
class WFClientMgr:
|
||||
def __init__(self, server_url='http://localhost:8080/api/'):
|
||||
self.workflowClient = WorkflowClient(server_url)
|
||||
self.taskClient = TaskClient(server_url)
|
||||
self.metadataClient = MetadataClient(server_url)
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage - python conductor server_url command parameters...")
|
||||
return None
|
||||
|
||||
server_url = sys.argv[1]
|
||||
command = sys.argv[2]
|
||||
wfcMgr = WFClientMgr(server_url)
|
||||
wfc = wfcMgr.workflowClient
|
||||
if command == 'start':
|
||||
if len(sys.argv) < 7:
|
||||
print('python conductor server_url start workflow_name input_json [version] [correlationId]')
|
||||
return None
|
||||
wfName = sys.argv[3]
|
||||
input = json.loads(sys.argv[5])
|
||||
correlationId = sys.argv[6]
|
||||
workflowId = wfc.startWorkflow(wfName, input, 1, correlationId)
|
||||
print(workflowId)
|
||||
return workflowId
|
||||
elif command == 'get':
|
||||
if len(sys.argv) < 4:
|
||||
print('python conductor server_url get workflow_id')
|
||||
return None
|
||||
wfId = sys.argv[3]
|
||||
wfjson = wfc.getWorkflow(wfId)
|
||||
print(json.dumps(wfjson, indent=True, separators=(',', ': ')))
|
||||
return wfjson
|
||||
elif command == 'terminate':
|
||||
if len(sys.argv) < 4:
|
||||
print('python conductor server_url terminate workflow_id')
|
||||
return None
|
||||
wfId = sys.argv[3]
|
||||
wfc.terminateWorkflow(wfId)
|
||||
print('OK')
|
||||
return wfId
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
[common]
|
||||
server = http://localhost:8080/api
|
||||
|
||||
[pyrest]
|
||||
threads = 5
|
||||
pollrate = .1
|
||||
|
||||
[pyexec]
|
||||
threads = 3
|
||||
pollrate = .1
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
def identity(data):
|
||||
print("default:identity")
|
||||
print(data)
|
||||
return data
|
||||
|
||||
def nop(data):
|
||||
print("default:nop")
|
||||
print(data)
|
||||
return 1
|
|
@ -0,0 +1,25 @@
|
|||
from conductor.ConductorWorker import ConductorWorker
|
||||
import requests
|
||||
import json
|
||||
|
||||
def execute_keycloak_auth(task):
|
||||
print("Inside execution...")
|
||||
print(json.dumps(task))
|
||||
|
||||
response = requests.post(task["inputData"]["uri"],
|
||||
{"grant_type" : "client_credentials", "client_id" : task["inputData"]["client_id"], "client_secret" : task["inputData"]["client_secret"]})
|
||||
|
||||
if response.status_code == 200:
|
||||
print("Executed")
|
||||
return {'status': 'COMPLETED', 'output': { "token" : response.json()}, 'logs': ['one', 'two']}
|
||||
else:
|
||||
print(response.text)
|
||||
return {'status': 'FAILED', 'output' : {}, 'logs': ['one', 'two']}
|
||||
|
||||
def main():
|
||||
print('Starting Keycloak authenticator')
|
||||
cc = ConductorWorker('http://localhost:8080/api', 1, 0.1)
|
||||
cc.start('kc_auth', execute_keycloak_auth, True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,39 @@
|
|||
from conductor.ConductorWorker import ConductorWorker
|
||||
import configparser
|
||||
from pyexecplugins.pyexecplugins import *
|
||||
from pyexecplugins.GatewaysToRestTask import *
|
||||
|
||||
def pyexec(task):
|
||||
try:
|
||||
operation = task["inputData"].get("operation") or "Nop"
|
||||
|
||||
p = PyExecPlugin.get(operation)
|
||||
|
||||
if p != None:
|
||||
pi = p(task["inputData"])
|
||||
ret = pi.execute()
|
||||
else: raise Exception("Operation {} not found.".format(operation))
|
||||
|
||||
return { "status" : "COMPLETED", "output" : ret, "logs" : ["one","two"]}
|
||||
|
||||
except Exception as err:
|
||||
exc_type, exc_obj, exc_tb = sys.exc_info()
|
||||
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
|
||||
print(exc_type, fname, exc_tb.tb_lineno)
|
||||
return { "status" : "FAILED", "output" : { "message" : "Internal error: {}".format(err)}, "logs" : ["one","two"]}
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
print("Started PyExec with plugins:", PyExecPlugin.getPlugins())
|
||||
|
||||
cfg = configparser.ConfigParser()
|
||||
cfg.read("config.cfg")
|
||||
|
||||
threads = cfg["pyexec"].getint("threads")
|
||||
pollrate = cfg["pyexec"].getfloat("pollrate")
|
||||
cc = ConductorWorker(cfg["common"]["server"], threads, pollrate, "pyexec")
|
||||
cc.start('pyexec', pyexec, True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,45 @@
|
|||
from pyexecplugins.pyexecplugins import *
|
||||
|
||||
class GatewayToRestTask(PyExecPlugin):
|
||||
name = "GatewayToRestTask"
|
||||
|
||||
def __init__(self, data=None):
|
||||
super().__init__(data)
|
||||
self.urls = data["gateway_list"]
|
||||
|
||||
def execute(self):
|
||||
output = { "tasks" : [] , "urls": {}}
|
||||
i = 0
|
||||
for url in self.urls:
|
||||
i += 1
|
||||
trefname = "dt{}".format(i)
|
||||
output["urls"][trefname] = {}
|
||||
output["tasks"].append({
|
||||
"name" : "pyrest",
|
||||
"taskReferenceName" : trefname,
|
||||
"type" : "SIMPLE",
|
||||
"inputParameters" : {
|
||||
"url" : "${workflow.input.keycloak_admin_url}",
|
||||
"method" : "POST",
|
||||
"expect" : [201, 409],
|
||||
"headers" : {
|
||||
"Authorization" : "Bearer ${authorize.output.body.access_token}",
|
||||
"Content-Type" : "application/json"
|
||||
},
|
||||
"body" : {
|
||||
"clientId": url.split("/")[2],
|
||||
"baseUrl": url,
|
||||
"enabled": True,
|
||||
"clientAuthenticatorType": "client-secret",
|
||||
"redirectUris": [url + "/*"],
|
||||
"webOrigins": ["/*"],
|
||||
"standardFlowEnabled": True,
|
||||
"directAccessGrantsEnabled": True,
|
||||
"publicClient": True,
|
||||
"protocol": "openid-connect",
|
||||
"attributes": {"login_theme": "d4science" },
|
||||
"fullScopeAllowed": True
|
||||
}
|
||||
}
|
||||
})
|
||||
return output
|
|
@ -0,0 +1,104 @@
|
|||
import requests
|
||||
import json
|
||||
|
||||
class PyExecPlugins(type):
|
||||
def __init__(cls, name, bases, attrs):
|
||||
if not hasattr(cls, "plugins"):
|
||||
print("Initializing plugins")
|
||||
cls.plugins = {}
|
||||
else:
|
||||
print("Appending a plugin ", cls.name, cls)
|
||||
cls.plugins[cls.name] = cls
|
||||
|
||||
def getPlugins(cls):
|
||||
return cls.plugins
|
||||
|
||||
def get(cls, name):
|
||||
return cls.plugins.get(name)
|
||||
|
||||
class PyExecPlugin(object, metaclass=PyExecPlugins):
|
||||
def __init__(self, data=None):
|
||||
self.data = data
|
||||
|
||||
class Nop(PyExecPlugin):
|
||||
name = "Nop"
|
||||
|
||||
def __init__(self, data=None):
|
||||
super().__init__(data)
|
||||
|
||||
def execute(self):
|
||||
return None
|
||||
|
||||
class Identity(PyExecPlugin):
|
||||
name = "Identity"
|
||||
|
||||
def __init__(self, data=None):
|
||||
super().__init__(data)
|
||||
|
||||
def execute(self):
|
||||
return self.data
|
||||
|
||||
class Http(PyExecPlugin):
|
||||
name = "Http"
|
||||
|
||||
def __init__(self, data):
|
||||
super().__init__(data)
|
||||
self.method = data.get("method") or "get"
|
||||
self.url = data.get("url")
|
||||
self.headers = data.get("headers") or {}
|
||||
self.contenttype = self.headers.get("Content-Type")
|
||||
self.accept = self.headers.get("Accept")
|
||||
self.body = data.get("body")
|
||||
self.expect = data.get("expect")
|
||||
self.fail = data.get("fail")
|
||||
|
||||
def doRequest(self):
|
||||
#print(self.method, self.url, self.contenttype, self.accept)
|
||||
if self.contenttype != None and self.contenttype.find("json") != -1:
|
||||
self.response = requests.request(self.method, self.url, headers=self.headers, json = self.body)
|
||||
else:
|
||||
self.response = requests.request(self.method, self.url, headers=self.headers, data = self.body)
|
||||
return self.response
|
||||
|
||||
def computeStatus(self):
|
||||
if self.fail == False:
|
||||
return "COMPLETED"
|
||||
elif self.expect == None:
|
||||
return "COMPLETED" if self.response.ok else "FAILED"
|
||||
else:
|
||||
if type(self.expect) == list:
|
||||
return "COMPLETED" if (self.response.status_code in self.expect) else "FAILED"
|
||||
else:
|
||||
return "COMPLETED" if (self.response.status_code == self.expect) else "FAILED"
|
||||
|
||||
def buildOutput(self, status):
|
||||
hdrs = {}
|
||||
for k in self.response.headers.keys(): hdrs[k] = self.response.headers[k]
|
||||
|
||||
print("Response: {} {}".format(self.response.status_code, self.response.reason))
|
||||
|
||||
if hdrs.get("Content-Type") != None and hdrs["Content-Type"].find("json") != -1 or self.accept != None and self.accept.find("json") != -1:
|
||||
outbody = self.response.json()
|
||||
else:
|
||||
outbody = self.response.text
|
||||
|
||||
return {
|
||||
'status': status,
|
||||
'output': {
|
||||
"body" : outbody,
|
||||
"headers" : hdrs,
|
||||
"status" : self.response.status_code,
|
||||
"reason" : self.response.reason},
|
||||
'logs': ['one', 'two']}
|
||||
|
||||
def execute(self):
|
||||
try:
|
||||
self.doRequest()
|
||||
status = self.computeStatus()
|
||||
return self.buildOutput(status)
|
||||
|
||||
except Exception as err:
|
||||
return {
|
||||
"status" : "FAILED",
|
||||
"output" : { "message" : "Internal error: {}".format(err)}, "logs" : ["one","two"]
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
from conductor.ConductorWorker import ConductorWorker
|
||||
from pyexecplugins.pyexecplugins import Http
|
||||
import configparser
|
||||
import sys
|
||||
|
||||
def pyrest(task):
|
||||
http = Http(task["inputData"])
|
||||
return http.execute()
|
||||
|
||||
def main():
|
||||
cfg = configparser.ConfigParser()
|
||||
cfg.read("config.cfg")
|
||||
|
||||
threads = cfg["pyrest"].getint("threads")
|
||||
pollrate = cfg["pyrest"].getfloat("pollrate")
|
||||
cc = ConductorWorker(cfg["common"]["server"], threads, pollrate, "pyrest")
|
||||
cc.start('pyrest', pyrest, True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -0,0 +1,45 @@
|
|||
#
|
||||
# Copyright 2020 Nubisware S.r.l
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name = 'nwconductorworkers',
|
||||
packages = ['nwconductorworkers'], # this must be the same as the name above
|
||||
version = '1.0.0',
|
||||
description = 'Python based Conductor Workers',
|
||||
author = 'Marco Lettere',
|
||||
author_email = 'marco.lettere@nubisware.com',
|
||||
url = 'https://github.com/nubisware/nvconductorworkers',
|
||||
download_url = 'https://github.com/nubisware/nvconductorworkers/releases',
|
||||
keywords = ['conductor', 'python'],
|
||||
license = 'Apache 2.0',
|
||||
install_requires = [
|
||||
'requests',
|
||||
],
|
||||
classifiers = [
|
||||
'Development Status :: 4 - Beta',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: System Administrators',
|
||||
'License :: OSI Approved :: Apache Software License',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Topic :: Workflow',
|
||||
'Topic :: Microservices',
|
||||
'Topic :: Orchestration',
|
||||
'Topic :: Internet',
|
||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||
'Topic :: System :: Networking'
|
||||
],
|
||||
)
|
Loading…
Reference in New Issue