kube-apiserver源码分析-Informer机制

  • Post author:
  • Post category:其他


1.sharedInformer概述

Informer实现在k8s.io\client-go\tools\cache\shared_informer.go中,是client-go的重要组成部分,也是 k8s controller模式的重要一环。

controller是通过Informer的listWatch某些对象,通过Callback回调函数获取对象的增删改事件,存入工作队列(workqueue)中,再通过worker函数循环处理workqueue队列中的事件。

各个模块功能如下:

List/Watch:List是列举apiserver中对象的接口,Watch是监控apiserver资源变化的接口;

Reflector:通过一个sync循环把list watch到的对象的增删改变化同步到后端store中,Informer中是DeltaFIFO(实现了store interface)

DeltaIFIFO:将Reflector监控的对象变化的事件流保存到FIFO队列。

LocalStore:指的就是Indexer的实现cache,这里面缓存的就是apiserver中的对象(其中有一部分可能还在DeltaFIFO中),此时使用者再查询对象的时候就直接从cache中查找,减少了apiserver的压力;

Callbacks:通知回调函数,Infomer感知的所有对象变化都是通过回调函数通知使用者(Listener);

2.Informer设计实现

SharedInformer interface也在k8s.io\client-go\tools\cache\shared_informer.go中,sharedIndexInformer是该接口的一个实现。Shared指的是多个listeners共享同一个cache,而且资源的变化会同时通知到cache和listeners。

Informer本身的设计也类似于controller模式,

  1. SharedInformer 内部也有一个controller对象,controller内部持有一个reflector对象,reflector有一个循环通过ListerWatcher接口去watch关注的对象,对象的增删改变化分别调用DeltaFIFO的Add,Update,Delete接口
  2. DeltaFIFO会把Add,Update,Delete接口的调用转化成不同类型的事件,追加到delta队列中
  3. controller有个循环函数processLoop()不断从DeltaFIFO队列中取出事件(事件表示对象的增删改),调用外部传给controller的process函数,这个函数是在SharedInformer中实现的HandleDeltas()函数。
  4. HandleDeltas()中会 ,1)根据delta类型分别调用indexer的Add,Update,Delete接口,这个indexer实际上是实现了Indexer interface 的cache(vendor\k8s.io\client-go\tools\cache\store.go),cache中使用ThreadSafeStore作为缓存。2)调用processor(sharedProcessor)的distribute函数通知所有的listener。
  5. Informer的AddEventHandler会创建一个processorListener对象添加到sharedProcessor的listeners队列中,并启动processorListener的pop和run协程。有通知来的时候sharedProcessor.distribute会把通知添加到addCh中,pop循环再放入nextCh,run循环取出通知,根据通知类型调用handler的回调函数。

SharedInformer 框架如下图:

2.1 SharedInformer 启动代码分析

sharedInformer的使用示例:

// pkg/kubelet/kubeletconfig/watch.go 中一段
// 把kube client封装成listWatcher接口,list、watch方法分别调的是kubeclient的list、watch函数
lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
// 回调函数
handler := cache.ResourceEventHandlerFuncs{
   AddFunc:    addFunc,
   UpdateFunc: updateFunc,
   DeleteFunc: deleteFunc,
}
// 传入一个listWatcher和需要watch的对象类型(Node)
informer := cache.NewSharedInformer(lw, &apiv1.Node{}, resyncPeriod)
// 往informer添加事件回调函数
informer.AddEventHandler(handler)

2.1.1 sharedInformer构造函数

sharedInformer 的New函数:

// --- vendor\k8s.io\client-go\tools\cache\shared_informer.go
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
   return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}

// NewSharedIndexInformer 要传入一个ListerWatcher类型的lw对象,一般是对kubeclient的封装。传入一个需要listWatch的对象类型objType 
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   realClock := &clock.RealClock{}
   sharedIndexInformer := &sharedIndexInformer{
       // processor用来保存listener(EventHandler)队列,并进行事件分发。



版权声明:本文为u014152978原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。