1.sharedInformer概述
Informer实现在k8s.io\client-go\tools\cache\shared_informer.go中,是client-go的重要组成部分,也是 k8s controller模式的重要一环。
controller是通过Informer的listWatch某些对象,通过Callback回调函数获取对象的增删改事件,存入工作队列(workqueue)中,再通过worker函数循环处理workqueue队列中的事件。
各个模块功能如下:
List/Watch:List是列举apiserver中对象的接口,Watch是监控apiserver资源变化的接口;
Reflector:通过一个sync循环把list watch到的对象的增删改变化同步到后端store中,Informer中是DeltaFIFO(实现了store interface)
DeltaIFIFO:将Reflector监控的对象变化的事件流保存到FIFO队列。
LocalStore:指的就是Indexer的实现cache,这里面缓存的就是apiserver中的对象(其中有一部分可能还在DeltaFIFO中),此时使用者再查询对象的时候就直接从cache中查找,减少了apiserver的压力;
Callbacks:通知回调函数,Infomer感知的所有对象变化都是通过回调函数通知使用者(Listener);
2.Informer设计实现
SharedInformer interface也在k8s.io\client-go\tools\cache\shared_informer.go中,sharedIndexInformer是该接口的一个实现。Shared指的是多个listeners共享同一个cache,而且资源的变化会同时通知到cache和listeners。
Informer本身的设计也类似于controller模式,
- SharedInformer 内部也有一个controller对象,controller内部持有一个reflector对象,reflector有一个循环通过ListerWatcher接口去watch关注的对象,对象的增删改变化分别调用DeltaFIFO的Add,Update,Delete接口
- DeltaFIFO会把Add,Update,Delete接口的调用转化成不同类型的事件,追加到delta队列中
- controller有个循环函数processLoop()不断从DeltaFIFO队列中取出事件(事件表示对象的增删改),调用外部传给controller的process函数,这个函数是在SharedInformer中实现的HandleDeltas()函数。
- HandleDeltas()中会 ,1)根据delta类型分别调用indexer的Add,Update,Delete接口,这个indexer实际上是实现了Indexer interface 的cache(vendor\k8s.io\client-go\tools\cache\store.go),cache中使用ThreadSafeStore作为缓存。2)调用processor(sharedProcessor)的distribute函数通知所有的listener。
- Informer的AddEventHandler会创建一个processorListener对象添加到sharedProcessor的listeners队列中,并启动processorListener的pop和run协程。有通知来的时候sharedProcessor.distribute会把通知添加到addCh中,pop循环再放入nextCh,run循环取出通知,根据通知类型调用handler的回调函数。
SharedInformer 框架如下图:
2.1 SharedInformer 启动代码分析
sharedInformer的使用示例:
// pkg/kubelet/kubeletconfig/watch.go 中一段
// 把kube client封装成listWatcher接口,list、watch方法分别调的是kubeclient的list、watch函数
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
// 回调函数
handler := cache.ResourceEventHandlerFuncs{
AddFunc: addFunc,
UpdateFunc: updateFunc,
DeleteFunc: deleteFunc,
}
// 传入一个listWatcher和需要watch的对象类型(Node)
informer := cache.NewSharedInformer(lw, &apiv1.Node{}, resyncPeriod)
// 往informer添加事件回调函数
informer.AddEventHandler(handler)
2.1.1 sharedInformer构造函数
sharedInformer 的New函数:
// --- vendor\k8s.io\client-go\tools\cache\shared_informer.go
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
// NewSharedIndexInformer 要传入一个ListerWatcher类型的lw对象,一般是对kubeclient的封装。传入一个需要listWatch的对象类型objType
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// processor用来保存listener(EventHandler)队列,并进行事件分发。