59 lines
2.0 KiB
Python
59 lines
2.0 KiB
Python
import requests;
|
|
import json;
|
|
import logging;
|
|
|
|
class PyExecPlugins(type):
    """Metaclass that maintains a registry of plugin classes.

    The first class created with this metaclass (the one that cannot yet see
    a ``plugins`` attribute) receives two empty dicts, ``plugins`` and
    ``alias``. Every class created afterwards that already sees ``plugins``
    is stored in it under its ``name`` attribute and, when it carries a
    non-None ``taskdef``, is also stored in ``alias`` under
    ``taskdef["name"]``.
    """

    def __init__(cls, name, bases, attrs):
        """Create the registries on the root class, or register a plugin.

        Raises:
            AttributeError: if a class being registered has no ``name``
                attribute.
        """
        super().__init__(name, bases, attrs)
        log = logging.getLogger("pyexec")
        if not hasattr(cls, "plugins"):
            log.debug("Initializing plugins")
            cls.plugins = {}
            cls.alias = {}
        else:
            log.debug("Appending a plugin %s - %s ", cls.name, cls)
            cls.plugins[cls.name] = cls
            # Alias the plugin with its task definition too, so it can be
            # looked up by both operation name and task definition name.
            # A plugin that declares no taskdef at all is treated like one
            # whose taskdef is None instead of failing class creation.
            taskdef = getattr(cls, "taskdef", None)
            if taskdef is not None:
                cls.alias[taskdef["name"]] = cls

    def getPlugins(cls):
        """Return the registry dict mapping plugin name to plugin class."""
        return cls.plugins

    def getPluginNames(cls):
        """Return the registered plugin names as a new list."""
        return list(cls.plugins)

    def get(cls, name):
        """Return the plugin class registered under ``name``, or None."""
        return cls.plugins.get(name)

    def getAlias(cls, name):
        """Return the plugin whose taskdef name is ``name``, or None."""
        return cls.alias.get(name)

    def registerTaskDefinitions(cls, server, auth):
        """POST every registered plugin's task definition to the server.

        The definitions are sent as one JSON list to
        ``<server>/metadata/taskdefs``. Registration is best effort: any
        failure is logged as a warning and not propagated.

        Args:
            server: Base URL of the server; trailing slashes are ignored.
            auth: Value placed after ``"Basic "`` in the Authorization
                header, or None to send no Authorization header.
        """
        log = logging.getLogger("pyexec")
        url = server.rstrip("/") + "/metadata/taskdefs"

        # Collect the task definition of every plugin that has one.
        declared = (getattr(plugin, "taskdef", None) for plugin in cls.plugins.values())
        taskdefs = [taskdef for taskdef in declared if taskdef is not None]

        # Post all new (or not) task definitions to the orchestrator.
        log.debug("Recording task definitions %s", taskdefs)
        headers = {'Content-Type': 'application/json'}
        if auth is not None:
            headers['Authorization'] = 'Basic ' + auth
        try:
            response = requests.request(
                "POST", url, headers=headers, data=json.dumps(taskdefs), timeout=(5, 10)
            )
            # Surface HTTP error statuses (4xx/5xx) instead of dropping them.
            response.raise_for_status()
        except Exception as e:
            # Deliberately broad: registration must never take the caller down.
            log.warning("Unable to register task defs %s", e)
|
|
|
|
class PyExecPlugin(metaclass=PyExecPlugins):
    """Base class for plugins; holds the data and config given at creation."""

    # Default task definition. Without it, hasDefinition() raises
    # AttributeError on any plugin that does not declare ``taskdef`` itself.
    taskdef = None

    def __init__(self, data=None, config=None):
        """Store ``data`` and ``config`` on the instance unchanged."""
        self.data = data
        self.config = config

    def hasDefinition(self):
        """Return True when this plugin has a non-None ``taskdef``."""
        return self.taskdef is not None

    def isAutomatic(self):
        """Return False; this base implementation is never automatic."""
        return False
|