conductor-worker-python/conductor/conductor.py

392 lines
13 KiB
Python

#
# Copyright 2017 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
import requests
import json
import logging
import sys
import socket
import warnings
# Host name of the local machine, looked up once when this module is imported.
hostname = socket.gethostname()
class BaseClient(object):
    """Minimal JSON-over-HTTP client that the resource-specific clients extend.

    Each request goes to ``<baseURL>/<resPath>`` through ``requests``. A
    response whose status makes ``raise_for_status()`` fail is reported on
    stdout and the ``requests.HTTPError`` is re-raised to the caller.
    """

    # When True, put() and delete() print the final URL of each request.
    printUrl = False
    # Headers sent with every request; instance-level and call-level headers
    # are merged on top of these.
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}

    def __init__(self, baseURL, baseResource, headers=None):
        """Remember the server root, the resource prefix and default headers.

        baseURL: server root; trailing slashes are removed.
        baseResource: path prefix that makeUrl() puts in front of every URL.
        headers: optional dict merged over the class-level default headers.
        """
        self.baseURL = baseURL.rstrip("/")
        self.headers = self.headers if headers is None else self.mergeTwoDicts(self.headers, headers)
        self.baseResource = baseResource

    def get(self, resPath, queryParams=None, timeout=(5, 10)):
        """GET resPath; return the decoded JSON body, or None if it is empty.

        Raises requests.HTTPError when the response status signals an error.
        """
        resp = requests.get(self.__fullUrl(resPath), params=queryParams,
                            headers=self.headers, timeout=timeout)
        self.__checkForSuccess(resp)
        if not resp.content:
            return None
        return resp.json()

    def post(self, resPath, queryParams, body, headers=None, timeout=(5, 10)):
        """POST body (JSON-encoded when not None) to resPath.

        headers, when given, are merged over the instance headers for this
        call only. Returns the response decoded according to the effective
        ``Accept`` header (see __return). Raises requests.HTTPError on an
        error status.
        """
        theUrl = self.__fullUrl(resPath)
        theHeader = self.__callHeaders(headers)
        jsonBody = None
        if body is not None:
            jsonBody = json.dumps(body, ensure_ascii=True)
            # Headers are deliberately left out of the log line: they may
            # carry credentials.
            logging.getLogger("pyexec").debug(
                "POST %s params=%s body=%s", theUrl, queryParams, jsonBody)
        # data=None is what requests uses when no body is supplied.
        resp = requests.post(theUrl, params=queryParams, data=jsonBody,
                             headers=theHeader, timeout=timeout)
        self.__checkForSuccess(resp)
        return self.__return(resp, theHeader)

    def put(self, resPath, queryParams=None, body=None, headers=None, timeout=(5, 10)):
        """PUT body (JSON-encoded when not None) to resPath; returns nothing.

        Raises requests.HTTPError on an error status.
        """
        theHeader = self.__callHeaders(headers)
        jsonBody = None if body is None else json.dumps(body, ensure_ascii=True)
        resp = requests.put(self.__fullUrl(resPath), params=queryParams, data=jsonBody,
                            headers=theHeader, timeout=timeout)
        self.__print(resp)
        self.__checkForSuccess(resp)

    def delete(self, resPath, queryParams, timeout=(5, 10)):
        """Send DELETE to resPath; returns nothing.

        Raises requests.HTTPError on an error status.
        """
        resp = requests.delete(self.__fullUrl(resPath), params=queryParams,
                               headers=self.headers, timeout=timeout)
        self.__print(resp)
        self.__checkForSuccess(resp)

    def makeUrl(self, urlformat=None, *argv):
        """Return ``baseResource + '/'`` followed by urlformat.format(*argv).

        With no (or an empty) urlformat only the resource prefix is returned.
        """
        url = self.baseResource + '/'
        if urlformat:
            url += urlformat.format(*argv)
        return url

    def makeParams(self, **kwargs):
        """Return kwargs without None-valued entries, or None if none remain."""
        return {k: v for k, v in kwargs.items() if v is not None} or None

    def mergeTwoDicts(self, x, y):
        """Return a new dict holding x's items overridden by y's items."""
        z = x.copy()
        z.update(y)
        return z

    def __fullUrl(self, resPath):
        """Join the server root and resPath with exactly one slash."""
        return "{}/{}".format(self.baseURL, resPath.strip("/"))

    def __callHeaders(self, headers):
        """Return the instance headers, overlaid with per-call headers if any."""
        if headers is None:
            return self.headers
        return self.mergeTwoDicts(self.headers, headers)

    def __print(self, resp):
        """Print the request URL when printUrl is enabled."""
        if self.printUrl:
            print(resp.url)

    def __return(self, resp, header):
        """Decode a response body according to the ``Accept`` header.

        Returns '' for an empty body, parsed JSON when Accept is exactly
        'application/json', and the raw text otherwise.
        """
        if not resp.text:
            return ''
        if header.get('Accept') == 'application/json':
            return resp.json()
        return resp.text

    def __checkForSuccess(self, resp):
        """Print the response body and re-raise if the status is an error."""
        try:
            resp.raise_for_status()
        except requests.HTTPError:
            print("ERROR: " + resp.text)
            raise
