Kubernetes Source Code Analysis: The Deployment Controller


  1. Deployment controller startup and registration:

cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	.....
	controllers["deployment"] = startDeploymentController
	....
}

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}

NewControllerInitializers registers startDeploymentController under the "deployment" key; startDeploymentController then builds the controller from the Deployment, ReplicaSet, and Pod informers and launches dc.Run in a goroutine.


  2. NewDeploymentController

    Defined in pkg/controller/deployment/deployment_controller.go:
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	 ......

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	......
	return dc, nil
}
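
The elided body mainly wires the informers into the controller. Roughly (paraphrased from the same file; details vary between versions), it registers event handlers that enqueue Deployments and records the lister sync functions used by Run:

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
// A pod delete handler is also registered; it is used by Recreate deployments
// to re-enqueue once all old pods have terminated.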

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}

The controller is started via Run, which waits for the informer caches to sync and then launches the worker goroutines; each worker handles queued Deployment keys through dc.syncHandler, which points at syncDeployment via the assignment dc.syncHandler = dc.syncDeployment.
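
For reference, the worker loop that connects the queue to syncHandler looks roughly like this (paraphrased from the same file; exact signatures vary across Kubernetes versions):

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	// syncHandler points at syncDeployment; handleErr requeues the key on failure.
	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}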

  3. The definition of syncDeployment:

    // syncDeployment will sync the deployment with the given key.
    // This function is not meant to be invoked concurrently with the same key.
    func (dc *DeploymentController) syncDeployment(key string) error {
    	startTime := time.Now()
    	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
    	defer func() {
    		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
    	}()
    
    	namespace, name, err := cache.SplitMetaNamespaceKey(key)
    	if err != nil {
    		return err
    	}
    	deployment, err := dc.dLister.Deployments(namespace).Get(name)
    	if errors.IsNotFound(err) {
    		klog.V(2).Infof("Deployment %v has been deleted", key)
    		return nil
    	}
    	if err != nil {
    		return err
    	}
    
    	// Deep-copy otherwise we are mutating our cache.
    	// TODO: Deep-copy only when needed.
    	d := deployment.DeepCopy()
    
    	everything := metav1.LabelSelector{}
    	if reflect.DeepEqual(d.Spec.Selector, &everything) {
    		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
    		if d.Status.ObservedGeneration < d.Generation {
    			d.Status.ObservedGeneration = d.Generation
    			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
    		}
    		return nil
    	}
    
    	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    	// through adoption/orphaning.
    	rsList, err := dc.getReplicaSetsForDeployment(d)
    	if err != nil {
    		return err
    	}
    	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
    	// Current uses of the podMap are:
    	//
    	// * check if a Pod is labeled correctly with the pod-template-hash label.
    	// * check that no old Pods are running in the middle of Recreate Deployments.
    	podMap, err := dc.getPodMapForDeployment(d, rsList)
    	if err != nil {
    		return err
    	}
    
    	if d.DeletionTimestamp != nil {
    		return dc.syncStatusOnly(d, rsList)
    	}
    
    	// Update deployment conditions with an Unknown condition when pausing/resuming
    	// a deployment. In this way, we can be sure that we won't timeout when a user
    	// resumes a Deployment with a set progressDeadlineSeconds.
    	if err = dc.checkPausedConditions(d); err != nil {
    		return err
    	}
    
    	if d.Spec.Paused {
    		return dc.sync(d, rsList)
    	}
    
    	// rollback is not re-entrant in case the underlying replica sets are updated with a new
    	// revision so we should ensure that we won't proceed to update replica sets until we
    	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    	if getRollbackTo(d) != nil {
    		return dc.rollback(d, rsList)
    	}
    
    	scalingEvent, err := dc.isScalingEvent(d, rsList)
    	if err != nil {
    		return err
    	}
    	if scalingEvent {
    		return dc.sync(d, rsList)
    	}
    
    	switch d.Spec.Strategy.Type {
    	case apps.RecreateDeploymentStrategyType:
    		return dc.rolloutRecreate(d, rsList, podMap)
    	case apps.RollingUpdateDeploymentStrategyType:
    		return dc.rolloutRolling(d, rsList)
    	}
    	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
    }
    
  4. Deployment deletion

    	if d.DeletionTimestamp != nil {
    		return dc.syncStatusOnly(d, rsList)
    	}
    
    

    Deletion is detected via the DeletionTimestamp marker; when it is set, the controller only calls syncStatusOnly to update the status:

    // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
    func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    	if err != nil {
    		return err
    	}
    
    	allRSs := append(oldRSs, newRS)
    	return dc.syncDeploymentStatus(allRSs, newRS, d)
    }
    

    As its comment notes:

    // syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.

    it only updates the status and takes no actual delete action.

    getAllReplicaSetsAndSyncRevision fetches all ReplicaSets owned by the Deployment, together with their revision information.

    The new and old ReplicaSets are then combined into a single list, allRSs.


    dc.syncDeploymentStatus(allRSs, newRS, d)

    then syncs the status information:

    // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
    func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
    	newStatus := calculateStatus(allRSs, newRS, d)
    
    	if reflect.DeepEqual(d.Status, newStatus) {
    		return nil
    	}
    
    	newDeployment := d
    	newDeployment.Status = newStatus
    	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
    	return err
    }
    


    calculateStatus

    computes the status:

    // calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
    func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
    	availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
    	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    	unavailableReplicas := totalReplicas - availableReplicas
    	// If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
    	// desired, e.g. whenever it scales down. In such a case we should simply default unavailableReplicas to zero.
    	if unavailableReplicas < 0 {
    		unavailableReplicas = 0
    	}
    
    	status := apps.DeploymentStatus{
    		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
    		ObservedGeneration:  deployment.Generation,
    		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
    		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
    		ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
    		AvailableReplicas:   availableReplicas,
    		UnavailableReplicas: unavailableReplicas,
    		CollisionCount:      deployment.Status.CollisionCount,
    	}
    
    	// Copy conditions one by one so we won't mutate the original object.
    	conditions := deployment.Status.Conditions
    	for i := range conditions {
    		status.Conditions = append(status.Conditions, conditions[i])
    	}
    
    	if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
    		minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
    		deploymentutil.SetDeploymentCondition(&status, *minAvailability)
    	} else {
    		noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
    		deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
    	}
    
    	return status
    }
    

    Once the latest status has been computed, it calls

    dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus

    to persist the new status; a worked example of the Available threshold is sketched below.
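
    As a concrete example (illustrative numbers, not from the source): with spec.replicas=10 and maxUnavailable=25%, percentages round down for maxUnavailable, so deploymentutil.MaxUnavailable yields 2 and the Available condition above becomes True once availableReplicas >= 8. The same threshold math, using the intstr helper the deployment utils build on:

    maxUnavailable := intstr.FromString("25%")
    // false = round down (maxSurge uses true = round up)
    n, _ := intstr.GetValueFromIntOrPercent(&maxUnavailable, 10, false)
    fmt.Println(n) // 2 -> Available requires availableReplicas >= 10-2 = 8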

    The actual delete operations are carried out by the garbage collector controller inside kube-controller-manager, which removes the owned ReplicaSets and Pods by following ownerReferences.
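
    A hypothetical client-go illustration (clientset, "default", and "nginx" are assumptions, not from the source): a Foreground cascading delete first sets the Deployment's deletionTimestamp, which makes syncDeployment take the syncStatusOnly branch above, while the garbage collector later deletes the owned ReplicaSets and Pods:

    // Foreground cascading deletion: the API server sets deletionTimestamp
    // and the garbage collector controller performs the actual cleanup.
    policy := metav1.DeletePropagationForeground
    err := clientset.AppsV1().Deployments("default").Delete(context.TODO(), "nginx",
    	metav1.DeleteOptions{PropagationPolicy: &policy})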

  5. Pausing and resuming a Deployment

    	if err = dc.checkPausedConditions(d); err != nil {
    		return err
    	}
    
    	if d.Spec.Paused {
    		return dc.sync(d, rsList)
    	}
    

    checkPausedConditions checks whether the paused/resumed condition needs to be updated:

    func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
    	if !deploymentutil.HasProgressDeadline(d) {
    		return nil
    	}
    	cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
    	if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
    		// If we have reported lack of progress, do not overwrite it with a paused condition.
    		return nil
    	}
    	pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason
    
    	needsUpdate := false
    	if d.Spec.Paused && !pausedCondExists {
    		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
    		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
    		needsUpdate = true
    	} else if !d.Spec.Paused && pausedCondExists {
    		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
    		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
    		needsUpdate = true
    	}
    
    	if !needsUpdate {
    		return nil
    	}
    
    	var err error
    	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
    	return err
    }
    

    When the Deployment is paused, syncDeployment returns through dc.sync, which only scales the ReplicaSets and syncs status; no rollout progresses until it is resumed.
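
    A minimal client-go sketch of pausing (the helper name, clientset, and arguments are assumptions): setting Spec.Paused is what kubectl rollout pause does, and it routes syncDeployment into the dc.sync branch above:

    // pauseDeployment is a hypothetical helper; clientset is a kubernetes.Interface.
    func pauseDeployment(clientset kubernetes.Interface, ns, name string) error {
    	d, err := clientset.AppsV1().Deployments(ns).Get(context.TODO(), name, metav1.GetOptions{})
    	if err != nil {
    		return err
    	}
    	d.Spec.Paused = true // resuming sets this back to false
    	_, err = clientset.AppsV1().Deployments(ns).Update(context.TODO(), d, metav1.UpdateOptions{})
    	return err
    }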

  6. Deployment rollback

    if getRollbackTo(d) != nil {
       return dc.rollback(d, rsList)
    }
    
    // rollback the deployment to the specified revision. In any case cleanup the rollback spec.
    func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    	newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    	if err != nil {
    		return err
    	}
    
    	allRSs := append(allOldRSs, newRS)
    	rollbackTo := getRollbackTo(d)
    	// If rollback revision is 0, rollback to the last revision
    	if rollbackTo.Revision == 0 {
    		if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
    			// If we still can't find the last revision, gives up rollback
    			dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
    			// Gives up rollback
    			return dc.updateDeploymentAndClearRollbackTo(d)
    		}
    	}
    	for _, rs := range allRSs {
    		v, err := deploymentutil.Revision(rs)
    		if err != nil {
    			klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
    			continue
    		}
    		if v == rollbackTo.Revision {
    			klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
    			// rollback by copying podTemplate.Spec from the replica set
    			// revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
    			// no-op if the spec matches current deployment's podTemplate.Spec
    			performedRollback, err := dc.rollbackToTemplate(d, rs)
    			if performedRollback && err == nil {
    				dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
    			}
    			return err
    		}
    	}
    	dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
    	// Gives up rollback
    	return dc.updateDeploymentAndClearRollbackTo(d)
    }
    

    getAllReplicaSetsAndSyncRevision again retrieves the new and old ReplicaSet lists.


    getRollbackTo

    extracts the target revision from the deprecated rollback annotation:

    // TODO: Remove this when extensions/v1beta1 and apps/v1beta1 Deployment are dropped.
    func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
    	// Extract the annotation used for round-tripping the deprecated RollbackTo field.
    	revision := d.Annotations[apps.DeprecatedRollbackTo]
    	if revision == "" {
    		return nil
    	}
    	revision64, err := strconv.ParseInt(revision, 10, 64)
    	if err != nil {
    		// If it's invalid, ignore it.
    		return nil
    	}
    	return &extensions.RollbackConfig{
    		Revision: revision64,
    	}
    }
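
    For illustration, a rollback request can be expressed through that deprecated annotation (a sketch assuming apps.DeprecatedRollbackTo is the key "deprecated.deployment.rollback.to"; newer kubectl versions instead perform the rollback client-side by patching the pod template):

    // Hypothetical: ask the controller to roll back to revision 2; a value of
    // "0" would mean "roll back to the last revision", as handled below.
    d.Annotations["deprecated.deployment.rollback.to"] = "2"
    _, err := clientset.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})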
    

    If the revision is 0, roll back to the most recent revision:

    	if rollbackTo.Revision == 0 {
    		if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
    			// If we still can't find the last revision, gives up rollback
    			dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
    			// Gives up rollback
    			return dc.updateDeploymentAndClearRollbackTo(d)
    		}
    	}
    


    updateDeploymentAndClearRollbackTo

    clears the rollback spec and updates the Deployment:

    // updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment
    // It is assumed that the caller will have updated the deployment template appropriately (in case
    // we want to rollback).
    func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *apps.Deployment) error {
    	klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name)
    	setRollbackTo(d, nil)
    	_, err := dc.client.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})
    	return err
    }
    

    It persists the change through the API client's Update call.

  7. scale: scaling up and down

    	scalingEvent, err := dc.isScalingEvent(d, rsList)
    	if err != nil {
    		return err
    	}
    	if scalingEvent {
    		return dc.sync(d, rsList)
    	}
    

    sync is defined as:

    func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    	if err != nil {
    		return err
    	}
    	if err := dc.scale(d, newRS, oldRSs); err != nil {
    		// If we get an error while trying to scale, the deployment will be requeued
    		// so we can abort this resync
    		return err
    	}
    .....
    	allRSs := append(oldRSs, newRS)
    	return dc.syncDeploymentStatus(allRSs, newRS, d)
    }
    

    scale is defined as follows:

    // scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
    // of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
    // have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
    // replicas in the event of a problem with the rolled out template. Should run only on scaling events or
    // when a deployment is paused and not during the normal rollout process.
    func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
    	// If there is only one active replica set then we should scale that up to the full count of the
    	// deployment. If there is no active replica set, then we should scale up the newest replica set.
    	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
    		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
    			return nil
    		}
    		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
    		return err
    	}
    
    	// If the new replica set is saturated, old replica sets should be fully scaled down.
    	// This case handles replica set adoption during a saturated new replica set.
    	if deploymentutil.IsSaturated(deployment, newRS) {
    		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
    			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
    				return err
    			}
    		}
    		return nil
    	}
    
    	// There are old replica sets with pods and the new replica set is not saturated.
    	// We need to proportionally scale all replica sets (new and old) in case of a
    	// rolling deployment.
    	if deploymentutil.IsRollingUpdate(deployment) {
    		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
    		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    
    		allowedSize := int32(0)
    		if *(deployment.Spec.Replicas) > 0 {
    			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
    		}
    
    		// Number of additional replicas that can be either added or removed from the total
    		// replicas count. These replicas should be distributed proportionally to the active
    		// replica sets.
    		deploymentReplicasToAdd := allowedSize - allRSsReplicas
    
    		// The additional replicas should be distributed proportionally amongst the active
    		// replica sets from the larger to the smaller in size replica set. Scaling direction
    		// drives what happens in case we are trying to scale replica sets of the same size.
    		// In such a case when scaling up, we should scale up newer replica sets first, and
    		// when scaling down, we should scale down older replica sets first.
    		var scalingOperation string
    		switch {
    		case deploymentReplicasToAdd > 0:
    			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
    			scalingOperation = "up"
    
    		case deploymentReplicasToAdd < 0:
    			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
    			scalingOperation = "down"
    		}
    
    		// Iterate over all active replica sets and estimate proportions for each of them.
    		// The absolute value of deploymentReplicasAdded should never exceed the absolute
    		// value of deploymentReplicasToAdd.
    		deploymentReplicasAdded := int32(0)
    		nameToSize := make(map[string]int32)
    		for i := range allRSs {
    			rs := allRSs[i]
    
    			// Estimate proportions if we have replicas to add, otherwise simply populate
    			// nameToSize with the current sizes for each replica set.
    			if deploymentReplicasToAdd != 0 {
    				proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
    
    				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
    				deploymentReplicasAdded += proportion
    			} else {
    				nameToSize[rs.Name] = *(rs.Spec.Replicas)
    			}
    		}
    
    		// Update all replica sets
    		for i := range allRSs {
    			rs := allRSs[i]
    
    			// Add/remove any leftovers to the largest replica set.
    			if i == 0 && deploymentReplicasToAdd != 0 {
    				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
    				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
    				if nameToSize[rs.Name] < 0 {
    					nameToSize[rs.Name] = 0
    				}
    			}
    
    			// TODO: Use transactions when we have them.
    			if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
    				// Return as soon as we fail, the deployment is requeued
    				return err
    			}
    		}
    	}
    	return nil
    }
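
    To make the proportional math concrete, here is a toy sketch (illustrative only; the real deploymentutil.GetProportion also consults ReplicaSet annotations and rounds differently). A Deployment with two active ReplicaSets sized 8 and 3 that has 8 more replicas to add distributes them roughly 5/2 by current size, with the rounding leftover going to the largest ReplicaSet:

    package main

    import (
    	"fmt"
    	"sort"
    )

    func main() {
    	// Current Spec.Replicas of the active replica sets (illustrative names).
    	sizes := map[string]int32{"rs-new": 8, "rs-old": 3}
    	var total int32
    	for _, s := range sizes {
    		total += s
    	}
    	toAdd := int32(8) // allowedSize - allRSsReplicas, as in scale() above

    	// When scaling up, larger (and newer) replica sets are handled first.
    	names := []string{"rs-new", "rs-old"}
    	sort.Slice(names, func(i, j int) bool { return sizes[names[i]] > sizes[names[j]] })

    	var added int32
    	target := map[string]int32{}
    	for _, name := range names {
    		share := toAdd * sizes[name] / total // proportional share, truncated
    		target[name] = sizes[name] + share
    		added += share
    	}
    	// Leftovers from rounding go to the largest replica set, as in scale().
    	target[names[0]] += toAdd - added

    	fmt.Println(target) // map[rs-new:14 rs-old:5]
    }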
    

    The actual update is done by scaleReplicaSet:

    func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
    
    	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
    
    	annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
    
    	scaled := false
    	var err error
    	if sizeNeedsUpdate || annotationsNeedUpdate {
    		rsCopy := rs.DeepCopy()
    		*(rsCopy.Spec.Replicas) = newScale
    		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
    		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
    		if err == nil && sizeNeedsUpdate {
    			scaled = true
    			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
    		}
    	}
    	return scaled, rs, err
    }
    
  8. Recreate and rolling-update rollout strategies

    	switch d.Spec.Strategy.Type {
    	case apps.RecreateDeploymentStrategyType:
    		return dc.rolloutRecreate(d, rsList, podMap)
    	case apps.RollingUpdateDeploymentStrategyType:
    		return dc.rolloutRolling(d, rsList)
    	}
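
    For reference, a sketch of the spec fields that drive this switch (values are illustrative; types from k8s.io/api/apps/v1 and k8s.io/apimachinery/pkg/util/intstr):

    maxSurge := intstr.FromString("25%")
    maxUnavailable := intstr.FromString("25%")
    strategy := apps.DeploymentStrategy{
    	Type: apps.RollingUpdateDeploymentStrategyType, // or apps.RecreateDeploymentStrategyType
    	RollingUpdate: &apps.RollingUpdateDeployment{
    		MaxSurge:       &maxSurge,
    		MaxUnavailable: &maxUnavailable,
    	},
    }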
    

    rolloutRecreate is defined as:

    // rolloutRecreate implements the logic for recreating a replica set.
    func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
    	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
    	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    	if err != nil {
    		return err
    	}
    	allRSs := append(oldRSs, newRS)
    	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)
    
    	// scale down old replica sets.
    	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
    	if err != nil {
    		return err
    	}
    	if scaledDown {
    		// Update DeploymentStatus.
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    
    	// Do not process a deployment when it has old pods running.
    	if oldPodsRunning(newRS, oldRSs, podMap) {
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    
    	// If we need to create a new RS, create it now.
    	if newRS == nil {
    		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    		if err != nil {
    			return err
    		}
    		allRSs = append(oldRSs, newRS)
    	}
    
    	// scale up new replica set.
    	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
    		return err
    	}
    
    	if util.DeploymentComplete(d, &d.Status) {
    		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
    			return err
    		}
    	}
    
    	// Sync deployment status.
    	return dc.syncRolloutStatus(allRSs, newRS, d)
    }
    

    rolloutRolling is defined as:

    // rolloutRolling implements the logic for rolling a new replica set.
    func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    	if err != nil {
    		return err
    	}
    	allRSs := append(oldRSs, newRS)
    
    	// Scale up, if we can.
    	scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
    	if err != nil {
    		return err
    	}
    	if scaledUp {
    		// Update DeploymentStatus
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    
    	// Scale down, if we can.
    	scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
    	if err != nil {
    		return err
    	}
    	if scaledDown {
    		// Update DeploymentStatus
    		return dc.syncRolloutStatus(allRSs, newRS, d)
    	}
    
    	if deploymentutil.DeploymentComplete(d, &d.Status) {
    		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
    			return err
    		}
    	}
    
    	// Sync deployment status
    	return dc.syncRolloutStatus(allRSs, newRS, d)
    }
    

    rolloutRolling first tries to scale up the new ReplicaSet, then to scale down the old ones.

    The scale-up flow:

    scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
    

    reconcileNewReplicaSet is defined as follows:

    func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
    		// Scaling not required.
    		return false, nil
    	}
    	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
    		// Scale down.
    		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
    		return scaled, err
    	}
    	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
    	if err != nil {
    		return false, err
    	}
    	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
    	return scaled, err
    }
    

    It first checks whether scaling is needed at all (the new RS is already at the desired count), then handles the scale-down case when the new RS exceeds the desired count.

    deploymentutil.NewRSNewReplicas

    then computes how many replicas the new ReplicaSet may have, honoring maxSurge; a simplified walk-through follows below.
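
    A simplified sketch of that computation (illustrative numbers; the real NewRSNewReplicas resolves maxSurge from the rolling-update spec and handles the Recreate strategy separately):

    desired := int32(10)         // *(deployment.Spec.Replicas)
    maxSurge := int32(3)         // ceil(10 * 25%): percentages round up for surge
    currentPodCount := int32(11) // sum of Spec.Replicas across allRSs
    newRSReplicas := int32(4)    // *(newRS.Spec.Replicas)

    scaleUpCount := desired + maxSurge - currentPodCount // 13 - 11 = 2
    if scaleUpCount > desired-newRSReplicas {
    	scaleUpCount = desired - newRSReplicas // never exceed the desired total
    }
    fmt.Println(newRSReplicas + scaleUpCount) // 6: the new RS target size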


    scaleReplicaSetAndRecordEvent

    scales the ReplicaSet through the API and records an event:

    func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
       // No need to scale
       if *(rs.Spec.Replicas) == newScale {
          return false, rs, nil
       }
       var scalingOperation string
       if *(rs.Spec.Replicas) < newScale {
          scalingOperation = "up"
       } else {
          scalingOperation = "down"
       }
       scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
       return scaled, newRS, err
    }
    
    

    Both the scale-up and scale-down paths ultimately go through scaleReplicaSet (shown earlier in the scaling section) and finish by calling syncRolloutStatus.


    syncRolloutStatus

    is defined as follows:

// syncRolloutStatus updates the status of a deployment during a rollout. There are
// cases this helper will run that cannot be prevented from the scaling detection,
// for example a resync of the deployment after it was scaled up. In those cases,
// we shouldn't try to estimate any progress.
func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	// If there is no progressDeadlineSeconds set, remove any Progressing condition.
	if !util.HasProgressDeadline(d) {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
	}

	// If there is only one replica set that is active then that means we are not running
	// a new rollout and this is a resync where we don't need to estimate any progress.
	// In such a case, we should simply not estimate any progress for this deployment.
	currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
	// Check for progress only if there is a progress deadline set and the latest rollout
	// hasn't completed yet.
	if util.HasProgressDeadline(d) && !isCompleteDeployment {
		switch {
		case util.DeploymentComplete(d, &newStatus):
			// Update the deployment conditions with a message for the new replica set that
			// was successfully deployed. If the condition already exists, we ignore this update.
			msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentProgressing(d, &newStatus):
			// If there is any progress made, continue by not checking if the deployment failed. This
			// behavior emulates the rolling updater progressDeadline check.
			msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
			// Update the current Progressing condition or add a new one if it doesn't exist.
			// If a Progressing condition with status=true already exists, we should update
			// everything but lastTransitionTime. SetDeploymentCondition already does that but
			// it also is not updating conditions when the reason of the new condition is the
			// same as the old. The Progressing condition is a special case because we want to
			// update with the same reason and change just lastUpdateTime iff we notice any
			// progress. That's why we handle it here.
			if currentCond != nil {
				if currentCond.Status == v1.ConditionTrue {
					condition.LastTransitionTime = currentCond.LastTransitionTime
				}
				util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
			}
			util.SetDeploymentCondition(&newStatus, *condition)

		case util.DeploymentTimedOut(d, &newStatus):
			// Update the deployment with a timeout condition. If the condition already exists,
			// we ignore this update.
			msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
			if newRS != nil {
				msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
			}
			condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
			util.SetDeploymentCondition(&newStatus, *condition)
		}
	}

	// Move failure conditions of all replica sets in deployment conditions. For now,
	// only one failure condition is returned from getReplicaFailures.
	if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
		// There will be only one ReplicaFailure condition on the replica set.
		util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
	} else {
		util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
	}

	// Do not update if there is nothing new to add.
	if reflect.DeepEqual(d.Status, newStatus) {
		// Requeue the deployment if required.
		dc.requeueStuckDeployment(d, newStatus)
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
	return err
}



Copyright notice: this is an original article by comprel, released under the CC 4.0 BY-SA license. Please include a link to the original source and this notice when republishing.