Kubernetes 事件队列

  • Post author:
  • Post category:其他




Kubernetes 事件队列


Queue

接口是在

Store

的基础上添加了

Pop()

方法。


FIFO



DeltaFIFO

类型(非接口)实现了

Queue

接口。


DeltaFIFO

是 Kubernetes 中非常重要的数据结构,用于保存对象的变化事件。



Queue 定义了队列接口


Queue

是在对象缓存的基础上,添加了

Pop()

方法,这样既能缓存对象、按照 Key 查找对象、也能按序(Add 的顺序)弹出对象:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
type Queue interface {
    // 对象缓存接口
    Store

    // 弹出队列中的一个对象,如果队列为空,则一直阻塞。返回处理后的对象,以及处理结果。
    Pop(PopProcessFunc) (interface{}, error)

    AddIfNotPresent(interface{}) error

    // 当队列中第一批对象都弹出后,返回 true。
    HasSynced() bool
    Close()
}



FIFO 是先入先出的队列

FIFO (struct 类型,非接口)实现了 Queue 接口,只缓存对象的

一个最新值

,例如,队列中对象 A 的值为 a1,在被弹出前,进行两次更新,值分别为 a2, a3,则只会弹出一次且值为 a3。

FIFO 适用的情况:

  1. 你希望最多一个 worker 处理某个对象(对象在队列中是唯一的);
  2. 当 worker 处理该对象时,对象值是最新的;
  3. 你不需要处理删除的对象(删除的对象不会被弹出);

FIFO 类型定义如下:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
type FIFO struct {
    lock sync.RWMutex
    cond sync.Cond

    // 对象缓存,用于快速查询。map key 为对象的 Key,该 Key 由 keyFunc 函数生成
    items map[string]interface{}

    // 对象弹出(Pop)顺序队列,队列中各对象 Key 是**唯一**的
    queue []string

    // 首先调用 Delete/Add/Update 或 Replace() 添加的第一批对象都 Pop 后为 true
    populated bool

    // Replace() 添加的第一批对象的数目
    initialPopulationCount int

    // 根据对象生成它的标识 Key 的函数
    keyFunc KeyFunc

    closed     bool
    closedLock sync.Mutex
}

函数

NewFIFO()

返回 FIFO 类型对象,传入的

KeyFunc

(一般是

DeletionHandlingMetaNamespaceKeyFunc

) 用于生成对象 Key:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func NewFIFO(keyFunc KeyFunc) *FIFO {
    f := &FIFO{
        // 初始化对象缓存
        items:   map[string]interface{}{},
        // 初始化对象 Key 队列
        queue:   []string{},
        keyFunc: keyFunc,
    }
    f.cond.L = &f.lock
    return f
}



Add() 方法

将对象更新到缓存(f.items),如果缓存中没有该对象,则将它加到弹出队列(f.queue),这样可以保证只会弹出对象一次,且弹出的是最新值。

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *FIFO) Add(obj interface{}) error {
    id, err := f.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true

    // 缓存中没有该对象,则将它的 key 加到队列 f.queue 中
    if _, exists := f.items[id]; !exists {
        f.queue = append(f.queue, id)
    }
    // 更新对象缓存
    f.items[id] = obj
    f.cond.Broadcast()
    return nil
}

什么情况下缓存中没有该对象呢?

  1. 第一次向 FIFO Add/Update 该对象;
  2. 或者调用 FIFO 的 Delete 方法删除该对象;
  3. 或者,该对象被 Pop 处理了;
  4. 或者,调用 Replace 方法,用新的一组对象替换当前缓存 f.items 和队列 f.queue;



Update() 方法

通过

Add()

方法实现:

func (f *FIFO) Update(obj interface{}) error {
    return f.Add(obj)
}




Delete()

方法

从缓存中删除对象:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *FIFO) Delete(obj interface{}) error {
    id, err := f.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    // 从缓存中删除对象,注意 f.queue 中还可能有对象的 Key
    delete(f.items, id)
    return err
}

注意:

没有

从弹出队列(f.queue) 中删除该对象 Key,后续弹出过程中会

忽略

这种已删除对象的 Key,继续弹出下一个对象。



Pop() 方法

从弹出队列(f.queue)弹出一个对象,并调用用户注册的回调函数进行处理,返回处理后的对象和出错信息。

如果队列为空,则一直阻塞。

处理函数执行失败时应该返回

ErrRequeue

类型的错误,这时该对象会被

重新加回

FIFO,后续可以再次被弹出处理。


Pop()

是在对 FIFO 加锁的情况下调用处理函数的,所以可以在多个 goroutine 中

