1.main函数k8s.io/kubernetes/cmd/kube-controller-manager/controller-manager.go
注册健康检查到http.DefaultServeMux
func main() {
// 根据默认配置创建cm对象
s := options.NewCMServer()
// 添加flag
s.AddFlags(pflag.CommandLine)Bı∫
flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
//是否为version参数
verflag.PrintAndExitIfRequested()
//运行
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
2.k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
func Run(s *options.CMServer) error {
// 创建对应的config,加入到configs
if c, err := configz.New("componentconfig"); err == nil {
// 为对应的config设置config.val值
c.Set(s.KubeControllerManagerConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
//根据kubeconfig和master的url地址生成最终的restclient.Config
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
if err != nil {
return err
}
kubeconfig.ContentConfig.ContentType = s.ContentType
// Override kubeconfig qps/burst settings from flags
kubeconfig.QPS = s.KubeAPIQPS
kubeconfig.Burst = int(s.KubeAPIBurst)
// 创建Clientset
kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "controller-manager"))
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
//开启服务
go func() {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
}
configz.InstallHandler(mux)
mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}()
//事件播音员 Broadcaster发送事件到EventSink,EventSink存储事件
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// 发送event到指定的EventSink,
// &unversionedcore.EventSinkImpl{Interface:&events{kubeClient: c,ns:"",}}
// 发送event到指定的Sink
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
// 创建新的接收器,从给定的event source中记录event
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
// 定义启动方法
run := func(stop <-chan struct{}) {
// 统一客户端 对于不同的用户客户端
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if len(s.ServiceAccountKeyFile) > 0 {
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
CoreClient: kubeClient.Core(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
//判断是否开启选举
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
// TODO: enable other lock types
rl := resourcelock.EndpointsLock{
EndpointsMeta: api.ObjectMeta{
Namespace: "kube-system",
Name: "kube-controller-manager",
},
Client: leaderElectionClient,
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
}
//根据配置选举leader,以及选举 失败/成功对应的执行方法
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
在run方法中实际上调用了sharedInformers
在StartControllers中在初始化了SharedInformerFactory后启动了EndpointController、ReplicationManager、PodGCController
nodeController、serviceController、routeController、resourceQuotaController、namespaceController
orizontalController、DaemonSetsController、obController、DeploymentController、ReplicaSetController、DisruptionController
PetSetController、ScheduledJobController、persistentvolumecontroller、AttachDetachController、CertificateController
ServiceAccountsController、GarbageCollector
3.k8s.io/kubernetes/pkg/controller/informers/factory.go
调用了start方法
informers通过应资源的Informer() cache.SharedIndexInformer方法注册
func (s *sharedInformerFactory) Start(stopCh <-chan struct{}) {
s.lock.Lock()
defer s.lock.Unlock()
for informerType, informer := range s.informers {
if !s.startedInformers[informerType] {
// sharedIndexInformer.run
go informer.Run(stopCh)
s.startedInformers[informerType] = true
}
}
}
最后可以看到遍历informers,调用对影informer的run方法,以podInformer为例
4.k8s.io/kubernetes/pkg/controller/informers/core.go
func (f *podInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&api.Pod{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = NewPodInformer(f.client, f.defaultResync)
f.informers[informerType] = informer
return informer
}
判断informer是否注册,未注册则注册
5.k8s.io/kubernetes/pkg/controller/informers/core.go
创建SharedIndexInformer用来维护pods列表
func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
sharedIndexInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}
6.k8s.io/kubernetes/pkg/client/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 记录崩溃 错误 error信息
defer utilruntime.HandleCrash()
//可被进程更改的资源
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
//对应controller配置
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.fullResyncPeriod,
RetryOnError: false,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
//生成controller
s.controller = New(cfg)
s.started = true
}()
s.stopCh = stopCh
s.processor.run(stopCh)
s.controller.Run(stopCh)
}
这里执行了两个方法
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
for _, listener := range p.listeners {
//开启goroutine 获取notification 做出OnUpdate OnAdd OnDelete操作
go listener.run(stopCh)
//当pendingNotifications为0时判断是否stop,是则推出,不是则返回pendingNotifications加入nextCh交给run处理
go listener.pop(stopCh)
}
}
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
wait.Until(c.processLoop, time.Second, stopCh)
}
监控资源
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
调用
ListAndWatch先获取rsv 根据rsv监控
watchHandler保证rsv更新