diff --git a/conductor/conductor.py b/conductor/conductor.py index b725882..e19afd1 100644 --- a/conductor/conductor.py +++ b/conductor/conductor.py @@ -21,7 +21,6 @@ import sys import socket import warnings - hostname = socket.gethostname() @@ -34,30 +33,30 @@ class BaseClient(object): self.headers = self.mergeTwoDicts(self.headers, headers) if headers != None else self.headers self.baseResource = baseResource - def get(self, resPath, queryParams=None): + def get(self, resPath, queryParams=None, timeout=(5, 10)): theUrl = "{}/{}".format(self.baseURL, resPath) - resp = requests.get(theUrl, params=queryParams, headers=self.headers) + resp = requests.get(theUrl, params=queryParams, headers=self.headers, timeout=timeout) self.__checkForSuccess(resp) if(resp.content == b''): return None else: return resp.json() - def post(self, resPath, queryParams, body, headers=None): + def post(self, resPath, queryParams, body, headers=None, timeout=(5, 10)): theUrl = "{}/{}".format(self.baseURL, resPath) theHeader = self.headers if headers is not None: theHeader = self.mergeTwoDicts(self.headers, headers) if body is not None: jsonBody = json.dumps(body, ensure_ascii=True) - resp = requests.post(theUrl, params=queryParams, data=jsonBody, headers=theHeader) + resp = requests.post(theUrl, params=queryParams, data=jsonBody, headers=theHeader, timeout=timeout) else: - resp = requests.post(theUrl, params=queryParams, headers=theHeaders) + resp = requests.post(theUrl, params=queryParams, headers=theHeaders, timeout=timeout) self.__checkForSuccess(resp) return self.__return(resp, theHeader) - def put(self, resPath, queryParams=None, body=None, headers=None): + def put(self, resPath, queryParams=None, body=None, headers=None, timeout=(5, 10)): theUrl = "{}/{}".format(self.baseURL, resPath) theHeader = self.headers if headers is not None: @@ -65,16 +64,16 @@ class BaseClient(object): if body is not None: jsonBody = json.dumps(body, ensure_ascii=True) - resp = requests.put(theUrl, params=queryParams, data=jsonBody, headers=theHeaders) + resp = requests.put(theUrl, params=queryParams, data=jsonBody, headers=theHeaders, timeout=timeout) else: - resp = requests.put(theUrl, params=queryParams, headers=theHeaders) + resp = requests.put(theUrl, params=queryParams, headers=theHeaders, timeout=timeout) self.__print(resp) self.__checkForSuccess(resp) - def delete(self, resPath, queryParams): + def delete(self, resPath, queryParams, timeout=(5, 10)): theUrl = "{}/{}".format(self.baseURL, resPath) - resp = requests.delete(theUrl, params=queryParams, headers=self.headers) + resp = requests.delete(theUrl, params=queryParams, headers=self.headers, timeout=timeout) self.__print(resp) self.__checkForSuccess(resp) @@ -186,17 +185,24 @@ class TaskClient(BaseClient): headers = {'Accept': 'text/plain; encodingcharset=utf-8'} self.post(url, None, taskObj, headers) - def pollForTask(self, taskType, workerid, domain=None): + def pollForTask(self, taskType, workerid, domain=None, extraParams=None, reRegisterFunction=None): url = self.makeUrl('poll/{}', taskType) params = {} params['workerid'] = workerid if domain is not None: params['domain'] = domain + if extraParams is not None: + #params = extraParams | params + params = dict(list(extraParams.items()) + list(params.items())) try: logging.getLogger("pyexec").debug("Polling at %s with %s", url, params) - return self.get(url, params) - except Exception as err: - logging.getLogger("pyexec").debug('Error while polling %s', str(err)) + return self.get(url, params, timeout=(1, 3)) + except requests.HTTPError as err: + if reRegisterFunction is not None: + reRegisterFunction() + else: + logging.getLogger("pyexec").debug('Error while polling %s', str(err)) + return None def pollForBatch(self, taskType, count, timeout, workerid, domain=None):