并发调用

该方法。

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            if f.IsClosed() {
                return nil, FIFOClosedError
            }
            // 如果队列未关闭,但为空,则阻塞等待
            f.cond.Wait()
        }
        // 先从 queue 弹出对象 id
        id := f.queue[0]
        f.queue = f.queue[1:]
        if f.initialPopulationCount > 0 {
            f.initialPopulationCount--
        }
        // 从缓存中获取对象
        item, ok := f.items[id]
        if !ok {
            // 前面提到,当 Add/Update 对象,在 Pop 前又 Delete 了该对象,就会出现 queue 中有 id,而 items 中无对象的情况
            // 由于对象已经被删除,所以跳过,Pop 下一个对象
            continue
        }
        // 从缓存中删除对象
        delete(f.items, id)
        // 调用处理函数,该函数处于 f.lock 锁保护中,可以并发执行
        err := process(item)
        // 如果处理 item 失败,应该返回 ErrRequeue 类型错误,再将对象加回队列
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        return item, err
    }
}



Replace() 方法

用传入的一组对象替换对象缓存 f.items 和弹出队列 f.queue:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
    items := make(map[string]interface{}, len(list))
    for _, item := range list {
        key, err := f.keyFunc(item)
        if err != nil {
            return KeyError{item, err}
        }
        items[key] = item
    }

    f.lock.Lock()
    defer f.lock.Unlock()

    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(items)
    }

    f.items = items
    f.queue = f.queue[:0]
    for id := range items {
        f.queue = append(f.queue, id)
    }
    if len(f.queue) > 0 {
        f.cond.Broadcast()
    }
    return nil
}



HasSyncd() 方法

参考后文对 DeltaFIFO 的

HasSyncd()

方法分析。



Resync() 方法

将对象缓存

f.items

中的对象都更新到弹出队列

f.queue

中:

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *FIFO) Resync() error {
    f.lock.Lock()
    defer f.lock.Unlock()

    inQueue := sets.NewString()
    for _, id := range f.queue {
        inQueue.Insert(id)
    }
    for id := range f.items {
        if !inQueue.Has(id) {
            f.queue = append(f.queue, id)
        }
    }
    if len(f.queue) > 0 {
        f.cond.Broadcast()
    }
    return nil
}


FIXME!!!

:对象加入和弹出时都会同时更新 f.items 和 f.queue,按说是完全一致的,所以

Resync()

方法是多余的?



DeltaFIFO 是记录对象历史事件的队列


DeltaFIFO



FIFO

类型的区别:

  1. DeltaFIFO 缓存对象的事件列表,而 FIFO 缓存对象的最新值;
  2. FIFO 内部维护了一个对象缓存(f.items),而 DeltaFIFO 需要和一个外部维护的、包含所有对象的缓存(knownObjects) 结合使用:

    • Delete():用 knownObjects 检查待删除的对象是否存在,如果不存在则直接返回,否则生成 Deleted 事件;
    • Replace():用传入的队列列表更新 DeltaFIFO,检查 knownObjects,为不在传入的对象列表中的对象生成 Deleted 事件;
    • Resync():将 knownObjects 中的对象同步到 DeltaFIFO 中,并生成 Sync 事件;
  3. Delete/Replace/Resync() 方法不会从 DeltaFIFO 删除/替换对象,而是生成对应的事件。DeltaFIFO 的消费者需要将他们从 knownObjects 删除(见后文);
  4. DeltaFIFO 的 Pop/Get() 方法,返回的不是对象最新值,而是对象事件列表。

DeltaFIFO 适用的情况:

  1. 你希望最多一个 worker 处理某个对象的事件(对象在队列中是唯一的,);
  2. 当 worker 处理该对象时,可以获得自上次弹出该对象以来的所有事件,如 Add/Updat/Delete(FIFO 只缓存和弹出对象的最新值);
  3. 你可以处理删除对象的事件(FIFO 不弹出被删除的对象);
  4. 你想周期处理所有的对象(Reflector 周期调用 Resync() 方法,将 knownObjects 中对象同步到 DeltaFIFO);

DeltaFIFO 是一个生产者-消费者队列,生产者是

Reflector

,消费者是

controller/sharedInformer/sharedIndexInformer

函数

NewDeltaFIFO()

返回一个

DeltaFIFO

类型对象:

