2020-06-19 10:50:02 +02:00
#
# Copyright 2017 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function , absolute_import
import sys
import time
2020-11-06 18:26:44 +01:00
import logging
2020-06-19 10:50:02 +02:00
from conductor . conductor import WFClientMgr
from threading import Thread
2023-04-17 17:27:44 +02:00
import threading
2020-06-19 10:50:02 +02:00
import socket
# Default worker id: this machine's hostname, resolved once at import time.
# ConductorWorker falls back to it when no explicit worker_id is passed.
hostname = socket.gethostname()
class ConductorWorker:
    """
    Main class for implementing Conductor Workers

    A conductor worker is a separate system that executes the various
    tasks that the conductor server queues up for execution. The worker
    can run on the same instance as the server or on a remote instance.

    The worker generally provides a wrapper around some function that
    performs the actual execution of the task. The function that is
    being executed must return a `dict` with the `status`, `output` and
    `logs` keys (an optional `reasonForIncompletion` key is forwarded
    too). If the function raises, or its return value lacks these keys,
    the error is logged and the task is reported to the server as FAILED.

    The start method is used to begin continuous polling and execution
    of the tasks that the conductor server makes available. The same
    script can run multiple workers using the wait argument. For more
    details, view the start method
    """

    def __init__(self, server_url, thread_count, polling_interval, worker_id=None, auth=None):
        """
        Parameters
        ----------
        server_url: str
            The url to the server hosting the conductor api.
            Ex: 'http://localhost:8080/api'
        thread_count: int
            The number of threads that will be polling for and
            executing tasks in case of using the start method.
        polling_interval: float
            The number of seconds that each worker thread will wait
            between polls to the conductor server.
        worker_id: str, optional
            The worker_id of the worker that is going to execute the
            task. For further details, refer to the documentation
            By default, it is set to hostname of the machine
        auth: str, optional
            Credentials for HTTP Basic authentication. When given, the
            header ``{"Authorization": "Basic " + auth}`` is passed to
            WFClientMgr, so this is presumably the already base64-encoded
            ``user:password`` token (confirm with callers). By default no
            headers are passed.
        """
        hdrs = None
        if auth is not None:
            hdrs = {"Authorization": "Basic " + auth}
        wfcMgr = WFClientMgr(server_url, hdrs)
        self.workflowClient = wfcMgr.workflowClient
        self.taskClient = wfcMgr.taskClient
        self.thread_count = thread_count
        self.polling_interval = polling_interval
        self.worker_id = worker_id or hostname

    def execute(self, task, exec_function):
        """
        Run ``exec_function`` on ``task`` and report the outcome to the server.

        Parameters
        ----------
        task: dict
            The polled task. It is updated in place with ``status``,
            ``outputData``, ``logs`` and, when provided,
            ``reasonForIncompletion`` before being sent via updateTask.
        exec_function: callable
            Called as ``exec_function(task)``; must return a dict with at
            least the ``status``, ``output`` and ``logs`` keys.

        If ``exec_function`` raises, returns an invalid value, or the
        success-path updateTask call fails, the error is logged and the task
        is sent again with status ``FAILED``. An exception raised by that
        FAILED update propagates to the caller.
        """
        logger = logging.getLogger("pyexec")
        try:
            logger.debug("Exec function is %s", exec_function)
            resp = exec_function(task)
            # isinstance rather than an exact type check so dict subclasses
            # (e.g. OrderedDict) are accepted as valid responses.
            if not isinstance(resp, dict) or not all(key in resp for key in ('status', 'output', 'logs')):
                raise Exception('Task execution function MUST return a response as a dict with status, output and logs fields')
            task['status'] = resp['status']
            task['outputData'] = resp['output']
            task['logs'] = resp['logs']
            if 'reasonForIncompletion' in resp:
                task['reasonForIncompletion'] = resp['reasonForIncompletion']
            self.taskClient.updateTask(task)
        except Exception as err:
            # .get(): a task without referenceTaskName must not turn the error
            # report into a KeyError that skips the FAILED update below.
            logger.error('Error executing task: %s\n%s', task.get('referenceTaskName'), str(err), exc_info=True)
            task['status'] = 'FAILED'
            self.taskClient.updateTask(task)

    def poll_and_execute(self, taskType, exec_function, domain=None):
        """
        Worker-thread loop: poll for ``taskType`` forever and run what arrives.

        Each iteration sleeps ``polling_interval`` seconds, polls the server
        and, when a task is returned, acknowledges it and hands it to
        ``execute``. Errors from polling, acknowledging or executing are
        logged and the loop continues, so one failing server call does not
        end this thread.
        """
        logger = logging.getLogger("pyexec")
        while True:
            time.sleep(float(self.polling_interval))
            polled = None
            poll_started = time.time()
            try:
                polled = self.taskClient.pollForTask(taskType, self.worker_id, domain)
                logger.debug("Polled in %s sec", (time.time() - poll_started))
            except Exception as e:
                logger.warning("Error during main loop: %s", str(e))
            if polled is None:
                continue
            try:
                self.taskClient.ackTask(polled['taskId'], self.worker_id)
                self.execute(polled, exec_function)
            except Exception as e:
                # Guarded so an ack/update failure is logged instead of killing
                # this daemon thread (dead threads are never restarted).
                logger.warning("Error while acknowledging/executing task: %s", str(e), exc_info=True)

    def start(self, taskType, exec_function, wait, domain=None):
        """
        start begins the continuous polling of the conductor server

        Parameters
        ----------
        taskType: str
            The name of the task that the worker is looking to execute
        exec_function: function
            The function that the worker will execute. It is called as
            ``exec_function(task)`` and must return a dict with the
            `status`, `output` and `logs` keys present. If it does not
            (or it raises), the task is reported to the server as FAILED.
        wait: bool
            Whether the worker will block execution of further code.
            Since the workers are being run in daemon threads, when the
            program completes execution, all the threads are destroyed.
            Setting wait to True prevents the program from ending.
            If multiple workers are being called from the same program,
            all but the last start call must have wait set to False.
            The last start call must always set wait to True. If a
            single worker is being called, set wait to True.
            While waiting, the number of live threads is checked every
            60 seconds and a critical message is logged when it differs
            from the count observed right after the threads were started.
        domain: str, optional
            The domain of the task under which the worker will run. For
            further details refer to the conductor server documentation
            By default, it is set to None
        """
        logger = logging.getLogger("pyexec")
        # Coerce the same way the polling loop (float) and the thread loop
        # (int) do, so string-valued settings don't make this log line raise.
        interval = float(self.polling_interval)
        count = int(self.thread_count)
        logger.debug('Polling for task %s at a %f ms interval with %d threads for task execution, with worker id as %s',
                     taskType, interval * 1000, count, self.worker_id)
        for x in range(count):
            thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, domain))
            thread.daemon = True
            logger.debug("Starting Thread %d for %s", x, taskType)
            thread.start()
        if wait:
            # Snapshot after all threads were started (including those from
            # earlier start(..., wait=False) calls) to detect threads dying.
            expected_threads = threading.active_count()
            while True:
                time.sleep(60)
                active_threads = threading.active_count()
                if expected_threads != active_threads:
                    logger.critical("!!! Active/expected running threads count is: %d / %d", active_threads, expected_threads)