class MetadataClient(BaseClient):
    """Client for the ``metadata`` resource: workflow and task definitions."""

    BASE_RESOURCE = 'metadata'

    def __init__(self, baseURL, headers=None):
        BaseClient.__init__(self, baseURL, self.BASE_RESOURCE, headers)

    def getWorkflowDef(self, wfname, version=None):
        """GET metadata/workflow/<wfname>, optionally for a given version."""
        url = self.makeUrl('workflow/{}', wfname)
        return self.get(url, self.makeParams(version=version))

    def createWorkflowDef(self, wfdObj):
        """POST a workflow definition to metadata/workflow."""
        url = self.makeUrl('workflow')
        return self.post(url, None, wfdObj)

    def updateWorkflowDefs(self, listOfWfdObj):
        """PUT a list of workflow definitions to metadata/workflow."""
        url = self.makeUrl('workflow')
        self.put(url, None, listOfWfdObj)

    def getAllWorkflowDefs(self):
        """GET metadata/workflow."""
        url = self.makeUrl('workflow')
        return self.get(url)

    def unRegisterWorkflowDef(self, wfname, version):
        """DELETE metadata/workflow/<wfname>/<version>."""
        # Let makeUrl do the single formatting pass. Pre-formatting the string
        # and handing it to makeUrl would format it a second time and choke on
        # workflow names that contain '{' or '}'.
        url = self.makeUrl('workflow/{}/{}', wfname, version)
        self.delete(url, None)

    def getTaskDef(self, tdName):
        """GET metadata/taskdefs/<tdName>."""
        url = self.makeUrl('taskdefs/{}', tdName)
        return self.get(url)

    def registerTaskDefs(self, listOfTaskDefObj):
        """POST a list of task definitions to metadata/taskdefs."""
        url = self.makeUrl('taskdefs')
        return self.post(url, None, listOfTaskDefObj)

    def registerTaskDef(self, taskDefObj):
        """registerTaskDef is deprecated since PUT /metadata/taskdefs does not
        register but updates a task definition. Use updateTaskDef function
        instead.
        """
        # The docstring doubles as the warning text, so keep its wording.
        # stacklevel=2 attributes the warning to the caller of this method.
        warnings.warn(self.registerTaskDef.__doc__, DeprecationWarning, stacklevel=2)
        url = self.makeUrl('taskdefs')
        self.put(url, None, taskDefObj)

    def updateTaskDef(self, taskDefObj):
        """PUT a task definition to metadata/taskdefs."""
        url = self.makeUrl('taskdefs')
        self.put(url, None, taskDefObj)

    def unRegisterTaskDef(self, tdName, reason=None):
        """DELETE metadata/taskdefs/<tdName>, passing reason when given."""
        url = self.makeUrl('taskdefs/{}', tdName)
        self.delete(url, self.makeParams(reason=reason))

    def getAllTaskDefs(self):
        """GET metadata/taskdefs."""
        url = self.makeUrl('taskdefs')
        return self.get(url)