// 来源于 k8s.io/client-go/tools/cache/delta_fifo.go
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
    f := &DeltaFIFO{
        // 对象事件缓存,Key 为对象 Key,Value 为该对象的事件列表类型 Deltas;
        items:        map[string]Deltas{},
        // 对象弹出队列,缓存的是对象 Key,后续 Pop 方法按序弹出;
        queue:        []string{},
        // 生成对象标识 Key 的函数,一般是预定义的 MetaNamespaceKeyFunc 函数;
        keyFunc:      keyFunc,
        // 关联的外部对象缓存
        knownObjects: knownObjects,
    }
    f.cond.L = &f.lock
    return f
}

  • knownObjects

    是外部的对象缓存,DelaFIFO

    不对它进行更新

    ,只用它来查找对象。

  • keyFunc

    一般是预定义的

    MetaNamespaceKeyFunc

    函数,即提取对象的 Namespace/Name 作为标识 Key。

例如 NewIndexerInformer() 创建 knownObjects 和 DeltaFIFO 的过程如下:

// 来源于:k8s.io/client-go/tools/cache/controller.go
...
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
...
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

DeltaFIFO 的消费者根据从 DeltaFIFO 弹出的 Delta 对象对 knownObjects 缓存(上面的 clientState )进行更新,从而保证 FIFO 缓存和该缓存的一致性。



DeltaFIFO 的生产者和消费者


后续文章会介绍

,各种

Informer

(如

Informer、IndexInformer、SharedInformer、SharedIndexInformer

)的初始化函数依次创建


knownObjects

缓存、

DeltaFIO




controller




controller

再将

DeltaFIFO

传给

Reflector


Reflector 的

ListAndWatch()

方法是 DeltaFIFO 的生产者

  1. List etcd 中(通过 kube-apiserver,下同)特定类型的所有对象,然后调用 DeltaFIFO 的

    Replace()

    方法,将它们完整同步到 DeltaFIFO;
  2. 根据配置的 Resync 时间,

    周期调用

    DeltaFIFO 的

    Resync()

    方法(见后文),将 knownObjects 中的对象更新到 DeltaFIFO 中;
  3. 阻塞式 Watch etcd 中特对类型的对象变化,根据事件的类型分别调用 DeltaFIFO 的 Add/Update/Delete()方法,将对象更新到 DeltaFIFO;

Watch etcd 会

周期性的

超时(5min ~ 10min),这时

ListAndWatch()

出错返回,Reflector 会等待一段时间再执行它,从而实现

周期的将

etcd

中特定类型的全部对象

同步到

DeltaFIFO



controller



DeltaFIFO

的消费者,它用 DeltaFIFO 弹出的对象更新

knownObjects

缓存

,然后调用注册的 OnUpdate/OnAdd/OnDelete 回调函数。

详情参考

Reflector



controller 和 Informer

文档。



记录对象事件的 Delta、Deltas 和 DeletedFinalStateUnknown 类型

DeltaFIFO 使用 Delta 类型记录对象的事件类型和发生

事件后

的对象值:

type Delta struct {
    // DeltaType 可能是:Added、Deleted、Updated、Sync
	Type   DeltaType
	Object interface{}
}

DeltaFIFO Watch apiserver 过程中可能因网络等问题出现丢事件的情况,如果丢失了 Delete 事件,则后续 Reflector 重复执行

ListAndWatch()

方法从 apiserver 获取的对象集合 set1 会出现与 knownObjects 对象集合 set2 不一致的情况。

为了保证两者一致,DeltaFIFO 的

Replace()

方法将位于 set1 但不在 set2 中的对象用

DeletedFinalStateUnknown

类型对象封装,再保存到 Delta Object 中。

type DeletedFinalStateUnknown struct {
    // 对象的 Key
    Key string
    // knownObjects 缓存中的对象值
    Obj interface{}
}


Replace()

方法是

唯一

产生

DeletedFinalStateUnknown

类型对象的方法。



Add() 和 Update() 方法

// 来源于 k8s.io/client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Add(obj interface{}) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.populated = true
    // Added 类型事件;
    return f.queueActionLocked(Added, obj)
}


Update()

方法和

Add()

方法类似,差别在于产生的是

Updated

类型 Delta 事件;



queueActionLocked() 方法

