参考
AnsibleAPI 开发
Python3 调用 Ansible2.x API
ansible 封装基础类
import json
import shutil
from ansible.module_utils.common.collections import ImmutableDict #用于添加选项。比如: 指定远程用户remote_user=None
from ansible.parsing.dataloader import DataLoader #读取 json/ymal/ini 格式的文件的数据解析器
from ansible.vars.manager import VariableManager #管理主机和主机组的变量管理器
from ansible.inventory.manager import InventoryManager #管理资源库的,可以指定一个 inventory 文件等
from ansible.playbook.play import Play #用于执行 Ad-hoc 的类 ,需要传入相应的参数
from ansible.executor.task_queue_manager import TaskQueueManager #ansible 底层用到的任务队列管理器
from ansible.plugins.callback import CallbackBase #处理任务执行后返回的状态,用于处理执行结果的。 后面我们可以继承改写这个类用作回调插件,以便满足我们的需求。
from ansible import context #上下文管理器,他就是用来接收 ImmutableDict 的示例对象
import ansible.constants as C #用于获取 ansible 产生的临时文档
from ansible.executor.playbook_executor import PlaybookExecutor # 执行 playbook 的核心类
from ansible.inventory.host import Group #对 主机组 执行操作 ,可以给组添加变量等操作,扩展
from ansible.inventory.host import Host #对 主机 执行操作 ,可以给主机添加变量等操作,扩展
import time
import datetime
class ResultCallback(CallbackBase):
"""
重写callbackBase类的部分方法,可以获得执行后返回的结果和状态
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
#参考的博客中原本定义的,信息不准确,{ip:result}
#以ip为key,result信息为value,存进字典中,但如果执行了多个任务只能收集到最后执行的任务的结果,之前的执行信息会被覆盖,所以后面我改了下收集信息的方式
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}
#重新定义的收集结果信息的方法,可以收集到所有的执行信息,按执行的顺序放入列表,但缺点是没有按ip来进行分类
self.host_all_info=[] #存放所有的执行结果信息的列表[{reslut1},{reslut2}]
self.host_failed_iplist=[]
self.host_unreachabled_iplist=[]
#机器不可达的时候,result参数里是执行的结果信息等
def v2_runner_on_unreachable(self, result):
#以ip为key,result信息为value,存进字典中,但如果执行了多个任务只能收集到最后执行的任务的结果,所以后面我改了下收集信息的方式
self.host_unreachable[result._host.get_name()] = result
exec_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') #执行时间
host_ip=result._host.get_name() #主机ip
task_name=result.task_name #任务名称
result_info=result._result #执行的结果信息
#获取任务执行的相关信息,放入字典
result_dict={"exec_time":exec_time,"host_ip":host_ip,"task_name":task_name,"status":"unreachable","result_info":result_info}
#print(result_dict)
#然后再讲字典放入存放全部信息的列表中
self.host_all_info.append(result_dict)
#任务成功的时候回调函数
#在ad-hoc下每个ip成功,都会执行这个函数,而在playbook状态下,每个action都会执行这个函数,例如,全部ip第一个action执行完才会继续执行下一个action
def v2_runner_on_ok(self, result, **kwargs):
# 以ip为key,result信息为value,存进字典中,但如果执行了多个任务只能收集到最后执行的任务的结果,所以后面我改了下收集信息的方式
self.host_ok[result._host.get_name()] = result
exec_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
host_ip=result._host.get_name()
task_name=result.task_name
result_info=result._result
# 获取任务执行的相关信息,放入字典
result_dict={"exec_time":exec_time,"host_ip":host_ip,"task_name":task_name,"status":"success","result_info":result_info}
#print(result_dict)
# 然后再讲字典放入存放全部信息的列表中
self.host_all_info.append(result_dict)
#任务执行失败时的回调函数
def v2_runner_on_failed(self, result, ignore_errors=False,**kwargs):
self.host_failed[result._host.get_name()] = result
exec_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
host_ip=result._host.get_name()
task_name=result.task_name
result_info=result._result
result_dict={"exec_time":exec_time,"host_ip":host_ip,"task_name":task_name,"status":"failed","result_info":result_info}
#print(result_dict)
self.host_all_info.append(result_dict)
# print("failed:",end="")
# print(result._host.get_name(),end=" ")
# print(result.task_name,end=" ")
# print(result._result)
#任务被skip时的回调函数
def v2_runner_on_skipped(self, result,**kwargs):
exec_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
host_ip=result._host.get_name()
task_name=result.task_name
result_info=result._result
result_dict={"exec_time":exec_time,"host_ip":host_ip,"task_name":task_name,"status":"skip","result_info":result_info}
#print(result_dict)
self.host_all_info.append(result_dict)
#task中每个action都会执行这个函数,可以用task.name获取action的名称,只执行一次
def v2_playbook_on_task_start(self, task, is_conditional):
pass
#print(task.name)
#print(task._uuid)
def v2_playbook_on_play_start(self, play):
pass
#print(play.name)
#获取playbook执行的状态,在playbook执行完后执行该函数,可以获取该playbook中执行的主机,以及每个主机执行该playbook中action成功失败的个数
def v2_playbook_on_stats(self, stats):
#print(stats.processed) #{'10.104.114.197': 1, '10.104.113.129': 1, '10.104.113.130': 1}
#print(stats.processed.keys()) #dict_keys(['10.104.114.197', '10.104.113.129', '10.104.113.130'])
hosts = sorted(stats.processed.keys())
for h in hosts:
t = stats.summarize(h) #{'ok': 1, 'failures': 0, 'unreachable': 0, 'changed': 0, 'skipped': 0, 'rescued': 0, 'ignored': 0}
if t["failures"]!=int(0):
self.host_failed_iplist.append(h)
if t["unreachable"]!=int(0):
self.host_unreachabled_iplist.append(h)
#print(t)
# task中每个action都会执行这个函数,可以用task.name获取action的名称,每个ip都执行一次
def v2_runner_on_start(self, host, task):
pass
#print(task.name)
# def v2_playbook_on_stats(self, stats):
# print(stats.processed.keys())
# print(stats.processed)
# print(sorted(stats.processed.keys()))
# print(stats.summarize(sorted(stats.processed.keys())[0]))
# def v2_playbook_on_play_start(self, play):
# print(play.name)
class MyAnsiable():
#自定义类的一些初始化信息,在下面的context.CLIARGS初始化函数中使用这些初始化属性
# 在初始化的这个类时候可以传参,以便覆盖默认选项的值,我们可以自己传参数,否则使用定义的默认值
def __init__(self,
connection='ssh', # 连接方式 local 本地方式,smart ssh方式
remote_user="None", # 远程用户
ack_pass=None, # 提示输入密码
sudo=None, sudo_user=None, ask_sudo_pass=None,
module_path=None, # 模块路径,可以指定一个自定义模块的路径
become=None, # 是否提权
become_method=None, # 提权方式 默认 sudo 可以是 su
become_user=None, # 提权后,要成为的用户,并非登录用户
check=False, diff=False,
listhosts=None, listtasks=None, listtags=None,
forks=5, #同时执行的主机数量
tags=[], #执行的tags列表
skip_tags=[], #skip跳过的tags列表
verbosity=3,
syntax=None,
start_at_task=None,
inventory=None,
passwords=None):
#上下文管理器,用来接收 ImmutableDict 的示例对象,ImmutableDict用于添加选项
#2.7 和 2.8 版本有一些差异: 2.7 使用了 Python 标准库里的 命名元组来初始化选项,而 2.8 是 Ansible 自己封装了一个 ImmutableDict,要和 context 结合使用的
context.CLIARGS = ImmutableDict(
connection=connection,
remote_user=remote_user,
ack_pass=ack_pass,
sudo=sudo,
sudo_user=sudo_user,
ask_sudo_pass=ask_sudo_pass,
module_path=module_path,
become=become,
become_method=become_method,
become_user=become_user,
verbosity=verbosity,
listhosts=listhosts,
listtasks=listtasks,
listtags=listtags,
forks=forks,
tags=tags,
skip_tags=skip_tags,
syntax=syntax,
start_at_task=start_at_task,
)
# 三元表达式,假如没有传递 inventory文件, 就使用 "localhost,"
# 这里需要注意如果不使用host文件,即inventory未传入,直接动态传入host列表的情况,这里会将localhost加入主机中,所以执行all主机组也会执行本机,所以要么改为未传入就为“”,要么就设置其他的主机组名称,不使用all执行全部
# inventory 就是平时用到的存放主机ip以及变量的资源库文件,-i 参数后面跟的文件
self.inventory = inventory if inventory else "localhost,"
# 实例化数据解析器,用于解析 存放主机列表的资源库文件 (比如: /etc/ansible/hosts) 中的数据和变量数据的
self.loader = DataLoader()
# 实例化 资产配置对象,InventoryManager管理资源库,可以指定一个loader数据解析器和一个inventory文件
self.inv_obj = InventoryManager(loader=self.loader, sources=self.inventory)
# 设置密码,可以为空字典,但必须有此参数
self.passwords = {"conn_pass":passwords}
# 实例化回调插件对象
self.results_callback = ResultCallback()
# 变量管理器,假如有变量,所有的变量应该交给他管理。 这里他会从 inventory 对象中获取到所有已定义好的变量。 这里也需要数据解析器。
self.variable_manager = VariableManager(self.loader, self.inv_obj)
#用来执行 Ad-hoc 的方法
def run(self, hosts='all', gether_facts="no", module="ping", args=''):
#定义一个字典,来设置主机组、是否获取机器信息、以及执行的模块及参数,
#参数可以在执行run()函数的时候传入
play_source = dict(
name="Ad-hoc",
hosts=hosts,
gather_facts=gether_facts,
tasks=[
# 这里每个 task 就是这个列表中的一个元素,格式是嵌套的字典
# 也可以作为参数传递过来,这里就简单化了。
{"action": {"module": module, "args": args}},
])
#Play()是用于执行 Ad-hoc 的类 ,这里传入一个上面的play_source字典参数 VariableManager变量管理器 DataLoader数据解析器
play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
# 先定义一个值,防止代码出错后, `finally` 语句中的 `tqm` 未定义。
tqm = None
try:
#TaskQueueManager是底层用到的任务队列管理器
#要想执行 Ad-hoc ,需要把上面的 play 对象交给任务队列管理器的 run 方法去运行
tqm = TaskQueueManager(
inventory=self.inv_obj,
variable_manager=self.variable_manager,
loader=self.loader,
passwords=self.passwords,
stdout_callback=self.results_callback)
result = tqm.run(play)
finally:
if tqm is not None:
tqm.cleanup()
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
#用来执行 playbook 的方法
def playbook(self, playbooks,extra_vars={}):
#自定义变量是用一个字典保存的
#使用变量管理器VariableManager().extra_vars.update(extra_vars_dict)的方法进行添加
self.variable_manager.extra_vars.update(extra_vars)
#PlaybookExecutor是执行playbook 的核心类,这里实例化一个PlaybookExecutor对象
playbook = PlaybookExecutor(playbooks=playbooks, #playbook yaml文件列表,注意这里是一个列表
inventory=self.inv_obj, # InventoryManager资产配置对象
variable_manager=self.variable_manager, #VariableManager变量管理器
loader=self.loader, #DataLoader数据解析器
passwords=self.passwords) #密码
# 使用回调函数
playbook._tqm._stdout_callback = self.results_callback
#执行PlaybookExecutor类的run方法
result = playbook.run()
#这个获取执行状态和信息的函数是不准确的,如果一个playbook中有多个action,
#用ip做key,key值是唯一的,但value可以改变,获取到的信息只能是最后一个,所以不准确
def get_result(self):
result_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
# print(self.results_callback.host_ok)
for host, result in self.results_callback.host_ok.items():
result_raw['success'][host] = result._result
for host, result in self.results_callback.host_failed.items():
result_raw['failed'][host] = result._result
for host, result in self.results_callback.host_unreachable.items():
result_raw['unreachable'][host] = result._result
return result_raw
# 最终打印结果,并且使用 JSON 继续格式化
#print(json.dumps(result_raw, indent=4))
#动态添加主机,传入主机列表和组名
def add_dynamic_hosts(self, hostip_list, groupname=None, groupvars=None):
"""
add hosts to a group
"""
#如果有传入组名,则添加组,并创建Group实例
if groupname:
self.inv_obj.add_group(groupname)
my_group = Group(name=groupname)
#如果有传入组变量,则将组变量设置给上面创建的Group实例
if groupvars:
for key, value in groupvars.iteritems():
my_group.set_variable(key, value)
# add hosts to group
# 如果有传入组名,则遍历主机列表,添加主机并且设置组
if groupname:
for hostip in hostip_list:
self.inv_obj.add_host(host=hostip,group=groupname)
#如果没有传入组名,则遍历主机列表,只添加主机
else:
for hostip in hostip_list:
self.inv_obj.add_host(host=hostip)
Ad-hoc 模式到 API 的映射
二、验证
示例一、验证ansible-plook 的执行
import sys
from myansible import MyAnsiable
if __name__ == '__main__':
##实例化
ansible3 = MyAnsiable(inventory='/etc/ansible/hosts', connection='ssh',remote_user="root",forks=1,tags=["test1","test2","test3"],
passwords="")
#传入playbooks 的参数,需要是一个列表的数据类型,这里是使用的相对路径
#相对路径是相对于执行脚本的当前用户的家目录
print("playbook指定tags以及传递自定义变量")
ansible3.playbook(playbooks=['test_set_fact.yml'],extra_vars={"testvars":"testvars1"})
print(ansible3.results_callback.host_unreachable)
cat >test_set_fact.yml<<EOF
---
- hosts: tx_151
remote_user: root
vars:
testvar1: test1
tasks:
- set_fact:
testvar2: "test2"
- debug:
msg: "{{testvars}} {{testvar1}} {{testvar2}} "
EOF
ansible-playbook test_set_fact.yml -v
示例二、验证ansible的执行
if __name__ == '__main__':
#实例化
ansible1 = MyAnsiable(inventory='/etc/ansible/hosts', remote_user="root",connection='ssh',forks=1,passwords="")
#执行 ad-hoc
print("执行ad-hoc的结果")
ansible1.run(hosts= "owner", module="shell", args='pwd')
#打印结果
print(ansible1.get_result())
print(ansible1.results_callback.host_all_info)
实例三、添加group和host及定义主机变量
host_list=["10.104.113.129","10.104.113.130"]
ansible4 = MyAnsiable(connection='ssh',remote_user="root",forks=1,tags=["test4"],
passwords="123456")
#创建一个组 remote
ansible4.inv_obj.add_group('remote')
#向 remote 组中添加主机
for host in host_list:
ansible4.inv_obj.add_host(host,group="remote")
ansible4.inv_obj.get_host(host).vars['ansible_ssh_host'] = host
print("playbook添加group和host及定义主机变量")
ansible4.playbook(playbooks=['test.yml'])
print(ansible4.get_result())
print(ansible4.results_callback.host_all_info)
实例四、动态添加主机,传入主机列表和组名
ansible5=MyAnsiable(connection='ssh',remote_user="root",forks=1,tags=["test4"],
passwords="123456")
host_list = ["10.104.116.15"]
#使用上面类中设置的add_dynamic_hosts方法
ansible5.add_dynamic_hosts(hostip_list=host_list,groupname="remote")
print("动态添加主机,传入主机列表和组名")
ansible5.playbook(playbooks=["test.yml"])
print(ansible5.get_result())
print(ansible5.results_callback.host_all_info)
实例五 ignore_errors忽略错误测试
ansible6=MyAnsiable(connection='ssh',remote_user="root",forks=1,tags=["ignore"],
passwords="123456")
host_list = ["10.104.116.15"]
ansible6.add_dynamic_hosts(hostip_list=host_list,groupname="remote")
print("ignore_errors忽略错误测试")
ansible6.playbook(playbooks=["test.yml"])
print(ansible6.get_result())
print(ansible6.results_callback.host_all_info)
test.yml
---
- hosts: owner
remote_user: root
sudo: true
gather_facts: no
tasks:
- name: test1
sudo: yes
shell: id
tags:
- test1
- name: test2
sudo: yes
shell: pwd
tags:
- test2
- name: test3
sudo: yes
debug: msg={{ testvars }}
tags:
- test3
- name: test4
sudo: yes
debug: msg={{ ansible_ssh_host }}
tags:
- test4
- name: ignore_faild
sudo: yes
shell: "123"
ignore_errors: True
tags:
- ignore