class TaskClient(BaseClient):
    """Client for the ``tasks`` resource: polling, updates and queue queries."""

    BASE_RESOURCE = 'tasks'

    def __init__(self, baseURL, headers=None):
        BaseClient.__init__(self, baseURL, self.BASE_RESOURCE, headers)

    def getTask(self, taskId):
        """GET tasks/<taskId> and return the decoded response."""
        return self.get(self.makeUrl('{}', taskId))

    def updateTask(self, taskObj):
        """POST taskObj to tasks/; the response is not returned."""
        callHeaders = {
            'Content-Type': 'application/json',
            'Accept': 'text/plain; encodingcharset=utf-8',
        }
        self.post(self.makeUrl(''), None, taskObj, callHeaders)

    def pollForTask(self, taskType, workerid, domain=None, extraParams=None, reRegisterFunction=None):
        """GET tasks/poll/<taskType> for this worker.

        extraParams are added to the query; workerid and domain take
        precedence over same-named keys. On requests.HTTPError the
        reRegisterFunction is called when given, otherwise a warning is
        logged; in both cases None is returned.
        """
        pollUrl = self.makeUrl('poll/{}', taskType)
        query = {'workerid': workerid}
        if domain is not None:
            query['domain'] = domain
        if extraParams is not None:
            # Start from the extras so the mandatory keys overwrite them.
            combined = dict(extraParams)
            combined.update(query)
            query = combined
        try:
            logging.getLogger("pyexec").debug("Polling at %s with %s", pollUrl, query)
            return self.get(pollUrl, query)
        except requests.HTTPError as err:
            if reRegisterFunction is None:
                logging.getLogger("pyexec").warning('Error while polling: %s', str(err))
            else:
                reRegisterFunction()
        return None

    def pollForBatch(self, taskType, count, timeout, workerid, domain=None):
        """GET tasks/poll/batch/<taskType>; return None if anything fails.

        count and timeout are sent as query parameters. Any exception is
        printed and swallowed.
        """
        query = {'workerid': workerid, 'count': count, 'timeout': timeout}
        if domain is not None:
            query['domain'] = domain
        try:
            return self.get(self.makeUrl('poll/batch/{}', taskType), query)
        except Exception as err:
            print('Error while polling ' + str(err))
            return None

    def ackTask(self, taskId, workerid):
        """Report the task as acknowledged without contacting the server."""
        # No HTTP call is made here; both arguments are ignored.
        return True

    def getTasksInQueue(self, taskName):
        """GET tasks/queue/<taskName>."""
        return self.get(self.makeUrl('queue/{}', taskName))

    def removeTaskFromQueue(self, taskId, reason=None):
        """DELETE tasks/queue/<taskId> with ``reason`` as a query parameter."""
        self.delete(self.makeUrl('queue/{}', taskId), {'reason': reason})

    def getTaskQueueSizes(self, listOfTaskName):
        """POST the task names to tasks/queue/sizes and return the response."""
        return self.post(self.makeUrl('queue/sizes'), None, listOfTaskName)
class WorkflowClient(BaseClient):
    """Client for the ``workflow`` resource: start, inspect and control runs."""

    BASE_RESOURCE = 'workflow'

    def __init__(self, baseURL, headers=None):
        BaseClient.__init__(self, baseURL, self.BASE_RESOURCE, headers)

    def getWorkflow(self, wfId, includeTasks=True):
        """GET workflow/<wfId> with the ``includeTasks`` query parameter."""
        query = {'includeTasks': includeTasks}
        return self.get(self.makeUrl('{}', wfId), query)

    def getRunningWorkflows(self, wfName, version=None, startTime=None, endTime=None):
        """GET workflow/running/<wfName> filtered by version and time range."""
        query = {'version': version, 'startTime': startTime, 'endTime': endTime}
        return self.get(self.makeUrl('running/{}', wfName), query)

    def startWorkflow(self, wfName, inputjson, version=None, correlationId=None):
        """POST inputjson to workflow/<wfName> and return the post() result.

        The request asks for a ``text/plain`` response.
        """
        query = {'version': version, 'correlationId': correlationId}
        acceptText = {'Accept': 'text/plain'}
        return self.post(self.makeUrl('{}', wfName), query, inputjson, acceptText)

    def terminateWorkflow(self, wfId, reason=None):
        """DELETE workflow/<wfId> with ``reason`` as a query parameter."""
        self.delete(self.makeUrl('{}', wfId), {'reason': reason})

    def removeWorkflow(self, wfId, archiveWorkflow, reason=None):
        """DELETE workflow/<wfId>/remove, passing only the non-None options."""
        query = self.makeParams(archiveWorkflow=archiveWorkflow, reason=reason)
        self.delete(self.makeUrl('{}/remove', wfId), query)

    def pauseWorkflow(self, wfId):
        """PUT workflow/<wfId>/pause."""
        self.put(self.makeUrl('{}/pause', wfId))

    def resumeWorkflow(self, wfId):
        """PUT workflow/<wfId>/resume."""
        self.put(self.makeUrl('{}/resume', wfId))

    def skipTaskFromWorkflow(self, wfId, taskRefName, skipTaskRequest):
        """POST skipTaskRequest to workflow/<wfId>/skiptask/<taskRefName>."""
        self.post(self.makeUrl('{}/skiptask/{}', wfId, taskRefName), None, skipTaskRequest)

    def rerunWorkflow(self, wfId, taskRefName, rerunWorkflowRequest):
        """POST rerunWorkflowRequest to workflow/<wfId>/rerun.

        taskRefName is accepted but not used.
        """
        return self.post(self.makeUrl('{}/rerun', wfId), None, rerunWorkflowRequest)

    def restartWorkflow(self, wfId, taskRefName, fromTaskRef):
        """POST to workflow/<wfId>/restart with ``from`` set to fromTaskRef.

        taskRefName is accepted but not used.
        """
        self.post(self.makeUrl('{}/restart', wfId), {'from': fromTaskRef}, None)