将对象的事件存入事件队列 f.items,如果事件队列中没有该对象则还将对象(Key)加入弹出队列(f.queue),另外它还做如下操作:

  1. 如果事件类型为 Sync,且对象的事件列表中最后一个事件类型为 Deleted,则直接返回(没有必要再同步一个已删除的对象,一般是 Reflector 周期调用 Rsync() 方法产生的 Sync 事件);
  2. 合并连续重复的 Deleted 事件为一个;
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }

    // FIXME!!! 感觉这个逻辑不太对。产生 Sync 事件有两种情形:
    // 1. 周期的 Rsync(), 这时处理逻辑 OK;
    // 2. Reflector 执行 ListAndWatch() LIST etcd 获取特定类型的全部对象;
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }
    // 将对象保存到 Delta 中
    newDeltas := append(f.items[id], Delta{actionType, obj})
    newDeltas = dedupDeltas(newDeltas)

    if len(newDeltas) > 0 {
        if _, exists := f.items[id]; !exists {
            f.queue = append(f.queue, id)
        }
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else {
        delete(f.items, id)
    }
}



Delete() 方法

如果

f.knownObjects

对象缓存和事件队列

f.items

中均没有待删除的对象,则

直接返回

,否则为对象生成

Deleted

事件(非

DeletedFinalStateUnknown

类型)。

FIFO 的 Delete() 方法将对象从缓存中删除,而 DeltaFIFO 的 Delete() 方法

不将对象从事件缓存 f.items 和弹出队列 f.queue 删除

,而是后续弹出时,将它从 f.items、f.queue

和 f.knownObjects

中删除。



Get/GetByKey/List/ListKeys() 方法

查询事件缓存 f.items,返回对象的事件列表 Deltas 或 Key 列表;



Replace() 方法


Replace(list []interface{}, resourceVersion string)

  1. 为 list 中的每个对象生成

    Sync

    事件;
  2. 遍历 f.knownObjects 中对象,对不在传入的 list 中的对象,用

    DeletedFinalStateUnknown

    类型对象封装,再保存到 Deleted 类型的 Delta.Object 中;

Reflector 的

ListAndWatch()

方法因 Watch 超时而周期调用

Replace()

方法,从而周期地将 etcd 中特定类型的所有对象同步到 DeltaFIFO 中。


controller

用 DeltaFIFO 弹出的对象更新

knownObjects

缓存,详情参考

Reflector



controller 和 Informer

文档。


Replace()

方法是

唯一

产生

DeletedFinalStateUnknown

类型对象的方法。



Resync() 方法

遍历 knownObjects 中的对象:

  1. 如果该对象位于事件缓存 f.items 中,则跳过;
  2. 否则,生成 Sync 事件;

前文我们提到 DeltaFIFO 的使用场景之一是:

“你想周期处理所有的对象”

,而这是通过周期将 knownObjects 中的对象同步到 DeltaFIFO 来实现的。

Reflector 的

ListAndWatch()

方法周期执行 DeltaFIFO 的 Resync() 方法,将 knownObjects 中的对象同步到 DeltaFIFO(产生 Sync 事件),从而有机会再次调用注册的

OnUpdate()

处理函数。


只有 Replace() 和 Rsync() 方法才产生 Sync 事件



Pop() 方法

Pop(process PopProcessFunc)

  1. 如果弹出队列 f.queue 为空,则

    阻塞等待

  2. 每次弹出队列头部对象的事件列表(Deltas 类型),然后将该对象的事件列表从缓存(f.items)中删除;
  3. 调用配置的回调函数 PopProcessFunc(传入事件列表 Deltas);

如果函数 PopProcessFunc() 执行失败,应该调用

AddIfNotPresent()

方法将 Deltas 重新加回 DeltaFIFO,这样后续可以再次被弹出处理,防止丢事件。(controler 已实现该逻辑)

注意,Pop() 方法是在加锁的情况下调用 PopProcessFunc 的,所以在多个 goroutine 并发调用 Pop() 的情况下,它们实际是

串行

执行的。



HasSyncd() 方法

创建 DealtaFIFO 后,如果首先调用的是

Replace()

方法,则

f.populated

被设置为

true



f.initialPopulationCount

被设置为传入的对象数量。当这

第一批

对象都被弹出完毕时(包含弹出前被删除的对象),

HasSynced()

方法返回

true

// 来源于 k8s.io/client-go/tools/cache/fifo.go
func (f *DeltaFIFO) HasSynced() bool {
    f.lock.Lock()
    defer f.lock.Unlock()
    return f.populated && f.initialPopulationCount == 0
}

另外,如果在调用

Replace()

方法前,

首先

调用了

Add/Update/Delete/AddIfNotPresent()

方法,则

HasSynced()

方法也会返回

true

第一批对象弹出完毕后,后续不管是否再次调用 Replace()或其它方法,HasSynced() 方法

总是返回 true