2020-06-19 10:50:02 +02:00
def exc(taskType, inputData=None, startTime=None, retryCount=None, status=None, callbackAfterSeconds=None, pollCount=None):
    """
    Example task function that always completes successfully.

    ConductorWorker.execute calls its ``exec_function`` with a single
    argument (the polled task dict), so every parameter after the first is
    optional; when used with ConductorWorker, ``taskType`` actually receives
    that task. The remaining parameters are kept only so existing 7-argument
    callers keep working; they are ignored.

    Returns
    -------
    dict
        ``status`` COMPLETED with empty ``output`` and ``logs``: exactly the
        keys ConductorWorker.execute requires in a response.
    """
    logging.getLogger("pyexec").debug('Executing the function')
    return {'status': 'COMPLETED', 'output': {}, 'logs': []}
def main(argv=None):
    """
    Demo entry point: serve the given task types against a local server.

    Parameters
    ----------
    argv: list of str, optional
        Task type names to poll for; defaults to ``sys.argv[1:]``. One set
        of worker threads (5 threads, 0.1 s polling interval) is started per
        task type, and only the last ``start`` call blocks (wait=True).

    Exits with a usage message when no task type is given, instead of
    failing with an IndexError.
    """
    task_types = sys.argv[1:] if argv is None else list(argv)
    if not task_types:
        sys.exit('usage: %s TASK_TYPE [TASK_TYPE ...]' % (sys.argv[0] if sys.argv else 'worker'))
    cc = ConductorWorker('http://localhost:8080/api', 5, 0.1)
    # Every start() but the last returns immediately; the last one blocks
    # so the daemon worker threads keep running.
    for task_type in task_types[:-1]:
        cc.start(task_type, exc, False)
    cc.start(task_types[-1], exc, True)
# Start the demo workers only when run as a script, not when imported.
if __name__ == '__main__':
    main()