# conductor-worker-python/pyexecplugins/Ansible.py
#
# 147 lines
# 7.2 KiB
# Python

from pyexecplugins.pyexecplugins import PyExecPlugin
import json
import sys, os, traceback
import shutil
import logging
import ansible.constants as C
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from ansible import context
# Create a callback plugin so we can capture the output
class ResultsCollectorJSONCallback(CallbackBase):
    """Callback plugin that accumulates task results in memory.

    Every runner event handled here appends one dict to the matching list:

    * ``host_ok`` -- successful tasks, reduced to the host name, the task
      name and the ``stdout`` / ``stderr`` / ``ansible_facts`` fields of the
      result.
    * ``host_failed`` -- failed tasks, with the host name and the full result.
    * ``host_unreachable`` -- unreachable hosts, with the host name and the
      full result.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base callback and start with empty result lists."""
        super().__init__(*args, **kwargs)
        self.host_ok = []
        self.host_unreachable = []
        self.host_failed = []

    def v2_runner_on_unreachable(self, result):
        """Record the full result for a host that could not be reached."""
        entry = {"host": result._host.get_name(), "result": result._result}
        self.host_unreachable.append(entry)

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Record a trimmed-down result for a task that succeeded."""
        host_name = result._host.get_name()
        task_name = result._task.name
        raw = result._result
        # Only these three fields of the raw result are kept for successful tasks.
        trimmed = {key: raw.get(key) for key in ("stdout", "stderr", "ansible_facts")}
        self.host_ok.append({"host": host_name, "name": task_name, "result": trimmed})

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Record the full result for a task that failed."""
        entry = {"host": result._host.get_name(), "result": result._result}
        self.host_failed.append(entry)
class Ansible(PyExecPlugin):
    """Plugin that runs an Ansible playbook through the Ansible Python API.

    Inputs are read from ``self.data`` (presumably populated by
    ``PyExecPlugin.__init__`` -- confirm against the base class):

    * ``playbook`` -- playbook content handed to ``DataLoader.load``.
    * ``hosts`` -- list of host names, defaults to ``["localhost"]``.
    * ``connection`` -- connection type, defaults to ``"local"``.
    * ``verbosity`` -- verbosity level, defaults to ``0``.
    * ``extra_vars`` -- optional extra variables; wrapped in a one-element
      list when present. ``execute`` calls ``.get("ansible_host")`` on it,
      so a dict-like value is expected.
    * ``gather_facts`` -- forced onto every play, defaults to ``False``.
    """

    name = "Ansible"

    # Task definition metadata describing this plugin's inputs and outputs.
    taskdef = {
        "name" : "pyansible",
        "retryCount" : 0,
        "description" : "Execute ansible playbook",
        "inputKeys" : ["playbook", "hosts", "connection", "verbosity", "extra_vars", "gather_facts"],
        "outputKeys" : ["ok", "failed", "unreachable"],
        "ownerEmail" : "m.lettere@gmail.com"
    }

    def __init__(self, data=None, config=None):
        """Read the task inputs from ``self.data`` and apply the defaults."""
        super().__init__(data, config)
        self.playbook = self.data.get("playbook")
        self.hosts = self.data.get("hosts", ["localhost"])
        self.connection = self.data.get("connection","local")
        self.verbosity = self.data.get("verbosity", 0)
        extra_vars = self.data.get("extra_vars")
        self.extra_vars = [extra_vars] if extra_vars else []
        self.gather_facts = self.data.get("gather_facts", False)
        logging.getLogger("pyexec").debug("Hosts: %s\n Connection: %s\n Verbosity: %s\n Extra vars: %s\n Gather facts: %s\n", self.hosts, self.connection, self.verbosity, self.extra_vars, self.gather_facts)

    def _inventory_sources(self):
        """Return the inventory source string handed to ``InventoryManager``.

        An ``ansible_host`` entry in the extra vars takes precedence over
        ``self.hosts``; otherwise the host names are joined with commas.
        """
        override = self.extra_vars[0].get("ansible_host") if self.extra_vars else None
        sources = override if override else ','.join(self.hosts)
        # required for
        # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
        # A source without a comma is not treated as an inline host list, so
        # a single host needs a trailing comma. The check is made on the
        # string actually used, so an ``ansible_host`` override is covered
        # too, regardless of how many entries ``self.hosts`` has.
        if isinstance(sources, str) and sources and ',' not in sources:
            sources += ','
        return sources

    def execute(self):
        """Run every play of the playbook and return the collected results.

        Returns:
            dict: ``{"ok": [...], "failed": [...], "unreachable": [...]}``
            holding the lists filled in by ``ResultsCollectorJSONCallback``.

        Raises:
            Exception: whatever ``VariableManager``, playbook loading or play
            execution raises. Failures are no longer swallowed, and the
            task queue manager and temporary files are cleaned up first.
        """
        log = logging.getLogger("pyexec")

        # since the API is constructed for CLI it expects certain options to always be set in the context object
        context.CLIARGS = ImmutableDict(connection=self.connection, forks=10, become=False, become_method="sudo", become_user=None, check=False, diff=False, verbosity=self.verbosity, extra_vars = self.extra_vars)

        sources = self._inventory_sources()
        log.debug("Sources are %s", sources)

        # initialize needed objects
        loader = DataLoader() # Takes care of finding and reading yaml, json and ini files
        passwords = {}

        # Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
        results_callback = ResultsCollectorJSONCallback()

        # create inventory, use path to host config file as source or hosts in a comma separated string
        inventory = InventoryManager(loader=loader, sources=sources)

        # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
        log.debug("Instiantiating VariableManager %s %s", loader, inventory)
        try:
            variable_manager = VariableManager(loader=loader, inventory=inventory)
        except Exception as exc:
            exc_type, _, exc_tb = sys.exc_info()
            traceback.print_exc(file=sys.stdout)
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            log.debug("%s %s %s", exc_type, fname, exc_tb.tb_lineno)
            log.error("%s", str(exc))
            # Re-raise: continuing would only fail later with an unrelated
            # UnboundLocalError on ``variable_manager``.
            raise

        # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
        # IMPORTANT: This also adds library dirs paths to the module loader
        # IMPORTANT: and so it must be initialized before calling `Play.load()`.
        log.debug("Creating TaskQueueManager")
        tqm = TaskQueueManager(
            inventory=inventory,
            variable_manager=variable_manager,
            loader=loader,
            passwords=passwords,
            stdout_callback=results_callback, # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
        )

        try:
            # create data structure that represents our play
            log.debug("Loading playbook %s", self.playbook)
            play_sources = loader.load(self.playbook)
            log.debug("Play sources loaded %s", play_sources)
            for play_source in play_sources:
                log.debug("Executing playsource: %s",play_source)
                play_source["gather_facts"] = self.gather_facts
                # Create play object, playbook objects use .load instead of init or new methods,
                # this will also automatically create the task objects from the info provided in play_source
                play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
                log.debug("Play is: %s",play)
                # Actually run it
                result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
                log.debug("Executed playsource returns: %s",result)
        finally:
            # we always need to cleanup child procs and the structures we use to communicate with them,
            # including when loading or running a play raised
            tqm.cleanup()
            if loader:
                loader.cleanup_all_tmp_files()
            # Remove ansible tmpdir
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

        return {
            "ok" : results_callback.host_ok,
            "failed" : results_callback.host_failed,
            "unreachable" : results_callback.host_unreachable
        }