DeltaFIFO 和 knownObjects 对象缓存的同步

  1. Reflector 从 etcd List 出特定类型的所有对象,调用 DeltaFIFO 的 Replace() 方法为各对象生成 Sync 事件,此时 knownObjects 对象缓存为空;

  2. controller 从 DeltaFIFO 弹出对象事件列表 Deltas,遍历 Deltas,根据 Delta 的事件类型更新 knownObjects,从而实现 DeltaFIFO 和 knownObjects 缓存中的对象一致:

    controller 每次

    启动

    时,因 knownObjects 为空且事件类型为 Sync,所以会为同步来的所有对象:

    1. 调用 knownObjects 的

      Add() 方法

      ,将它们加入到对象缓存;
    2. 调用注册的 OnAdd() 回调函数。所以

      第一次对象同步时, controller 也会调用用户注册的 OnAdd() 回调函数

    // 来源于:k8s.io/client-go/tools/cache/controller.go
    for _, d := range obj.(Deltas) {
        switch d.Type {
        // Replace() 方法生成的 Sync 事件涉及到的对象,
        case Sync, Added, Updated:
            // clientState 即为 knownObjects 对象缓存
            if old, exists, err := clientState.Get(d.Object); err == nil && exists {
                if err := clientState.Update(d.Object); err != nil {
                    return err
                }
                h.OnUpdate(old, d.Object)
            } else {
                if err := clientState.Add(d.Object); err != nil {
                    return err
                }
                h.OnAdd(d.Object)
            }
        case Deleted:
            // 先从缓存中删除,再调用处理函数
            if err := clientState.Delete(d.Object); err != nil {
                return err
            }
            h.OnDelete(d.Object)
        }
    }
    
  3. 但是,Reflector 的 Watch 可能会出现

    丢失事件

    的情况(如 ListAndWatch 出错返回后,Reflector 会 Sleep 一段时间再执行它,期间 etcd 的对象变化事件丢失),这样再次 List 到的对象集合 set1 与 knownObjects 缓存中的对象集合 set2 不一致。如何解决这个问题呢?

    答案在于:List 到对象集合后,DeltaFIFO 调用的

    Replace()

    方法将位于 set1 但不在 set2 中的对象用

    DeletedFinalStateUnknown

    类型对象封装,再保存到 Delta.Object 中。而上面 handlerObject() 的参数即为 Delta.Object。

  4. ListAndWatch() 方法起一个 goroutine,周期调用 Resync() 方法,将 knownObjects 中的对象更新到 DeltaFIFO。为何要这么做呢?

    前文我们提到 DeltaFIFO 的使用场景之一是:

    “你想周期处理所有的对象”

    ,但对象一旦从 DeltaFIFO 中弹出,如果没有产生新的 Watch 事件,就不会对它再调用注册的回调函数。Reflector 的

    ListAndWatch()

    方法会周期执行 DeltaFIFO 的 Resync() 方法,目的就是

    为对象产生新的 Sync 事件

    ,从而有机会再次调用注册的

    OnUpdate()

    处理函数。因此 Resync 时,如果对象已经在 f.items,则后续由机会被弹出,所以不需要为它生成 Sync 事件。

后续文章会介绍,

controller

一般是在

Informer

中使用的,

controller

调用的

OnUpdate()

函数会调用用户创建

Informer

时传入的

ResourceEventHandler

中的

OnUpdate()

函数。所以用户的

OnUpdate()

函数可能会因 DeltaFIFO 的周期 Resync() 而被调用,它应该检查传入的 old、new 对象是否发生变化,未变化时直接返回:

// 来源于 https://github.com/kubernetes/sample-controller/blob/master/controller.go#L131
deployInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    // 第一次从 etcd 同步的对象会生成 Added 事件,调用该函数
    // 后续 Added 事件就代表的确有**新创建**的资源对象;
    AddFunc: controller.handleDeploy,
    UpdateFunc: func(old, new interface{}) {
        // 由于 Reflector 周期调用 DeltaFIFO 的 Rsync() 方法,`controller` 会调用注册的 OnUpdate 回调函数,所以需要判断新旧对象是否相同,如果相同则不需处理
        // 也可以用
        newDepl := new.(*extensionsv1beta1.Deployment)
        oldDepl := old.(*extensionsv1beta1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            // Periodic resync will send update events for all known Deployments.
            // Two different versions of the same Deployment will always have different RVs.
            return
        }
        controller.handleDeploy(new)
    },
    DeleteFunc: controller.handleDeploy,
})

另外,前面的第 2 条提到过,

controller

刚启动时,knownObjects 为空,会为从 etcd 同步来的特定类型的所有对象生成 Added 事件,进而调用上面注册的 AddFunc。