import json
import logging

import requests

# Every message emitted by this module goes to the shared "pyexec" logger.
_LOGGER = logging.getLogger("pyexec")


class PyExecPlugins(type):
    """Metaclass that maintains a registry of plugin classes.

    The first class created with this metaclass becomes the registry root:
    it receives empty ``plugins`` and ``alias`` dictionaries and is not
    registered itself. Every class created afterwards that can see those
    dictionaries (i.e. a subclass of the root) is stored in ``plugins``
    under its ``name`` attribute and, when it carries a task definition,
    in ``alias`` under ``taskdef["name"]``.
    """

    def __init__(cls, name, bases, attrs):
        """Create the registry on the root class or register a subclass.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            attrs: Namespace dictionary of the class being created.

        Raises:
            AttributeError: If a registered class has no ``name`` attribute.
            KeyError: If a task definition has no ``"name"`` entry.
        """
        super().__init__(name, bases, attrs)

        if not hasattr(cls, "plugins"):
            # No registry visible yet: this is the root class.
            _LOGGER.debug("Initializing plugins")
            cls.plugins = {}
            cls.alias = {}
            return

        _LOGGER.debug("Appending a plugin %s - %s ", cls.name, cls)
        cls.plugins[cls.name] = cls

        # Alias the plugin by its task definition name as well, so it can be
        # looked up by both operation name and task definition name. The task
        # definition is optional, so a missing attribute is treated like None.
        taskdef = getattr(cls, "taskdef", None)
        if taskdef is not None:
            cls.alias[taskdef["name"]] = cls

    def getPlugins(cls):
        """Return the dictionary mapping plugin names to plugin classes."""
        return cls.plugins

    def getPluginNames(cls):
        """Return a list of the names of all registered plugins."""
        return list(cls.plugins)

    def get(cls, name):
        """Return the plugin class registered as ``name``, or None."""
        return cls.plugins.get(name)

    def getAlias(cls, name):
        """Return the plugin class whose task definition is named ``name``, or None."""
        return cls.alias.get(name)

    def registerTaskDefinitions(cls, server, auth):
        """POST the task definitions of all registered plugins to the server.

        Registration is best effort: any failure is logged as a warning on
        the "pyexec" logger and never raised to the caller.

        Args:
            server: Base URL of the server; a trailing slash is tolerated and
                "/metadata/taskdefs" is appended.
            auth: Value placed after "Basic " in the Authorization header, or
                None to send no Authorization header.
                NOTE(review): presumably the already base64-encoded
                credentials - confirm against callers.
        """
        url = server.rstrip("/") + "/metadata/taskdefs"

        # Collect the task definition of every plugin that declares one.
        taskdefs = []
        for plugin in cls.plugins.values():
            taskdef = getattr(plugin, "taskdef", None)
            if taskdef is not None:
                taskdefs.append(taskdef)

        # Post all task definitions (new or not) to the orchestrator.
        _LOGGER.debug("Recording task definitions %s", taskdefs)

        headers = {'Content-Type': 'application/json'}
        if auth is not None:
            headers['Authorization'] = 'Basic ' + auth

        try:
            response = requests.request(
                "POST",
                url,
                headers=headers,
                data=json.dumps(taskdefs),
                timeout=(5, 10),
            )
        except Exception as e:
            # Deliberately broad: serialization and transport failures alike
            # must not prevent the caller from continuing.
            _LOGGER.warning("Unable to register task defs %s", e)
            return

        # A completed request can still carry a 4xx/5xx answer; report it
        # instead of dropping it silently.
        if not response.ok:
            _LOGGER.warning(
                "Unable to register task defs %s",
                "HTTP %s: %s" % (response.status_code, response.text),
            )


class PyExecPlugin(metaclass=PyExecPlugins):
    """Root class of all plugins; subclasses are registered automatically.

    Subclasses are expected to define a ``name`` class attribute and may
    define a ``taskdef`` class attribute holding their task definition.
    """

    def __init__(self, data=None, config=None):
        """Store the input data and configuration on the instance.

        Args:
            data: Stored unchanged as ``self.data``.
            config: Stored unchanged as ``self.config``.
        """
        self.data = data
        self.config = config

    def hasDefinition(self):
        """Return True when the plugin has a ``taskdef`` that is not None."""
        return getattr(self, "taskdef", None) is not None

    def isAutomatic(self):
        """Return False; this base implementation is never automatic."""
        return False