class EventServicesClient(BaseClient):
    """Client for the ``event`` resource: event handlers and their queues."""

    BASE_RESOURCE = 'event'

    def __init__(self, baseURL, headers = None):
        BaseClient.__init__(self, baseURL, self.BASE_RESOURCE, headers)

    def getEventHandlerDef(self, event, activeOnly=True):
        """GET event/<event> with the ``activeOnly`` query parameter."""
        query = {'activeOnly': activeOnly}
        return self.get(self.makeUrl('{}', event), query)

    def getEventHandlerDefs(self):
        """GET event/."""
        return self.get(self.makeUrl())

    def createEventHandlerDef(self, ehObj):
        """POST ehObj to event/ and return the response."""
        return self.post(self.makeUrl(), None, ehObj)

    def updateEventHandlerDef(self, ehObj):
        """PUT ehObj to event/ and return the put() result."""
        return self.put(self.makeUrl(), None, ehObj)

    def removeEventHandler(self, ehName):
        """DELETE event/<ehName>."""
        self.delete(self.makeUrl('{}', ehName), {})

    def getEventHandlerQueues(self):
        """GET event/queues."""
        return self.get(self.makeUrl('queues'))

    def getEventHandlerQueuesProviders(self):
        """GET event/queues/providers."""
        return self.get(self.makeUrl('queues/providers'))
class WFClientMgr:
    """Bundle a workflow, a task and a metadata client for one server."""

    def __init__(self, server_url='http://localhost:8080/api/', headers=None):
        """Create the three clients with the same server URL and headers."""
        shared = (server_url, headers)
        self.workflowClient = WorkflowClient(*shared)
        self.taskClient = TaskClient(*shared)
        self.metadataClient = MetadataClient(*shared)
def main():
    """Command-line entry point: ``python conductor server_url command ...``.

    Supported commands:

    * ``start workflow_name version input_json correlationId`` starts a
      workflow and prints/returns what startWorkflow() returns.
    * ``get workflow_id`` prints and returns the workflow as JSON.
    * ``terminate workflow_id`` terminates the workflow, prints ``OK`` and
      returns the id.

    Returns None when the arguments are incomplete or the command is unknown.
    """
    if len(sys.argv) < 3:
        print("Usage - python conductor server_url command parameters...")
        return None
    server_url = sys.argv[1]
    command = sys.argv[2]
    wfcMgr = WFClientMgr(server_url)
    wfc = wfcMgr.workflowClient
    if command == 'start':
        if len(sys.argv) < 7:
            # The usage text lists the arguments in the positions that are
            # actually read below.
            print('python conductor server_url start workflow_name version input_json correlationId')
            return None
        wfName = sys.argv[3]
        try:
            version = int(sys.argv[4])
        except ValueError:
            # This slot used to be ignored and version 1 was always sent;
            # keep that as the fallback for non-numeric values.
            version = 1
        wfInput = json.loads(sys.argv[5])
        correlationId = sys.argv[6]
        workflowId = wfc.startWorkflow(wfName, wfInput, version, correlationId)
        print(workflowId)
        return workflowId
    elif command == 'get':
        if len(sys.argv) < 4:
            print('python conductor server_url get workflow_id')
            return None
        wfId = sys.argv[3]
        wfjson = wfc.getWorkflow(wfId)
        print(json.dumps(wfjson, indent=True, separators=(',', ': ')))
        return wfjson
    elif command == 'terminate':
        if len(sys.argv) < 4:
            print('python conductor server_url terminate workflow_id')
            return None
        wfId = sys.argv[3]
        wfc.terminateWorkflow(wfId)
        print('OK')
        return wfId
    # Tell the user instead of exiting silently on an unsupported command.
    print('Unknown command: {} (expected start, get or terminate)'.format(command))
    return None


if __name__ == '__main__':
    main()