This article analyzes the source code of the controller-manager component in Kubernetes. The analysis was performed on August 19, 2017, against the master branch at commit 2ab7ad14b4fad378a4a69a64c587497d77e60f44.
Replicaset Controller Initialization
The creation of the Replicaset Controller effectively starts from controllers["replicaset"] = startReplicaSetController. The startReplicaSetController function first checks whether the replicasets resource is enabled, then creates and initializes a ReplicaSetController object, and finally calls the object's Run method to enter the main processing loop.
func startReplicaSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicasets"}] {
return false, nil
}
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop)
return true, nil
}
Creating the Replicaset Controller Object
The Replicaset Controller object is created in NewReplicaSetController. The main steps are: (1) create the eventBroadcaster and set its properties; (2) create the ReplicaSetController object; (3) register the event callback functions for the rsInformer and podInformer; (4) set rsc.syncHandler to the rsc.syncReplicaSet function.
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter())
}
//Create the eventBroadcaster and set its properties
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
//Create the ReplicaSetController object
rsc := &ReplicaSetController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
}
//Register the event handlers for rsInformer and podInformer
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()
rsc.podListerSynced = podInformer.Informer().HasSynced
//Set the function that rsc.syncHandler dispatches to
rsc.syncHandler = rsc.syncReplicaSet
return rsc
}
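The Add/Delete callbacks registered above do little more than compute a namespace/name key for the ReplicaSet and push it onto the rate-limited work queue. The following is a minimal sketch of what an enqueue handler such as rsc.enqueueReplicaSet typically looks like (the struct and helper names here are illustrative, not the exact source):
package sketch

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// sketchController is a simplified stand-in for ReplicaSetController.
type sketchController struct {
	queue workqueue.RateLimitingInterface
}

// enqueue converts the object into a "namespace/name" key and adds it to the
// work queue. DeletionHandlingMetaNamespaceKeyFunc also handles the tombstone
// objects that informers deliver on deletes.
func (c *sketchController) enqueue(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		fmt.Printf("couldn't get key for object %+v: %v\n", obj, err)
		return
	}
	c.queue.Add(key)
}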
The Run Function in the Replicaset Controller
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer rsc.queue.ShutDown()
glog.Infof("Starting replica set controller")
defer glog.Infof("Shutting down replica set Controller")
//Wait for the informer caches to sync
if !controller.WaitForCacheSync("replica set", stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
return
}
//Start the worker goroutines
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
<-stopCh
}
Each goroutine then hands control to the corresponding processing functions.
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
for rsc.processNextWorkItem() {
}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
key, quit := rsc.queue.Get()
if quit {
return false
}
defer rsc.queue.Done(key)
err := rsc.syncHandler(key.(string))
if err == nil {
rsc.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
rsc.queue.AddRateLimited(key)
return true
}
The actual processing logic is: take a key from the queue and process it with the function assigned to syncHandler; on success the key is forgotten, on failure it is re-queued with rate limiting.
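The keys flowing through the queue are plain "namespace/name" strings: enqueueReplicaSet builds them with the standard key function, and syncReplicaSet splits them back apart. A tiny example with a made-up key:
package sketch

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func keyExample() {
	// enqueueReplicaSet produces keys of the form "<namespace>/<name>".
	key := "default/frontend"
	// syncReplicaSet reverses the key with cache.SplitMetaNamespaceKey.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	fmt.Println(namespace, name, err) // "default" "frontend" <nil>
}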
The syncHandler Function in the Replicaset Controller
In the Replicaset Controller, syncHandler is set to rsc.syncReplicaSet, which works as follows:
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Now().Sub(startTime))
}()
//Parse the namespace and name from the key
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
//Get the ReplicaSet object
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
glog.V(4).Infof("ReplicaSet has been deleted %v", key)
rsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return err
}
//Check whether the expectations for this RS have been satisfied, i.e. whether a sync is needed
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
//Build the label selector from rs.Spec.Selector
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
return nil
}
//List all Pods in the namespace
// list all pods to include the pods that don't match the rs's selector
// anymore but has the stale controller ref.
// TODO: Do the List and Filter in a single pass, or use an index.
allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
return err
}
// Ignore inactive pods.
//Filter out Pods whose phase is v1.PodSucceeded or v1.PodFailed
var filteredPods []*v1.Pod
for _, pod := range allPods {
if controller.IsPodActive(pod) {
filteredPods = append(filteredPods, pod)
}
}
//Fetch the RS object once more directly via kubeClient, then re-check whether it has been deleted
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := rsc.kubeClient.ExtensionsV1beta1().ReplicaSets(rs.Namespace).Get(rs.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != rs.UID {
return nil, fmt.Errorf("original ReplicaSet %v/%v is gone: got uid %v, wanted %v", rs.Namespace, rs.Name, fresh.UID, rs.UID)
}
return fresh, nil
})
//Create a PodControllerRefManager, which is used for the subsequent Pod-related operations
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, controllerKind, canAdoptFunc)
// NOTE: filteredPods are pointing to objects from cache - if you need to
// modify them, you need to copy it first.
//Claim the Pods that belong to this RS, including adopting orphaned Pods
//This is a key step that decides which Pods belong to the RS; it is analyzed in detail below
filteredPods, err = cm.ClaimPods(filteredPods)
if err != nil {
return err
}
//Manage the Pods according to the RS spec; this is the core of the processing and is analyzed in detail below
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
//Make a deep copy of the RS
copy, err := scheme.Scheme.DeepCopy(rs)
if err != nil {
return err
}
rs = copy.(*extensions.ReplicaSet)
//Calculate the new status of the RS
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
//Update the RS status; on failure it is retried once
// Always updates status as pods come up or die.
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// Returning an error causes a requeue without forcing a hotloop
return err
}
//If updatedRS.Spec.MinReadySeconds is not 0, re-enqueue the RS after a delay of MinReadySeconds
//The AddAfter function is implemented in k8s.io/kubernetes/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
//Return once processing is complete
return manageReplicasErr
}
The procedure above is the overall ReplicaSet handling flow; its two key parts are selecting the Pods that belong to the RS and managing those Pods. The following sections analyze Pod selection and Pod management in turn.
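Before going into ClaimPods, it helps to see how the selector converted from rs.Spec.Selector is used: both syncReplicaSet and ClaimPods rely on metav1.LabelSelectorAsSelector and a match against the Pod's labels. A short illustration with made-up labels:
package sketch

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func selectorExample() {
	// rs.Spec.Selector in its simplest form is a set of matchLabels.
	ls := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "frontend"}}
	selector, err := metav1.LabelSelectorAsSelector(ls)
	if err != nil {
		panic(err)
	}
	// ClaimPods performs exactly this kind of check against pod.Labels.
	fmt.Println(selector.Matches(labels.Set{"app": "frontend", "tier": "web"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "backend"}))                 // false
}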
ClaimPods Handling in the Replicaset Controller
After listing all Pods in the namespace, the Replicaset Controller calls the ClaimPods function of the PodControllerRefManager to determine which Pods belong to it.
The ClaimPods function is as follows:
func (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
var claimed []*v1.Pod
var errlist []error
//Define the label matching function
match := func(obj metav1.Object) bool {
pod := obj.(*v1.Pod)
// Check selector first so filters only run on potentially matching Pods.
if !m.Selector.Matches(labels.Set(pod.Labels)) {
return false
}
for _, filter := range filters {
if !filter(pod) {
return false
}
}
return true
}
//Define the adopt function
adopt := func(obj metav1.Object) error {
return m.AdoptPod(obj.(*v1.Pod))
}
//Define the release function
release := func(obj metav1.Object) error {
return m.ReleasePod(obj.(*v1.Pod))
}
//Iterate over every Pod and decide whether it belongs to this RS
for _, pod := range pods {
ok, err := m.ClaimObject(pod, match, adopt, release)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, pod)
}
}
return claimed, utilerrors.NewAggregate(errlist)
}
The handling inside ClaimObject:
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
//Get the controller ownership information from the Pod
controllerRef := metav1.GetControllerOf(obj)
if controllerRef != nil {
//If a controller reference exists, check whether the Pod belongs to this RS
//Compare by UID; if the UIDs differ, the Pod is owned by another resource
if controllerRef.UID != m.Controller.GetUID() {
// Owned by someone else. Ignore.
return false, nil
}
//If the UIDs match, further check whether the labels match; if they do, the Pod is considered to belong to this RS
if match(obj) {
// We already own it and the selector matches.
// Return true (successfully claimed) before checking deletion timestamp.
// We're still allowed to claim things we already own while being deleted
// because doing so requires taking no actions.
return true, nil
}
//If the labels do not match, the Pod's ownership is stale
//First check whether this controller is being deleted; if not, release the Pod
// Owned by us but selector doesn't match.
// Try to release, unless we're being deleted.
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else released it, or there was a transient error.
// The controller should requeue and try again if it's still stale.
return false, err
}
// Successfully released.
return false, nil
}
//If the Pod has no controller reference, it is an orphan
//If the RS is not being deleted, the Pod is not being deleted, and the selector matches,
//try to adopt the Pod into this RS
// It's an orphan.
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
// Ignore if we're being deleted or selector doesn't match.
return false, nil
}
if obj.GetDeletionTimestamp() != nil {
// Ignore if the object is being deleted
return false, nil
}
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
// If the pod no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
}
// Either someone else claimed it first, or there was a transient error.
// The controller should requeue and try again if it's still orphaned.
return false, err
}
// Successfully adopted.
return true, nil
}
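The ownership test at the top of ClaimObject hinges on the controller owner reference stored in the Pod's metadata. A brief illustration of what metav1.GetControllerOf returns (the Pod and UID values below are made up):
package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func controllerRefExample() {
	isController := true
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "frontend-abcde",
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "extensions/v1beta1",
				Kind:       "ReplicaSet",
				Name:       "frontend",
				UID:        types.UID("1234-5678"),
				Controller: &isController,
			}},
		},
	}
	// GetControllerOf returns the owner reference whose Controller field is
	// true, or nil if the Pod is an orphan.
	if ref := metav1.GetControllerOf(pod); ref != nil {
		fmt.Printf("pod is owned by %s/%s (uid %s)\n", ref.Kind, ref.Name, ref.UID)
	}
}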
The manageReplicas Function in the Replicaset Controller
The core step in the Replicaset Controller is adjusting the number of Pods to match the RS spec, which is done in the manageReplicas function. Its code is as follows:
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *extensions.ReplicaSet) error {
//Compute the difference between the actual number of Pods and the desired replica count
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for ReplicaSet %#v: %v", rs, err))
return nil
}
var errCh chan error
//If there are too few Pods, create more
if diff < 0 {
diff *= -1
errCh = make(chan error, diff)
//Throttle: at most burstReplicas (500 by default) Pods are created in one sync
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
// TODO: Track UIDs of creates just like deletes. The problem currently
// is we'd need to wait on the result of a create to record the pod's
// UID, which would require locking *across* the create, which will turn
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
//Record the expected number of creations for this RS
rsc.expectations.ExpectCreations(rsKey, diff)
var wg sync.WaitGroup
wg.Add(diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
//Create each Pod in its own goroutine, asynchronously
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
var err error
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: controllerKind.GroupVersion().String(),
Kind: controllerKind.Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
//Call the Pod creation function to create the Pod
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.CreationObserved(rsKey)
errCh <- err
}
}()
}
wg.Wait()
} else if diff > 0 {
//Limit the number of Pods deleted in one sync
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
errCh = make(chan error, diff)
glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
// No need to sort pods if we are about to delete all of them
//Sort the Pods according to the rules in ActivePods.Less
//Pods at the front of the sorted list are deleted first
if *(rs.Spec.Replicas) != 0 {
// Sort the pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible.
sort.Sort(controller.ActivePods(filteredPods))
}
// Snapshot the UIDs (ns/name) of the pods we're expecting to see
// deleted, so we know to record their expectations exactly once either
// when we see it as an update of the deletion timestamp, or as a delete.
// Note that if the labels on a pod/rs change in a way that the pod gets
// orphaned, the rs will only wake up after the expectations have
// expired even if other pods are deleted.
deletedPodKeys := []string{}
for i := 0; i < diff; i++ {
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
var wg sync.WaitGroup
wg.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wg.Done()
//Delete the Pod asynchronously
if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rs.Namespace, rs.Name)
rsc.expectations.DeletionObserved(rsKey, podKey)
errCh <- err
}
}(i)
}
wg.Wait()
}
//Check whether any of the create/delete operations reported an error
select {
case err := <-errCh:
// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
if err != nil {
return err
}
default:
}
return nil
}
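The scale decision in manageReplicas reduces to a single diff plus a cap: with 2 filtered Pods and Spec.Replicas set to 5, diff is -3 and three creations are issued; with 510 Pods and Spec.Replicas set to 5, diff is 505 but only burstReplicas (500 by default) deletions happen in this sync, and the remainder is handled on a later resync. A minimal standalone sketch of that clamping logic (burstReplicas here is a plain parameter):
package sketch

// clampDiff mirrors the diff handling in manageReplicas: a negative diff means
// "create that many Pods", a positive diff means "delete that many", and either
// direction is capped at burstReplicas per sync.
func clampDiff(currentPods, desiredReplicas, burstReplicas int) (toCreate, toDelete int) {
	diff := currentPods - desiredReplicas
	if diff < 0 {
		diff = -diff
		if diff > burstReplicas {
			diff = burstReplicas
		}
		return diff, 0
	}
	if diff > burstReplicas {
		diff = burstReplicas
	}
	return 0, diff
}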
Summary
The Replicaset Controller works in the following main steps:
(1) Within the kube-controller-manager framework, the startReplicaSetController function creates the controller object and starts it.
(2) Creating the ReplicaSetController object mainly involves creating the rs informer and the pod informer and setting the syncHandler. (In the rs informer, the delete event also enqueues the object; the sync loop then detects the deletion and returns nil, completing the delete handling.)
(3) In the Run function, WaitForCacheSync waits for the informer caches to sync, and then the worker goroutines are started.
(4) Each worker goroutine repeatedly takes an rs key from the queue and processes it. On success the key is removed from the queue; on failure it is re-added with AddRateLimited.
(5) The real work happens in the syncReplicaSet function, which:
1. gets the RS object;
2. lists all Pods in the namespace and filters them;
3. adjusts the number of Pods controlled by the RS.