理解kubernetes’ tools/cache包: part 2

  • Post author:
  • Post category:其他




上一篇

文章中,我们研究了k8s controller的一些基础,并开始研究tools/cache包背后的概念,特别是ListerWatcher,Store和Reflector。在这篇文章中,我们健康到Controller的实际概念。

我想说,我写过”Kubernetes controller”,不是”Kubernetes Controller”或”Kubernetes controller”“Kubernetes Controller”。这是故意的,这部分是因为tools/cache包有Controller类型,逻辑位于Reflector之前。你可以自己

看看

// Controller is a generic controller framework.
type controller struct {
    config         Config
    reflector      *Reflector
    reflectorMutex sync.RWMutex
    clock          clock.Clock
}

type Controller interface {
    Run(stopCh <-chan struct{})
    HasSynced() bool
    LastSyncResourceVersion() string
}

请注意,有趣的是,严格来说,实现Controller类型所需要做的就是提供一个Run()函数(对于其他Java程序员,stopCh“stop channel”是Go的(基本上)允许中断的方式)。如果同步已经完成,HasSynced ()函数返回true。LastSyncResourceVersion()函数返回被watched和reflected的k8s资源列表的resourceVersion。

这里有一个有趣的观点,这意味着虽然Reflector是一个完全与Kubernetes分离的通用接口,但这个接口在概念上与Kubernetes耦合(注意术语Resource和resource version,这两个概念都来自于k8s体系)。这种观察模式可以帮助我们改进Java模型。

接下来,看看controller结构体,他是controller实现的承载。它接受Config来实现配置,并有一个变量接受Reflector,同时还有线程安全相关参数。

那么,Controller究竟是做什么的,Reflector还没开始工作呢?

答案的线索在于

Config

参数,它作为Controller类型的一个特定实现细节:

// Config contains all the settings for a Controller.
type Config struct {
    // The queue for your objects; either a FIFO or
    // a DeltaFIFO. Your Process() function should accept
    // the output of this Queue's Pop() method.
    Queue

    // Something that can list and watch your objects.
    ListerWatcher

    // Something that can process your objects.
    Process ProcessFunc

    // The type of your objects.
    ObjectType runtime.Object

    // Reprocess everything at least this often.
    // Note that if it takes longer for you to clear the queue than this
    // period, you will end up processing items in the order determined
    // by FIFO.Replace(). Currently, this is random. If this is a
    // problem, we can change that replacement policy to append new
    // things to the end of the queue instead of replacing the entire
    // queue.
    FullResyncPeriod time.Duration

    // ShouldResync, if specified, is invoked when the controller's reflector determines the next
    // periodic sync should occur. If this returns true, it means the reflector should proceed with
    // the resync.
    ShouldResync ShouldResyncFunc

    // If true, when Process() returns an error, re-enqueue the object.
    // TODO: add interface to let you inject a delay/backoff or drop
    //       the object completely if desired. Pass the object in
    //       question to this interface as a parameter.
    RetryOnError bool
}

// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
// resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods.
type ShouldResyncFunc func() bool

// ProcessFunc processes a single object.
type ProcessFunc func(obj interface{}) error

当你阅读上面的摘录时,请记住它位于controller.go文件中,但是还有很多其它文档的类型和概念我们尚未遇到。

因此,并非所有Controller实现都必须使用它。实际上,Controller并没有完全说明。实际上唯一重要的Controller实现,由

New()

函数返回实现,由controller结构体支持,

为了使用它,我们应该要更好的理解它。

首先要注意的是Config结构中包含一个Queue。我们可以在

fifo.go

文件中找到它的定义:

// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
    Store

    // Pop blocks until it has something to process.
    // It returns the object that was process and the result of processing.
    // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
    // should be requeued before releasing the lock on the queue.
    Pop(PopProcessFunc) (interface{}, error)

    // AddIfNotPresent adds a value previously
    // returned by Pop back into the queue as long
    // as nothing else (presumably more recent)
    // has since been added.
    AddIfNotPresent(interface{}) error

    // Return true if the first batch of items has been popped
    HasSynced() bool

    // Close queue
    Close()
}

所以一般来说Queue的所有实现也必须满足Store约定。用Java术语来说,这意味着假设Queue接口会扩展Store,我们先把它归档以供日后使用。

我们在这个接口里还遗漏了一些概念,回想一下,Controller实现必须具有HasSynced函数,但它的目的和原因并没有说明。当我们回顾Controller-struct支持的实现(Controller类型的一种可能实现)时,我们可以看到它的HasSynced函数的实现仅仅委托给它的Config包含的Queue。所以我们假设Controller的实现最有可能是得到Queue的支持。尽管严格来说这不合适必须的,但这是实现HasSynced函数最简单的方法。这也是我们将要了解该函数的作用:如果第一批items已经popped,返回true。

回到Config,它也包含了ListerWatcher。咦,这个我们在哪里见过之前?我们意识到它是Reflector的核心组件,那么为什么Config也有一个Reflector呢?为什么要在这里打破封装,ListerWatcher难道不是Reflector的实现细节?是的,似乎还没有充分的理由。稍后我们可以从源码看controller的结构体实现方法Run()函数,它使用Config的ListerWatcher实时的创建了Reflector。有个问题是Reflector为什么不作为Config的参数传入,不得而知。无论如何,从逻辑上将,Reflector是Controller接口的controller-struct实现的一部分。

接下来我们的第一个真正有趣的部分是之前没有见过的,它让我们更好的了解Controller应该做什么:ProcessFunc,从文档中查看时对k8s的某种资源执行某些操作:

ProcessFunc processes a single object.

即使从这一点文档黄总我们也可以看到,Controller的实现最终使用了Config(记住,它不是必需的),它不仅获取k8s资源cache到Store中,也会对Store中的Object发现起作用。这个是不确定的假设,并没有任何规定要这么去实现,但结果非常重要,不仅仅是对Controller类型的标准实现(使用Config的controller-struct支持),而是隐含的,但技术上没有指定。

总而言之,大多数Controller的实现都可能是使用Reflector来填充特定资源类型的Queue(特定类型的Store),然后还处理Queue的内容,通过popping objects来实现。

我们看到的模型如下:

Controller

这里,Controller使用Reflector来填充Queue,并且有一个受保护的进程方法知道如何对给定Object执行某些操作。它还有一个受保护的方法shouldResync(),一个public hasSynced()方法,能够在同步后报告最后一个kubernetes resource version,我们将极大的改进和重构这个模型。