- Deployment controller startup and registration:
cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    .....
    controllers["deployment"] = startDeploymentController
    ....
}

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
        return nil, false, nil
    }
    dc, err := deployment.NewDeploymentController(
        ctx.InformerFactory.Apps().V1().Deployments(),
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("deployment-controller"),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
    }
    go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
    return nil, true, nil
}
NewControllerInitializers registers startDeploymentController, which checks that the apps/v1 deployments resource is available, builds the controller from the shared informers, and starts it with the configured number of workers.
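The registration itself is just a name-to-initializer map. A minimal self-contained sketch of the pattern (the InitFunc signature is simplified and the names are illustrative, not the real kube-controller-manager types):

package main

import "fmt"

// InitFunc mirrors the shape of kube-controller-manager's initializer:
// it starts a controller and reports whether it was enabled.
type InitFunc func() (enabled bool, err error)

func startDeployment() (bool, error) { fmt.Println("deployment controller started"); return true, nil }
func startReplicaSet() (bool, error) { fmt.Println("replicaset controller started"); return true, nil }

func main() {
    controllers := map[string]InitFunc{
        "deployment": startDeployment,
        "replicaset": startReplicaSet,
    }
    for name, start := range controllers {
        if enabled, err := start(); err != nil {
            fmt.Printf("%s failed: %v\n", name, err)
        } else if !enabled {
            fmt.Printf("%s disabled\n", name)
        }
    }
}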
-
NewDeploymentController is defined in pkg/controller/deployment/deployment_controller.go:
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
    ......
    dc.syncHandler = dc.syncDeployment
    dc.enqueueDeployment = dc.enqueue
    ......
    return dc, nil
}

// Run begins watching and syncing.
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dc.queue.ShutDown()

    klog.Infof("Starting deployment controller")
    defer klog.Infof("Shutting down deployment controller")

    if !cache.WaitForNamedCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(dc.worker, time.Second, stopCh)
    }

    <-stopCh
}
Run starts the controller: once the informer caches have synced, it launches the worker goroutines. Each worker pulls a key off the queue and hands it to dc.syncHandler, which the constructor pointed at syncDeployment via dc.syncHandler = dc.syncDeployment.
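To see how these pieces connect, here is a simplified, dependency-free model of the worker/queue pattern. The real controller uses client-go's rate-limited workqueue, so the channel-based queue and the type names here are only illustrative:

package main

import (
    "fmt"
    "time"
)

type controller struct {
    queue       chan string // stand-in for workqueue.RateLimitingInterface
    syncHandler func(key string) error
}

// worker mirrors DeploymentController.worker: pop a key, sync it, repeat.
func (c *controller) worker(stopCh <-chan struct{}) {
    for {
        select {
        case key := <-c.queue:
            if err := c.syncHandler(key); err != nil {
                fmt.Printf("sync %q failed: %v (would be re-queued with backoff)\n", key, err)
            }
        case <-stopCh:
            return
        }
    }
}

func main() {
    c := &controller{queue: make(chan string, 8)}
    c.syncHandler = func(key string) error { // plays the role of syncDeployment
        fmt.Println("syncing", key)
        return nil
    }
    stopCh := make(chan struct{})
    for i := 0; i < 2; i++ { // workers == ConcurrentDeploymentSyncs
        go c.worker(stopCh)
    }
    c.queue <- "default/nginx"
    time.Sleep(100 * time.Millisecond)
    close(stopCh)
}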
-
syncDeployment is defined as:
// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(key string) error {
    startTime := time.Now()
    klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
    defer func() {
        klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    deployment, err := dc.dLister.Deployments(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(2).Infof("Deployment %v has been deleted", key)
        return nil
    }
    if err != nil {
        return err
    }

    // Deep-copy otherwise we are mutating our cache.
    // TODO: Deep-copy only when needed.
    d := deployment.DeepCopy()

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(d.Spec.Selector, &everything) {
        dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
        if d.Status.ObservedGeneration < d.Generation {
            d.Status.ObservedGeneration = d.Generation
            dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
        }
        return nil
    }

    // List ReplicaSets owned by this Deployment, while reconciling ControllerRef
    // through adoption/orphaning.
    rsList, err := dc.getReplicaSetsForDeployment(d)
    if err != nil {
        return err
    }
    // List all Pods owned by this Deployment, grouped by their ReplicaSet.
    // Current uses of the podMap are:
    //
    // * check if a Pod is labeled correctly with the pod-template-hash label.
    // * check that no old Pods are running in the middle of Recreate Deployments.
    podMap, err := dc.getPodMapForDeployment(d, rsList)
    if err != nil {
        return err
    }

    if d.DeletionTimestamp != nil {
        return dc.syncStatusOnly(d, rsList)
    }

    // Update deployment conditions with an Unknown condition when pausing/resuming
    // a deployment. In this way, we can be sure that we won't timeout when a user
    // resumes a Deployment with a set progressDeadlineSeconds.
    if err = dc.checkPausedConditions(d); err != nil {
        return err
    }

    if d.Spec.Paused {
        return dc.sync(d, rsList)
    }

    // rollback is not re-entrant in case the underlying replica sets are updated with a new
    // revision so we should ensure that we won't proceed to update replica sets until we
    // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
    if getRollbackTo(d) != nil {
        return dc.rollback(d, rsList)
    }

    scalingEvent, err := dc.isScalingEvent(d, rsList)
    if err != nil {
        return err
    }
    if scalingEvent {
        return dc.sync(d, rsList)
    }

    switch d.Spec.Strategy.Type {
    case apps.RecreateDeploymentStrategyType:
        return dc.rolloutRecreate(d, rsList, podMap)
    case apps.RollingUpdateDeploymentStrategyType:
        return dc.rolloutRolling(d, rsList)
    }
    return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
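The key handed to syncDeployment is the usual informer cache key. A dependency-free sketch of what cache.SplitMetaNamespaceKey does with it:

package main

import (
    "fmt"
    "strings"
)

// splitKey mimics cache.SplitMetaNamespaceKey: "ns/name" -> (ns, name),
// and a bare "name" -> ("", name) for cluster-scoped objects.
func splitKey(key string) (namespace, name string, err error) {
    parts := strings.Split(key, "/")
    switch len(parts) {
    case 1:
        return "", parts[0], nil
    case 2:
        return parts[0], parts[1], nil
    }
    return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
    ns, name, _ := splitKey("default/nginx")
    fmt.Println(ns, name) // default nginx
}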
-
Deployment deletion
if d.DeletionTimestamp != nil {
    return dc.syncStatusOnly(d, rsList)
}
Deletion is detected through the deletionTimestamp marker; when it is set, syncDeployment only calls syncStatusOnly to update the status:
// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }

    allRSs := append(oldRSs, newRS)
    return dc.syncDeploymentStatus(allRSs, newRS, d)
}
The comment
// syncStatusOnly only updates Deployments Status and doesn't take any mutating actions.
makes it explicit that this path only updates status and performs no actual deletion. getAllReplicaSetsAndSyncRevision fetches the new and old ReplicaSets along with their revision information, the two are appended into the combined list allRSs, and then
dc.syncDeploymentStatus(allRSs, newRS, d)
synchronizes the status.
syncDeploymentStatus synchronizes the status information:

// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
    newStatus := calculateStatus(allRSs, newRS, d)

    if reflect.DeepEqual(d.Status, newStatus) {
        return nil
    }

    newDeployment := d
    newDeployment.Status = newStatus
    _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
    return err
}
calculateStatus computes the latest status:

// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
    availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
    totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    unavailableReplicas := totalReplicas - availableReplicas
    // If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
    // desired, e.g. whenever it scales down. In such a case we should simply default unavailableReplicas to zero.
    if unavailableReplicas < 0 {
        unavailableReplicas = 0
    }

    status := apps.DeploymentStatus{
        // TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
        ObservedGeneration:  deployment.Generation,
        Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
        UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
        ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
        AvailableReplicas:   availableReplicas,
        UnavailableReplicas: unavailableReplicas,
        CollisionCount:      deployment.Status.CollisionCount,
    }

    // Copy conditions one by one so we won't mutate the original object.
    conditions := deployment.Status.Conditions
    for i := range conditions {
        status.Conditions = append(status.Conditions, conditions[i])
    }

    if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
        minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
        deploymentutil.SetDeploymentCondition(&status, *minAvailability)
    } else {
        noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
        deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
    }

    return status
}
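As a concrete example of the Available condition check at the end of calculateStatus: with 10 desired replicas and maxUnavailable of 25% (which deploymentutil resolves by rounding down, giving 2), the condition flips to True once at least 8 replicas are available. A minimal sketch:

package main

import "fmt"

// Illustrative numbers: desired=10 replicas, maxUnavailable=25% -> 2 (rounded down),
// so the Available condition is True once available >= desired - maxUnavailable.
func main() {
    desired := int32(10)
    maxUnavailable := int32(2)
    for _, available := range []int32{7, 8, 10} {
        fmt.Printf("available=%d -> Available condition: %v\n",
            available, available >= desired-maxUnavailable)
    }
}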
calculateStatus derives the latest status from the ReplicaSets, and then
dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus
persists it. For deletion, the deployment controller itself performs no destructive action: the actual cascading delete is carried out by the garbage collector (garbagecollector controller) in kube-controller-manager.
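The garbage collector can do this because every ReplicaSet created by the deployment controller carries an ownerReference back to its Deployment. A minimal sketch of that back-pointer (the struct mirrors metav1.OwnerReference; the UID is illustrative):

package main

import (
    "encoding/json"
    "fmt"
)

// ownerRef mirrors metav1.OwnerReference: the garbage collector walks these
// back-pointers to cascade a Deployment delete down to its ReplicaSets and Pods.
type ownerRef struct {
    APIVersion         string `json:"apiVersion"`
    Kind               string `json:"kind"`
    Name               string `json:"name"`
    UID                string `json:"uid"`
    Controller         bool   `json:"controller"`
    BlockOwnerDeletion bool   `json:"blockOwnerDeletion"`
}

func main() {
    // What a ReplicaSet owned by a Deployment carries in metadata.ownerReferences.
    ref := ownerRef{
        APIVersion:         "apps/v1",
        Kind:               "Deployment",
        Name:               "nginx",
        UID:                "d9607e19-f88f-11e6-a518-42010a800195", // illustrative UID
        Controller:         true,
        BlockOwnerDeletion: true,
    }
    b, _ := json.MarshalIndent(ref, "", "  ")
    fmt.Println(string(b))
}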
-
Deployment pause and resume
if err = dc.checkPausedConditions(d); err != nil {
    return err
}
if d.Spec.Paused {
    return dc.sync(d, rsList)
}
checkPausedConditions decides whether the Progressing condition needs to be updated with a paused/resumed reason:
func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
    if !deploymentutil.HasProgressDeadline(d) {
        return nil
    }
    cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
    if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
        // If we have reported lack of progress, do not overwrite it with a paused condition.
        return nil
    }
    pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason

    needsUpdate := false
    if d.Spec.Paused && !pausedCondExists {
        condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
        deploymentutil.SetDeploymentCondition(&d.Status, *condition)
        needsUpdate = true
    } else if !d.Spec.Paused && pausedCondExists {
        condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
        deploymentutil.SetDeploymentCondition(&d.Status, *condition)
        needsUpdate = true
    }

    if !needsUpdate {
        return nil
    }

    var err error
    _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
    return err
}
When spec.paused is set, syncDeployment then short-circuits into dc.sync, which only reconciles scale and status, so no rollout progresses while the deployment is paused.
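For reference, pausing is driven purely by spec.paused. A hedged client-go sketch that toggles it with a merge patch (this assumes a reachable cluster, a kubeconfig at the default location, and a client-go version with the PatchOptions-based Patch signature):

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// pauseDeployment flips spec.paused with a merge patch; on the next resync,
// syncDeployment will run checkPausedConditions and short-circuit into dc.sync.
func pauseDeployment(cs kubernetes.Interface, ns, name string, paused bool) error {
    patch := []byte(fmt.Sprintf(`{"spec":{"paused":%t}}`, paused))
    _, err := cs.AppsV1().Deployments(ns).Patch(context.TODO(), name,
        types.MergePatchType, patch, metav1.PatchOptions{})
    return err
}

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    cs := kubernetes.NewForConfigOrDie(cfg)
    if err := pauseDeployment(cs, "default", "nginx", true); err != nil {
        panic(err)
    }
}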
-
Deployment rollback
if getRollbackTo(d) != nil {
    return dc.rollback(d, rsList)
}
// rollback the deployment to the specified revision. In any case cleanup the rollback spec.
func (dc *DeploymentController) rollback(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, allOldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    if err != nil {
        return err
    }

    allRSs := append(allOldRSs, newRS)
    rollbackTo := getRollbackTo(d)
    // If rollback revision is 0, rollback to the last revision
    if rollbackTo.Revision == 0 {
        if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
            // If we still can't find the last revision, gives up rollback
            dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
            // Gives up rollback
            return dc.updateDeploymentAndClearRollbackTo(d)
        }
    }
    for _, rs := range allRSs {
        v, err := deploymentutil.Revision(rs)
        if err != nil {
            klog.V(4).Infof("Unable to extract revision from deployment's replica set %q: %v", rs.Name, err)
            continue
        }
        if v == rollbackTo.Revision {
            klog.V(4).Infof("Found replica set %q with desired revision %d", rs.Name, v)
            // rollback by copying podTemplate.Spec from the replica set
            // revision number will be incremented during the next getAllReplicaSetsAndSyncRevision call
            // no-op if the spec matches current deployment's podTemplate.Spec
            performedRollback, err := dc.rollbackToTemplate(d, rs)
            if performedRollback && err == nil {
                dc.emitRollbackNormalEvent(d, fmt.Sprintf("Rolled back deployment %q to revision %d", d.Name, rollbackTo.Revision))
            }
            return err
        }
    }
    dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find the revision to rollback to.")
    // Gives up rollback
    return dc.updateDeploymentAndClearRollbackTo(d)
}
getAllReplicaSetsAndSyncRevision fetches the ReplicaSet lists, and getRollbackTo extracts the target revision:

// TODO: Remove this when extensions/v1beta1 and apps/v1beta1 Deployment are dropped.
func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
    // Extract the annotation used for round-tripping the deprecated RollbackTo field.
    revision := d.Annotations[apps.DeprecatedRollbackTo]
    if revision == "" {
        return nil
    }
    revision64, err := strconv.ParseInt(revision, 10, 64)
    if err != nil {
        // If it's invalid, ignore it.
        return nil
    }
    return &extensions.RollbackConfig{
        Revision: revision64,
    }
}
If the requested revision is 0, roll back to the most recent revision:
if rollbackTo.Revision == 0 {
    if rollbackTo.Revision = deploymentutil.LastRevision(allRSs); rollbackTo.Revision == 0 {
        // If we still can't find the last revision, gives up rollback
        dc.emitRollbackWarningEvent(d, deploymentutil.RollbackRevisionNotFound, "Unable to find last revision.")
        // Gives up rollback
        return dc.updateDeploymentAndClearRollbackTo(d)
    }
}
updateDeploymentAndClearRollbackTo clears the rollback spec and updates the deployment:

// updateDeploymentAndClearRollbackTo sets .spec.rollbackTo to nil and update the input deployment
// It is assumed that the caller will have updated the deployment template appropriately (in case
// we want to rollback).
func (dc *DeploymentController) updateDeploymentAndClearRollbackTo(d *apps.Deployment) error {
    klog.V(4).Infof("Cleans up rollbackTo of deployment %q", d.Name)
    setRollbackTo(d, nil)
    _, err := dc.client.AppsV1().Deployments(d.Namespace).Update(context.TODO(), d, metav1.UpdateOptions{})
    return err
}
It calls the client to update the object; per the comment in rollback, the revision number itself is incremented on the next getAllReplicaSetsAndSyncRevision pass.
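Under apps/v1, the old spec.rollbackTo field survives only as an annotation (apps.DeprecatedRollbackTo, read by getRollbackTo above; the literal key string used here is my assumption). A standalone sketch of that round trip:

package main

import (
    "fmt"
    "strconv"
)

// deprecatedRollbackTo is assumed to be the value of apps.DeprecatedRollbackTo,
// the annotation apps/v1 uses to round-trip the old spec.rollbackTo field.
const deprecatedRollbackTo = "deprecated.deployment.rollback.to"

// parseRollbackRevision mirrors getRollbackTo: missing or malformed -> no rollback.
func parseRollbackRevision(annotations map[string]string) (int64, bool) {
    v, ok := annotations[deprecatedRollbackTo]
    if !ok || v == "" {
        return 0, false
    }
    rev, err := strconv.ParseInt(v, 10, 64)
    if err != nil {
        return 0, false
    }
    return rev, true
}

func main() {
    anno := map[string]string{deprecatedRollbackTo: "0"} // 0 == previous revision
    if rev, ok := parseRollbackRevision(anno); ok {
        fmt.Printf("rollback requested to revision %d\n", rev)
    }
}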
-
Scaling (scale up / scale down)
scalingEvent, err := dc.isScalingEvent(d, rsList)
if err != nil {
    return err
}
if scalingEvent {
    return dc.sync(d, rsList)
}
sync is defined as:
func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    if err := dc.scale(d, newRS, oldRSs); err != nil {
        // If we get an error while trying to scale, the deployment will be requeued
        // so we can abort this resync
        return err
    }
    .....
    allRSs := append(oldRSs, newRS)
    return dc.syncDeploymentStatus(allRSs, newRS, d)
}
scale is defined as follows:
// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
    // If there is only one active replica set then we should scale that up to the full count of the
    // deployment. If there is no active replica set, then we should scale up the newest replica set.
    if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
        if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
            return nil
        }
        _, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, *(deployment.Spec.Replicas), deployment)
        return err
    }

    // If the new replica set is saturated, old replica sets should be fully scaled down.
    // This case handles replica set adoption during a saturated new replica set.
    if deploymentutil.IsSaturated(deployment, newRS) {
        for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
            if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
                return err
            }
        }
        return nil
    }

    // There are old replica sets with pods and the new replica set is not saturated.
    // We need to proportionally scale all replica sets (new and old) in case of a
    // rolling deployment.
    if deploymentutil.IsRollingUpdate(deployment) {
        allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
        allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

        allowedSize := int32(0)
        if *(deployment.Spec.Replicas) > 0 {
            allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
        }

        // Number of additional replicas that can be either added or removed from the total
        // replicas count. These replicas should be distributed proportionally to the active
        // replica sets.
        deploymentReplicasToAdd := allowedSize - allRSsReplicas

        // The additional replicas should be distributed proportionally amongst the active
        // replica sets from the larger to the smaller in size replica set. Scaling direction
        // drives what happens in case we are trying to scale replica sets of the same size.
        // In such a case when scaling up, we should scale up newer replica sets first, and
        // when scaling down, we should scale down older replica sets first.
        var scalingOperation string
        switch {
        case deploymentReplicasToAdd > 0:
            sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
            scalingOperation = "up"
        case deploymentReplicasToAdd < 0:
            sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
            scalingOperation = "down"
        }

        // Iterate over all active replica sets and estimate proportions for each of them.
        // The absolute value of deploymentReplicasAdded should never exceed the absolute
        // value of deploymentReplicasToAdd.
        deploymentReplicasAdded := int32(0)
        nameToSize := make(map[string]int32)
        for i := range allRSs {
            rs := allRSs[i]

            // Estimate proportions if we have replicas to add, otherwise simply populate
            // nameToSize with the current sizes for each replica set.
            if deploymentReplicasToAdd != 0 {
                proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

                nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
                deploymentReplicasAdded += proportion
            } else {
                nameToSize[rs.Name] = *(rs.Spec.Replicas)
            }
        }

        // Update all replica sets
        for i := range allRSs {
            rs := allRSs[i]

            // Add/remove any leftovers to the largest replica set.
            if i == 0 && deploymentReplicasToAdd != 0 {
                leftover := deploymentReplicasToAdd - deploymentReplicasAdded
                nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
                if nameToSize[rs.Name] < 0 {
                    nameToSize[rs.Name] = 0
                }
            }

            // TODO: Use transactions when we have them.
            if _, _, err := dc.scaleReplicaSet(rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
                // Return as soon as we fail, the deployment is requeued
                return err
            }
        }
    }
    return nil
}
scaleReplicaSet applies the new size to the ReplicaSet:
func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
    sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

    annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))

    scaled := false
    var err error
    if sizeNeedsUpdate || annotationsNeedUpdate {
        rsCopy := rs.DeepCopy()
        *(rsCopy.Spec.Replicas) = newScale
        deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
        rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
        if err == nil && sizeNeedsUpdate {
            scaled = true
            dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
        }
    }
    return scaled, rs, err
}
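To make the proportional math concrete, here is a standalone sketch with illustrative numbers. The exact rounding and annotation handling live in deploymentutil.GetProportion; this model only mirrors the overall idea of scale(): distribute allowedSize minus the current total across active ReplicaSets in proportion to their size, with leftovers going to the largest one.

package main

import "fmt"

// A simplified model of the proportional scaling in scale(); all names and
// numbers are illustrative.
func main() {
    desired, maxSurge := int32(15), int32(3) // deployment scaled 10 -> 15, maxSurge=3
    rsSizes := map[string]int32{"nginx-old": 8, "nginx-new": 2}

    total := int32(0)
    for _, n := range rsSizes {
        total += n
    }
    allowedSize := desired + maxSurge // 18
    toAdd := allowedSize - total      // 18 - 10 = 8

    added := int32(0)
    newSizes := map[string]int32{}
    for name, size := range rsSizes {
        proportion := toAdd * size / total // old: 8*8/10=6, new: 8*2/10=1
        newSizes[name] = size + proportion
        added += proportion
    }
    newSizes["nginx-old"] += toAdd - added // leftover (1) goes to the largest RS

    fmt.Println(newSizes) // map[nginx-new:3 nginx-old:15]
}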
-
Deployment Recreate and RollingUpdate rollouts
switch d.Spec.Strategy.Type {
case apps.RecreateDeploymentStrategyType:
    return dc.rolloutRecreate(d, rsList, podMap)
case apps.RollingUpdateDeploymentStrategyType:
    return dc.rolloutRolling(d, rsList)
}
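The two strategy types dispatched on here come straight from spec.strategy. As an illustration, a sketch that builds a RollingUpdate strategy with the k8s.io/api types (assuming those modules are on the import path; the values are examples):

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    maxSurge := intstr.FromString("25%") // extra pods allowed above desired
    maxUnavailable := intstr.FromInt(0)  // never dip below desired
    strategy := appsv1.DeploymentStrategy{
        Type: appsv1.RollingUpdateDeploymentStrategyType, // the rolloutRolling branch
        RollingUpdate: &appsv1.RollingUpdateDeployment{
            MaxSurge:       &maxSurge,
            MaxUnavailable: &maxUnavailable,
        },
    }
    fmt.Printf("%+v\n", strategy)
}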
rolloutRecreate is defined as:
// rolloutRecreate implements the logic for recreating a replica set.
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
    // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)
    activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

    // scale down old replica sets.
    scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus.
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // Do not process a deployment when it has old pods running.
    if oldPodsRunning(newRS, oldRSs, podMap) {
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // If we need to create a new RS, create it now.
    if newRS == nil {
        newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
        if err != nil {
            return err
        }
        allRSs = append(oldRSs, newRS)
    }

    // scale up new replica set.
    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
        return err
    }

    if util.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status.
    return dc.syncRolloutStatus(allRSs, newRS, d)
}
rolloutRolling is defined as:
// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)

    // Scale up, if we can.
    scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
    if err != nil {
        return err
    }
    if scaledUp {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    // Scale down, if we can.
    scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    if deploymentutil.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    // Sync deployment status
    return dc.syncRolloutStatus(allRSs, newRS, d)
}
rolloutRolling first tries to scale up the new ReplicaSet, then to scale down the old ones.
The scale-up path:
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
reconcileNewReplicaSet is defined as follows:
func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
    if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
        // Scaling not required.
        return false, nil
    }
    if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
        // Scale down.
        scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
        return scaled, err
    }
    newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
    if err != nil {
        return false, err
    }
    scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, newReplicasCount, deployment)
    return scaled, err
}
reconcileNewReplicaSet first checks whether any scaling is needed at all, and scales down when the new ReplicaSet exceeds the desired count. Otherwise deploymentutil.NewRSNewReplicas computes the replica count the new ReplicaSet may grow to, and scaleReplicaSetAndRecordEvent applies the scale to the ReplicaSet and records the corresponding event:

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
    // No need to scale
    if *(rs.Spec.Replicas) == newScale {
        return false, rs, nil
    }
    var scalingOperation string
    if *(rs.Spec.Replicas) < newScale {
        scalingOperation = "up"
    } else {
        scalingOperation = "down"
    }
    scaled, newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
    return scaled, newRS, err
}

It then delegates to scaleReplicaSet, already shown in the scaling section above.
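NewRSNewReplicas is what caps the surge. A simplified, self-contained sketch of its RollingUpdate branch (my own paraphrase of deploymentutil.NewRSNewReplicas; the real helper also resolves percentage maxSurge values and handles the Recreate strategy):

package main

import "fmt"

// newRSReplicas mirrors the RollingUpdate branch of deploymentutil.NewRSNewReplicas:
// grow the new RS, but keep the total pod count at or below desired+maxSurge.
func newRSReplicas(desired, maxSurge, currentTotal, newRSCurrent int32) int32 {
    maxTotalPods := desired + maxSurge
    if currentTotal >= maxTotalPods {
        return newRSCurrent // no room to surge
    }
    scaleUp := maxTotalPods - currentTotal
    if scaleUp > desired-newRSCurrent {
        scaleUp = desired - newRSCurrent // never exceed the desired count
    }
    return newRSCurrent + scaleUp
}

func main() {
    // desired=10, maxSurge=2, 10 old pods still running, new RS at 0:
    // room for 2 surge pods -> the new RS goes to 2.
    fmt.Println(newRSReplicas(10, 2, 10, 0)) // 2
}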
Both the scale-up and scale-down paths ultimately call syncRolloutStatus, which is defined as follows:
// syncRolloutStatus updates the status of a deployment during a rollout. There are
// cases this helper will run that cannot be prevented from the scaling detection,
// for example a resync of the deployment after it was scaled up. In those cases,
// we shouldn't try to estimate any progress.
func (dc *DeploymentController) syncRolloutStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
    newStatus := calculateStatus(allRSs, newRS, d)

    // If there is no progressDeadlineSeconds set, remove any Progressing condition.
    if !util.HasProgressDeadline(d) {
        util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
    }

    // If there is only one replica set that is active then that means we are not running
    // a new rollout and this is a resync where we don't need to estimate any progress.
    // In such a case, we should simply not estimate any progress for this deployment.
    currentCond := util.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
    isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
    // Check for progress only if there is a progress deadline set and the latest rollout
    // hasn't completed yet.
    if util.HasProgressDeadline(d) && !isCompleteDeployment {
        switch {
        case util.DeploymentComplete(d, &newStatus):
            // Update the deployment conditions with a message for the new replica set that
            // was successfully deployed. If the condition already exists, we ignore this update.
            msg := fmt.Sprintf("Deployment %q has successfully progressed.", d.Name)
            if newRS != nil {
                msg = fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
            }
            condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
            util.SetDeploymentCondition(&newStatus, *condition)

        case util.DeploymentProgressing(d, &newStatus):
            // If there is any progress made, continue by not checking if the deployment failed. This
            // behavior emulates the rolling updater progressDeadline check.
            msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
            if newRS != nil {
                msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
            }
            condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
            // Update the current Progressing condition or add a new one if it doesn't exist.
            // If a Progressing condition with status=true already exists, we should update
            // everything but lastTransitionTime. SetDeploymentCondition already does that but
            // it also is not updating conditions when the reason of the new condition is the
            // same as the old. The Progressing condition is a special case because we want to
            // update with the same reason and change just lastUpdateTime iff we notice any
            // progress. That's why we handle it here.
            if currentCond != nil {
                if currentCond.Status == v1.ConditionTrue {
                    condition.LastTransitionTime = currentCond.LastTransitionTime
                }
                util.RemoveDeploymentCondition(&newStatus, apps.DeploymentProgressing)
            }
            util.SetDeploymentCondition(&newStatus, *condition)

        case util.DeploymentTimedOut(d, &newStatus):
            // Update the deployment with a timeout condition. If the condition already exists,
            // we ignore this update.
            msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
            if newRS != nil {
                msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
            }
            condition := util.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
            util.SetDeploymentCondition(&newStatus, *condition)
        }
    }

    // Move failure conditions of all replica sets in deployment conditions. For now,
    // only one failure condition is returned from getReplicaFailures.
    if replicaFailureCond := dc.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
        // There will be only one ReplicaFailure condition on the replica set.
        util.SetDeploymentCondition(&newStatus, replicaFailureCond[0])
    } else {
        util.RemoveDeploymentCondition(&newStatus, apps.DeploymentReplicaFailure)
    }

    // Do not update if there is nothing new to add.
    if reflect.DeepEqual(d.Status, newStatus) {
        // Requeue the deployment if required.
        dc.requeueStuckDeployment(d, newStatus)
        return nil
    }

    newDeployment := d
    newDeployment.Status = newStatus
    _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
    return err
}
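For completeness, both rollout paths gate cleanupDeployment on DeploymentComplete. A simplified sketch of what "complete" means, paraphrasing deploymentutil.DeploymentComplete (the field names are my own; treat the exact condition set as an assumption):

package main

import "fmt"

type status struct {
    updated, total, available int32
    observedGeneration        int64
}

// complete paraphrases deploymentutil.DeploymentComplete: every replica has been
// updated and is available, none are left over, and the status is current.
func complete(desired int32, generation int64, s status) bool {
    return s.updated == desired &&
        s.total == desired &&
        s.available == desired &&
        s.observedGeneration >= generation
}

func main() {
    fmt.Println(complete(3, 2, status{updated: 3, total: 3, available: 3, observedGeneration: 2})) // true
    fmt.Println(complete(3, 2, status{updated: 3, total: 4, available: 3, observedGeneration: 2})) // false: an old pod remains
}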