Analysis of the kube-controller-manager (cm) startup process


1. The main function: k8s.io/kubernetes/cmd/kube-controller-manager/controller-manager.go

main registers the health check on http.DefaultServeMux.

func main() {
	// create the controller-manager server object with its default configuration
	s := options.NewCMServer()
	// register the controller-manager flags
	s.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()
	// print version information and exit if the --version flag was passed
	verflag.PrintAndExitIfRequested()
	// run the controller manager
	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
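
As noted above, the health check ends up on the process-wide http.DefaultServeMux. Below is a minimal, self-contained sketch of what such a registration amounts to, using only the standard library; the /healthz path and the listen address are illustrative, not taken from the source.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Handlers registered on the package-level default mux are served by any
	// server started with a nil handler, e.g. http.ListenAndServe(addr, nil).
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		fmt.Fprint(w, "ok")
	})

	// Illustrative address only; the real Run builds its own mux as well (see below).
	http.ListenAndServe("127.0.0.1:10252", nil)
}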

2.k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go

func Run(s *options.CMServer) error {
	// create a componentconfig entry and register it in the configz registry
	if c, err := configz.New("componentconfig"); err == nil {
		// set the config value to the running KubeControllerManagerConfiguration
		c.Set(s.KubeControllerManagerConfiguration)
	} else {
		glog.Errorf("unable to register configz: %s", err)
	}
	// build the final restclient.Config from the kubeconfig file and the master URL
	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
	if err != nil {
		return err
	}

	kubeconfig.ContentConfig.ContentType = s.ContentType
	// Override kubeconfig qps/burst settings from flags
	kubeconfig.QPS = s.KubeAPIQPS
	kubeconfig.Burst = int(s.KubeAPIBurst)
	// create the main Clientset
	kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
	if err != nil {
		glog.Fatalf("Invalid API configuration: %v", err)
	}
	leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
	// start the HTTP server (healthz, pprof, configz, metrics)
	go func() {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		if s.EnableProfiling {
			mux.HandleFunc("/debug/pprof/", pprof.Index)
			mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
			mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		}
		configz.InstallHandler(mux)
		mux.Handle("/metrics", prometheus.Handler())

		server := &http.Server{
			Addr:    net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
			Handler: mux,
		}
		glog.Fatal(server.ListenAndServe())
	}()
	// event broadcaster: the Broadcaster sends events to an EventSink, which stores them
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	// send events to the specified EventSink,
	// i.e. &unversionedcore.EventSinkImpl{Interface: &events{kubeClient: c, ns: ""}}
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	// create a recorder that records events under the given event source
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
	// define the function that starts all the controllers
	run := func(stop <-chan struct{}) {
		// client builders: a root client builder, plus per-controller clients
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: kubeconfig,
		}
		var clientBuilder controller.ControllerClientBuilder
		if len(s.ServiceAccountKeyFile) > 0 {
			clientBuilder = controller.SAControllerClientBuilder{
				ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
				CoreClient:   kubeClient.Core(),
				Namespace:    "kube-system",
			}
		} else {
			clientBuilder = rootClientBuilder
		}

		err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder)
		glog.Fatalf("error running controllers: %v", err)
		panic("unreachable")
	}
	// if leader election is disabled, run directly
	if !s.LeaderElection.LeaderElect {
		run(nil)
		panic("unreachable")
	}

	id, err := os.Hostname()
	if err != nil {
		return err
	}

	// TODO: enable other lock types
	rl := resourcelock.EndpointsLock{
		EndpointsMeta: api.ObjectMeta{
			Namespace: "kube-system",
			Name:      "kube-controller-manager",
		},
		Client: leaderElectionClient,
		LockConfig: resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		},
	}
	// run leader election with the configured timings, plus callbacks for winning and losing the lease
	leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
		Lock:          &rl,
		LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				glog.Fatalf("leaderelection lost")
			},
		},
	})
	panic("unreachable")
}

Inside the run closure the shared informers are eventually started.

In StartControllers, after initializing the SharedInformerFactory, the following controllers are started: EndpointController, ReplicationManager, PodGCController, NodeController, ServiceController, RouteController, ResourceQuotaController, NamespaceController, HorizontalController, DaemonSetsController, JobController, DeploymentController, ReplicaSetController, DisruptionController, PetSetController, ScheduledJobController, PersistentVolumeController, AttachDetachController, CertificateController, ServiceAccountsController, and the GarbageCollector.
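
The ordering matters: each controller is constructed against the shared informers and launched in its own goroutine first, and only afterwards is the factory's Start called, so every informer registered along the way begins its List/Watch loop exactly once. Below is a minimal, self-contained sketch of that pattern with stand-in types (not the real k8s ones):

package main

import (
	"fmt"
	"time"
)

// controller stands in for EndpointController, ReplicationManager, PodGCController, ...
type controller struct{ name string }

func (c controller) Run(stop <-chan struct{}) {
	fmt.Println(c.name, "controller running")
	<-stop
}

// informerFactory stands in for the sharedInformerFactory: controllers ask it
// for informers while they are constructed, and Start is called once at the end.
type informerFactory struct{ registered []string }

func (f *informerFactory) InformerFor(resource string) string {
	f.registered = append(f.registered, resource)
	return resource
}

func (f *informerFactory) Start(stop <-chan struct{}) {
	for _, r := range f.registered {
		fmt.Println("starting informer for", r) // go informer.Run(stop) in the real factory
	}
}

func main() {
	stop := make(chan struct{})
	factory := &informerFactory{}

	// 1. construct the controllers (they register the informers they need) and run them
	factory.InformerFor("pods")
	for _, c := range []controller{{"endpoint"}, {"replication"}, {"podgc"}} {
		go c.Run(stop)
	}

	// 2. only now start the shared informers, one goroutine per informer type
	factory.Start(stop)

	time.Sleep(100 * time.Millisecond)
	close(stop)
}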

3.k8s.io/kubernetes/pkg/controller/informers/factory.go

The factory's Start method is called.

Each informer registers itself through the corresponding resource's Informer() cache.SharedIndexInformer method.

func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	s.lock.Lock()
	defer s.lock.Unlock()

	for informerType, informer := range s.informers {
		if !s.startedInformers[informerType] {
			// sharedIndexInformer.run
			go informer.Run(stopCh)
			s.startedInformers[informerType] = true
		}
	}
}

Finally, the factory iterates over the registered informers and calls each informer's Run method. Take podInformer as an example.

4.k8s.io/kubernetes/pkg/controller/informers/core.go

func (f *podInformer) Informer() cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(&api.Pod{})
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	informer = NewPodInformer(f.client, f.defaultResync)
	f.informers[informerType] = informer

	return informer
}

Informer() checks whether an informer of this type is already registered; if not, it creates and registers one.

5.k8s.io/kubernetes/pkg/controller/informers/core.go

NewPodInformer creates a SharedIndexInformer that maintains the list of pods:

func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	sharedIndexInformer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return client.Core().Pods(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return client.Core().Pods(api.NamespaceAll).Watch(options)
			},
		},
		&api.Pod{},
		resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	return sharedIndexInformer
}

6.k8s.io/kubernetes/pkg/client/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// recover from and log any crash (panic) in this goroutine
	defer utilruntime.HandleCrash()
	// a FIFO queue of object change events (deltas)
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
	// the configuration for the controller below
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.fullResyncPeriod,
		RetryOnError:     false,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// create the controller
		s.controller = New(cfg)
		s.started = true
	}()

	s.stopCh = stopCh
	s.processor.run(stopCh)
	s.controller.Run(stopCh)
}

Two methods are invoked here: the processor's run and the controller's Run.

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	for _, listener := range p.listeners {
		// start a goroutine that consumes notifications and invokes OnAdd / OnUpdate / OnDelete
		go listener.run(stopCh)
		// pop buffers notifications in pendingNotifications; when the buffer is empty it checks whether to stop, otherwise it pushes the pending notifications onto nextCh for run to handle
		go listener.pop(stopCh)
	}
}
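
Neither pop nor run is shown above. Below is a minimal, self-contained sketch of their interplay with stand-in types (not the real processorListener): pop buffers incoming notifications so a slow handler never blocks the distributor, and run drains them and invokes the handler callbacks. Controller.Run, the second method, follows after the sketch.

package main

import (
	"fmt"
	"time"
)

type notification struct{ op, key string }

// pop keeps a pending buffer between the unbuffered addCh and nextCh, so the
// sender of a notification never has to wait for the handler to finish.
func pop(addCh <-chan notification, nextCh chan<- notification, stop <-chan struct{}) {
	defer close(nextCh)
	var pending []notification
	for {
		var out chan<- notification
		var next notification
		if len(pending) > 0 { // only offer a send when something is buffered
			out = nextCh
			next = pending[0]
		}
		select {
		case <-stop:
			return
		case n := <-addCh:
			pending = append(pending, n)
		case out <- next: // out is nil while pending is empty, so this case never fires then
			pending = pending[1:]
		}
	}
}

// run is the handler side: the real listener switches on the notification
// type and calls OnAdd / OnUpdate / OnDelete on the registered handler.
func run(nextCh <-chan notification) {
	for n := range nextCh {
		fmt.Println("handle", n.op, n.key)
	}
}

func main() {
	addCh := make(chan notification)
	nextCh := make(chan notification)
	stop := make(chan struct{})

	go pop(addCh, nextCh, stop)
	go run(nextCh)

	addCh <- notification{"add", "default/pod-a"}
	addCh <- notification{"update", "default/pod-a"}
	addCh <- notification{"delete", "default/pod-a"}

	time.Sleep(100 * time.Millisecond)
	close(stop)
}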

func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}
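
processLoop pops items from the DeltaFIFO and hands each one to the Process function configured earlier, i.e. sharedIndexInformer.HandleDeltas, which applies the deltas to the local indexer and then notifies the listeners. Below is a minimal, self-contained sketch of that delta-handling logic (illustrative only; the real code operates on cache.Deltas and the shared indexer):

package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
	synced  deltaType = "Sync"
)

type delta struct {
	typ deltaType
	key string
	obj string
}

// store stands in for the informer's indexer (the local object cache).
type store map[string]string

// handleDeltas mirrors the shape of HandleDeltas: apply each delta to the
// local store, then tell the listeners what happened.
func handleDeltas(s store, deltas []delta, notify func(op, key string)) {
	for _, d := range deltas {
		switch d.typ {
		case added, updated, synced:
			if _, exists := s[d.key]; exists {
				s[d.key] = d.obj
				notify("update", d.key)
			} else {
				s[d.key] = d.obj
				notify("add", d.key)
			}
		case deleted:
			delete(s, d.key)
			notify("delete", d.key)
		}
	}
}

func main() {
	s := store{}
	notify := func(op, key string) { fmt.Println(op, key) }

	handleDeltas(s, []delta{{added, "default/pod-a", "v1"}}, notify)   // add
	handleDeltas(s, []delta{{updated, "default/pod-a", "v2"}}, notify) // update
	handleDeltas(s, []delta{{deleted, "default/pod-a", ""}}, notify)   // delete
}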

The Reflector then watches the resource.

func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
	glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
	go wait.Until(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}

RunUntil calls ListAndWatch.

ListAndWatch first lists the resource to obtain its resourceVersion (rsv), then starts a watch from that resourceVersion.

watchHandler keeps the resourceVersion up to date as watch events arrive.
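
Below is a minimal, self-contained sketch of that list-then-watch flow with stand-in types (the real Reflector feeds the DeltaFIFO and reconnects its watch from the last resourceVersion it saw):

package main

import "fmt"

type event struct {
	typ             string
	key             string
	resourceVersion string
}

// listFunc / watchFunc stand in for the ListWatch pair built in NewPodInformer.
type listFunc func() (keys []string, resourceVersion string)
type watchFunc func(fromRV string) <-chan event

// listAndWatch mirrors the Reflector's loop: list once to obtain the current
// resourceVersion, then watch from it; watchHandler's job is to keep the
// resourceVersion moving forward as events arrive, so a reconnect resumes
// where the previous watch stopped instead of replaying history.
func listAndWatch(list listFunc, watch watchFunc, stop <-chan struct{}) {
	keys, rv := list()
	fmt.Println("listed", len(keys), "objects at resourceVersion", rv)

	events := watch(rv)
	for {
		select {
		case <-stop:
			return
		case ev, ok := <-events:
			if !ok {
				return // the real Reflector would re-watch (or re-list) from rv here
			}
			rv = ev.resourceVersion // watchHandler: advance the resourceVersion
			fmt.Println(ev.typ, ev.key, "-> resourceVersion now", rv)
		}
	}
}

func main() {
	list := func() ([]string, string) { return []string{"default/pod-a"}, "100" }
	watch := func(fromRV string) <-chan event {
		ch := make(chan event, 2)
		ch <- event{"MODIFIED", "default/pod-a", "101"}
		ch <- event{"DELETED", "default/pod-a", "102"}
		close(ch)
		return ch
	}
	listAndWatch(list, watch, nil)
}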



Copyright notice: this is an original article by yevvzi, released under the CC 4.0 BY-SA license; please include a link to the original and this notice when reposting.