# conductor-worker-python/conductor/ConductorWorker.py
#
# 144 lines
# 6.0 KiB
# Python

#
# Copyright 2017 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function, absolute_import
import sys
import time
from conductor.conductor import WFClientMgr
from threading import Thread
import socket
# Resolved once at import time; ConductorWorker falls back to this value
# when no explicit worker_id is supplied.
hostname = socket.gethostname()
class ConductorWorker:
    """
    Main class for implementing Conductor Workers

    A conductor worker is a separate system that executes the various
    tasks that the conductor server queues up for execution. The worker
    can run on the same instance as the server or on a remote instance.

    The worker generally provides a wrapper around some function that
    performs the actual execution of the task. The function that is
    being executed must return a `dict` with the `status`, `output` and
    `logs` keys. If these keys are not present, the task is reported to
    the server as FAILED.

    The start method is used to begin continuous polling and execution
    of the tasks that the conductor server makes available. The same
    script can run multiple workers using the wait argument. For more
    details, view the start method
    """

    def __init__(self, server_url, thread_count, polling_interval, worker_id=None):
        """
        Parameters
        ----------
        server_url: str
            The url to the server hosting the conductor api.
            Ex: 'http://localhost:8080/api'
        thread_count: int
            The number of threads that will be polling for and
            executing tasks in case of using the start method.
        polling_interval: float
            The number of seconds that each worker thread will wait
            between polls to the conductor server.
        worker_id: str, optional
            The worker_id of the worker that is going to execute the
            task. For further details, refer to the documentation
            By default, it is set to hostname of the machine
        """
        client_manager = WFClientMgr(server_url)
        self.workflowClient = client_manager.workflowClient
        self.taskClient = client_manager.taskClient
        self.thread_count = thread_count
        self.polling_interval = polling_interval
        self.worker_id = worker_id or hostname

    def execute(self, task, exec_function):
        """
        Run exec_function on a polled task and report the result

        Parameters
        ----------
        task: dict
            The task as returned by the poll. It is updated in place
            with the `status`, `outputData` and `logs` taken from the
            function's response and then sent back through the task
            client.
        exec_function: function
            Called with the task as its only argument. Must return a
            dict containing the `status`, `output` and `logs` keys; a
            `reasonForIncompletion` key is copied over when present.

        Any exception raised while executing or reporting (including an
        invalid response) is printed, the task is marked FAILED with
        the error text as `reasonForIncompletion`, and the update is
        sent again.
        """
        try:
            resp = exec_function(task)
            # isinstance (rather than an exact type check) so dict
            # subclasses such as OrderedDict are accepted too.
            if not isinstance(resp, dict) or not all(key in resp for key in ('status', 'output', 'logs')):
                raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
            task['status'] = resp['status']
            task['outputData'] = resp['output']
            task['logs'] = resp['logs']
            if 'reasonForIncompletion' in resp:
                task['reasonForIncompletion'] = resp['reasonForIncompletion']
            self.taskClient.updateTask(task)
        except Exception as err:
            # .get() keeps the handler itself from raising KeyError when
            # the task carries no referenceTaskName.
            print('Error executing task: ', task.get('referenceTaskName'), str(err))
            task['status'] = 'FAILED'
            # Surface the cause to the server instead of only printing it.
            task['reasonForIncompletion'] = str(err)
            self.taskClient.updateTask(task)

    def poll_and_execute(self, taskType, exec_function, domain=None):
        """
        Poll the server forever, executing every task that is returned

        Sleeps for `polling_interval` seconds before each poll. When the
        poll yields a task it is acknowledged and then passed to
        `execute`. This method never returns.

        Parameters
        ----------
        taskType: str
            The name of the task to poll for
        exec_function: function
            The function handed to `execute` for each polled task
        domain: str, optional
            The domain passed to the poll call. Defaults to None
        """
        while True:
            time.sleep(float(self.polling_interval))
            try:
                polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
                if polled is not None:
                    self.taskClient.ackTask(polled['taskId'], self.worker_id)
                    self.execute(polled, exec_function)
            except Exception as err:
                # A failed poll/ack/update must not kill the polling
                # thread; report it and try again on the next cycle.
                print('Error polling for task: ', taskType, str(err))

    def start(self, taskType, exec_function, wait, domain=None):
        """
        start begins the continuous polling of the conductor server

        Parameters
        ----------
        taskType: str
            The name of the task that the worker is looking to execute
        exec_function: function
            The function that the worker will execute. The function
            must return a dict with the `status`, `output` and `logs`
            keys present. If this is not present, the task is marked
            as FAILED
        wait: bool
            Whether the worker will block execution of further code.
            Since the workers are being run in daemon threads, when the
            program completes execution, all the threads are destroyed.
            Setting wait to True prevents the program from ending.
            If multiple workers are being called from the same program,
            all but the last start call must have wait set to False.
            The last start call must always set wait to True. If a
            single worker is being called, set wait to True.
        domain: str, optional
            The domain of the task under which the worker will run. For
            further details refer to the conductor server documentation
            By default, it is set to None
        """
        # Coerce once, the same way poll_and_execute does, so values
        # supplied as strings (e.g. read from a config file) work here.
        interval_ms = float(self.polling_interval) * 1000
        thread_count = int(self.thread_count)
        print('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s' % (taskType, interval_ms, thread_count, self.worker_id))
        for _ in range(thread_count):
            thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain,))
            thread.daemon = True
            thread.start()
        if wait:
            # Keep the main thread alive; the daemon pollers die with it.
            while True:
                time.sleep(1)
def exc(taskType, inputData=None, startTime=None, retryCount=None, status=None, callbackAfterSeconds=None, pollCount=None):
    """
    Sample task function used by main

    ConductorWorker.execute calls the task function with a single
    argument (the polled task), so only the first parameter is
    required; the remaining parameters are kept, with defaults, for
    callers that still pass them positionally. None of the arguments
    are inspected.

    Returns
    -------
    dict
        A COMPLETED response with empty `output` and `logs`, i.e. the
        three keys that ConductorWorker.execute requires.
    """
    print('Executing the function')
    return {'status': 'COMPLETED', 'output': {}, 'logs': []}
def main():
    """
    Run two sample workers against a local conductor server

    Expects two command line arguments, the task types to poll for.
    The first worker is started without blocking; the second one blocks
    so the daemon polling threads stay alive. Exits with status 1 and a
    usage message when fewer than two task types are given.
    """
    if len(sys.argv) < 3:
        program = sys.argv[0] if sys.argv else 'ConductorWorker.py'
        print('Usage: %s <task_type_1> <task_type_2>' % program, file=sys.stderr)
        sys.exit(1)
    cc = ConductorWorker('http://localhost:8080/api', 5, 0.1)
    cc.start(sys.argv[1], exc, False)
    cc.start(sys.argv[2], exc, True)
# Run the sample workers only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()