Overview
controller-runtime 是 Kubernetes 社区提供可供快速搭建一套 实现了controller 功能的工具,无需自行实现Controller的功能了;在 Kubebuilder
与 Operator SDK
也是使用 controller-runtime
。本文将对 controller-runtime
的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。
controller-runtime structure
controller-runtime
主要组成是需要用户创建的 Manager
和 Reconciler
以及 Controller Runtime
自己启动的 Cache
和 Controller
。
- Manager:是用户在初始化时创建的,用于启动
Controller Runtime
组件 - Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过
code-generator
生成的api-like而实现的controller中的业务处理部分)。 - Cache:一个缓存,用来建立
Informer
到ApiServer
的连接来监听资源并将被监听的对象推送到queue中。 - Controller: 一方面向 Informer 注册
eventHandler
,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的Reconciler
功能。
由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。
Controller引入
我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。
可以看到 example 下的实际上实现了一个 reconciler
的结构体,实现了 Reconciler
抽象和 Client
结构体
type reconciler struct { client.Client scheme *runtime.Scheme }
那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile
方法,这个是具体处理的逻辑过程
type Reconciler interface { Reconcile(context.Context, Request) (Result, error) }
下面在看下谁来实现了这个 Reconciler 抽象
type Controller interface { reconcile.Reconciler // 协调的具体步骤,通过ns/name // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests) Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error // 启动controller,类似于自定义的Run() Start(ctx context.Context) error GetLogger() logr.Logger }
controller structure
在 controller-runtimepkginternalcontrollercontroller.go 中实现了这个 Controller
type Controller struct { Name string // controller的标识 MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1 // 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc Do reconcile.Reconciler // makeQueue会构建一个对应的队列,就是返回一个限速队列 MakeQueue func() workqueue.RateLimitingInterface // MakeQueue创造出来的,在出入队列就是操作的这个 Queue workqueue.RateLimitingInterface // 用于注入其他内容 // 已弃用 SetFields func(i interface{}) error mu sync.Mutex // 标识开始的状态 Started bool // 在启动时传递的上下文,用于停止控制器 ctx context.Context // 等待缓存同步的时间 默认2分钟 CacheSyncTimeout time.Duration // 维护了eventHandler predicates,在控制器启动时启动 startWatches []watchDescription // 日志构建器,输出入日志 LogConstructor func(request *reconcile.Request) logr.Logger // RecoverPanic为是否对reconcile引起的panic恢复 RecoverPanic bool }
看完了controller的structure,接下来看看controller是如何使用的
injection
Controller.Watch 实现了注入的动作,可以看到 watch()
通过参数将 对应的事件函数传入到内部
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { c.mu.Lock() defer c.mu.Unlock() // 使用SetFields来完成注入操作 if err := c.SetFields(src); err != nil { return err } if err := c.SetFields(evthdler); err != nil { return err } for _, pr := range prct { if err := c.SetFields(pr); err != nil { return err } } // 如果Controller还未启动,那么将这些动作缓存到本地 if !c.Started { c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) return nil } c.LogConstructor(nil).Info("Starting EventSource", "source", src) return src.Start(c.ctx, evthdler, c.Queue, prct...) }
启动操作实际上为informer注入事件函数
type Source interface { // start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。 Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error } func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, prct ...predicate.Predicate) error { // Informer should have been specified by the user. if is.Informer == nil { return fmt.Errorf("must specify Informer.Informer") } is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) return nil }
我们知道对于 eventHandler,实际上应该是一个 onAdd
,onUpdate
这种类型的函数,queue则是workqueue,那么 Predicates
是什么呢?
通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。
// Predicate filters events before enqueuing the keys. type Predicate interface { // Create returns true if the Create event should be processed Create(event.CreateEvent) bool // Delete returns true if the Delete event should be processed Delete(event.DeleteEvent) bool // Update returns true if the Update event should be processed Update(event.UpdateEvent) bool // Generic returns true if the Generic event should be processed Generic(event.GenericEvent) bool }
在对应的动作中,可以看到这里作为过滤操作
func (e EventHandler) OnAdd(obj interface{}) { c := event.CreateEvent{} // Pull Object out of the object if o, ok := obj.(client.Object); ok { c.Object = o } else { log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj)) return } for _, p := range e.Predicates { if !p.Create(c) { return } } // Invoke create handler e.EventHandler.Create(c, e.Queue) }
上面就看到了,对应是 EventHandler.Create
进行添加的,那么这些动作具体是在做什么呢?
在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { if evt.Object == nil { enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ Name: evt.Object.GetName(), Namespace: evt.Object.GetNamespace(), }}) }
unqueue
上面看到了,入队的动作实际上都是将 ns/name
加入到队列中,那么出队列时又做了些什么呢?
通过 controller.Start()
可以看到controller在启动后都做了些什么动作
func (c *Controller) Start(ctx context.Context) error { c.mu.Lock() if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } c.initMetrics() // Set the internal context. c.ctx = ctx c.Queue = c.MakeQueue() // 初始化queue go func() { // 退出时,让queue关闭 <-ctx.Done() c.Queue.ShutDown() }() wg := &sync.WaitGroup{} err := func() error { defer c.mu.Unlock() defer utilruntime.HandleCrash() // 启动informer前,将之前准备好的 evnetHandle predictates source注册 for _, watch := range c.startWatches { c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) // 上面我们看过了,start就是真正的注册动作 if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.LogConstructor(nil).Info("Starting Controller") // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面, // 这里是拿出来将其启动 for _, watch := range c.startWatches { syncingSource, ok := watch.src.(source.SyncingSource) if !ok { continue } if err := func() error { // use a context with timeout for launching sources and syncing caches. sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) defer cancel() // WaitForSync waits for a definitive timeout, and returns if there // is an error or a timeout if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync") return err } return nil }(); err != nil { return err } } // which won't be garbage collected if we hold a reference to it. c.startWatches = nil // Launch workers to process resources c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) wg.Add(c.MaxConcurrentReconciles) // 启动controller消费端的线程 for i := 0; i < c.MaxConcurrentReconciles; i++ { go func() { defer wg.Done() for c.processNextWorkItem(ctx) { } }() } c.Started = true return nil }() if err != nil { return err } <-ctx.Done() // 阻塞,直到上下文关闭 c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish") wg.Wait() // 等待所有线程都关闭 c.LogConstructor(nil).Info("All workers finished") return nil }
通过上面的分析,可以看到,每个消费的worker线程,实际上调用的是 processNextWorkItem 下面就来看看他究竟做了些什么?
func (c *Controller) processNextWorkItem(ctx context.Context) bool { obj, shutdown := c.Queue.Get() // 从队列中拿取数据 if shutdown { return false } defer c.Queue.Done(obj) // 下面应该是prometheus指标的一些东西 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) // 获得的对象通过reconcileHandler处理 c.reconcileHandler(ctx, obj) return true }
那么下面看看 reconcileHandler 做了些什么
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { // Update metrics after processing each item reconcileStartTS := time.Now() defer func() { c.updateMetrics(time.Since(reconcileStartTS)) }() // 检查下取出的数据是否为reconcile.Request,在之前enqueue时了解到是插入的这个类型的值 req, ok := obj.(reconcile.Request) if !ok { // 如果错了就忘记 c.Queue.Forget(obj) c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) return } log := c.LogConstructor(&req) log = log.WithValues("reconcileID", uuid.NewUUID()) ctx = logf.IntoContext(ctx, log) // 这里调用了自己在实现controller实现的Reconcile的动作 result, err := c.Reconcile(ctx, req) switch { case err != nil: c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() log.Error(err, "Reconciler error") case result.RequeueAfter > 0: c.Queue.Forget(obj) c.Queue.AddAfter(req, result.RequeueAfter) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() case result.Requeue: c.Queue.AddRateLimited(req) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() default: c.Queue.Forget(obj) ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc() } }
通过对example中的 Reconcile 查找其使用,可以看到,调用他的就是上面我们说道的 reconcileHandler
,到这里我们就知道了,controller 的运行流为 Controller.Start()
> Controller.processNextWorkItem
> Controller.reconcileHandler
> Controller.Reconcile
最终到达了我们自定义的业务逻辑处理 Reconcile
Manager
在上面学习 controller-runtime
时了解到,有一个 Manager
的组件,这个组件是做什么呢?我们来分析下。
Manager
是用来创建与启动 controller
的(允许多个 controller
与 一个 manager
关联),Manager会启动分配给他的所有controller,以及其他可启动的对象。
在 example 看到,会初始化一个 ctrl.NewManager
func main() { ctrl.SetLogger(zap.New()) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) } // in a real controller, we'd create a new scheme for this err = api.AddToScheme(mgr.GetScheme()) if err != nil { setupLog.Error(err, "unable to add scheme") os.Exit(1) } err = ctrl.NewControllerManagedBy(mgr). For(&api.ChaosPod{}). Owns(&corev1.Pod{}). Complete(&reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), }) if err != nil { setupLog.Error(err, "unable to create controller") os.Exit(1) } err = ctrl.NewWebhookManagedBy(mgr). For(&api.ChaosPod{}). Complete() if err != nil { setupLog.Error(err, "unable to create webhook") os.Exit(1) } setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } }
这个 manager
就是 controller-runtimepkgmanagermanager.go 下的 Manager
, Manager 通过初始化 Caches 和 Clients 等共享依赖,并将它们提供给 Runnables。
type Manager interface { // 提供了与APIServer交互的方式,如incluster,indexer,cache等 cluster.Cluster // Runnable 是任意可允许的cm中的组件,如 webhook,controller,Caches,在new中调用时, // 可以看到是传入的是一个controller,这里可以启动的是带有Start()方法的,通过调用Start() // 来启动组件 Add(Runnable) error // 实现选举方法。当elected关闭,则选举为leader Elected() <-chan struct{} // 这为一些列健康检查和指标的方法,和我们关注的没有太大关系 AddMetricsExtraHandler(path string, handler http.Handler) error AddHealthzCheck(name string, check healthz.Checker) error AddReadyzCheck(name string, check healthz.Checker) error // Start将启动所有注册进来的控制器,直到ctx取消。如果有任意controller报错,则立即退出 // 如果使用了 LeaderElection,则必须在此返回后立即退出二进制文件, Start(ctx context.Context) error // GetWebhookServer returns a webhook.Server GetWebhookServer() *webhook.Server // GetLogger returns this manager's logger. GetLogger() logr.Logger // GetControllerOptions returns controller global configuration options. GetControllerOptions() v1alpha1.ControllerConfigurationSpec }
controller-manager
controllerManager 则实现了这个manager的抽象
type controllerManager struct { sync.Mutex started bool stopProcedureEngaged *int64 errChan chan error runnables *runnables cluster cluster.Cluster // recorderProvider 用于记录eventhandler source predictate recorderProvider *intrec.Provider // resourceLock forms the basis for leader election resourceLock resourcelock.Interface // 在退出时是否关闭选举租约 leaderElectionReleaseOnCancel bool // 一些指标性的,暂时不需要关注 metricsListener net.Listener metricsExtraHandlers map[string]http.Handler healthProbeListener net.Listener readinessEndpointName string livenessEndpointName string readyzHandler *healthz.Handler healthzHandler *healthz.Handler // 有关controller全局参数 controllerOptions v1alpha1.ControllerConfigurationSpec logger logr.Logger // 用于关闭 LeaderElection.Run(...) 的信号 leaderElectionStopped chan struct{} // 取消选举,在失去选举后,必须延迟到gracefulShutdown之后os.exit() leaderElectionCancel context.CancelFunc // leader取消选举 elected chan struct{} port int host string certDir string webhookServer *webhook.Server webhookServerOnce sync.Once // 非leader节点强制leader的等待时间 leaseDuration time.Duration // renewDeadline is the duration that the acting controlplane will retry // refreshing leadership before giving up. renewDeadline time.Duration // LeaderElector重新操作的时间 retryPeriod time.Duration // gracefulShutdownTimeout 是在manager停止之前让runnables停止的持续时间。 gracefulShutdownTimeout time.Duration // onStoppedLeading is callled when the leader election lease is lost. // It can be overridden for tests. onStoppedLeading func() shutdownCtx context.Context internalCtx context.Context internalCancel context.CancelFunc internalProceduresStop chan struct{} }
workflow
了解完ControllerManager之后,我们通过 example 来看看 ControllerManager 的workflow
func main() { ctrl.SetLogger(zap.New()) // New一个manager mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) } // in a real controller, we'd create a new scheme for this err = api.AddToScheme(mgr.GetScheme()) if err != nil { setupLog.Error(err, "unable to add scheme") os.Exit(1) } err = ctrl.NewControllerManagedBy(mgr). For(&api.ChaosPod{}). Owns(&corev1.Pod{}). Complete(&reconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), }) if err != nil { setupLog.Error(err, "unable to create controller") os.Exit(1) } err = ctrl.NewWebhookManagedBy(mgr). For(&api.ChaosPod{}). Complete() if err != nil { setupLog.Error(err, "unable to create webhook") os.Exit(1) } setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } }
- 通过
manager.New()
初始化一个manager,这里面会初始化一些列的manager的参数 - 通过
ctrl.NewControllerManagedBy
注册 controller 到manager中ctrl.NewControllerManagedBy
是 builder的一个别名,构建出一个builder类型的controllerbuilder
中的ctrl
就是 controller
- 启动manager
builder
下面看来看下builder在构建时做了什么
// Builder builds a Controller. type Builder struct { forInput ForInput ownsInput []OwnsInput watchesInput []WatchesInput mgr manager.Manager globalPredicates []predicate.Predicate ctrl controller.Controller ctrlOptions controller.Options name string }
我们看到 example 中是调用了 For()
动作,那么这个 For()
是什么呢?
通过注释,我们可以看到 For() 提供了 调解对象类型,ControllerManagedBy 通过 reconciling object 来相应对应create/delete/update
事件。调用 For()
相当于调用了 Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
。
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { if blder.forInput.object != nil { blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") return blder } input := ForInput{object: object} for _, opt := range opts { opt.ApplyToFor(&input) //最终把我们要监听的对象每个 opts注册进去 } blder.forInput = input return blder }
接下来是调用的 Owns() ,Owns()
看起来和 For()
功能是类似的。只是说属于不同,是通过Owns方法设置的
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { input := OwnsInput{object: object} for _, opt := range opts { opt.ApplyToOwns(&input) } blder.ownsInput = append(blder.ownsInput, input) return blder }
最后到了 Complete(),Complete
是完成这个controller的构建
// Complete builds the Application Controller. func (blder *Builder) Complete(r reconcile.Reconciler) error { _, err := blder.Build(r) return err } // Build 创建控制器并返回 func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { if r == nil { return nil, fmt.Errorf("must provide a non-nil Reconciler") } if blder.mgr == nil { return nil, fmt.Errorf("must provide a non-nil Manager") } if blder.forInput.err != nil { return nil, blder.forInput.err } // Checking the reconcile type exist or not if blder.forInput.object == nil { return nil, fmt.Errorf("must provide an object for reconciliation") } // Set the ControllerManagedBy if err := blder.doController(r); err != nil { return nil, err } // Set the Watch if err := blder.doWatch(); err != nil { return nil, err } return blder.ctrl, nil }
这里面可以看到,会完成 doController 和 doWatch
doController会初始化好这个controller并返回
func (blder *Builder) doController(r reconcile.Reconciler) error { globalOpts := blder.mgr.GetControllerOptions() ctrlOptions := blder.ctrlOptions if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r } // 通过检索GVK获得默认的名称 gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) if err != nil { return err } // 设置并发,如果最大并发为0则找到一个 // 追踪下去看似是对于没有设置时,例如会根据 app group中的 ReplicaSet设定 // 就是在For()传递的一个类型的数量来确定并发的数量 if ctrlOptions.MaxConcurrentReconciles == 0 { groupKind := gvk.GroupKind().String() if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { ctrlOptions.MaxConcurrentReconciles = concurrency } } // Setup cache sync timeout. if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout } // 给controller一个name,如果没有初始化传递,则使用Kind做名称 controllerName := blder.getControllerName(gvk) // Setup the logger. if ctrlOptions.LogConstructor == nil { log := blder.mgr.GetLogger().WithValues( "controller", controllerName, "controllerGroup", gvk.Group, "controllerKind", gvk.Kind, ) lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:] ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger { log := log if req != nil { log = log.WithValues( lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name), "namespace", req.Namespace, "name", req.Name, ) } return log } } // 这里就是构建一个新的控制器了,也就是前面说到的 manager.New() blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions) return err }
start Manager
接下来是manager的启动,也就是对应的 start()
与 doWatch()
通过下述代码我们可以看出来,对于 doWatch()
就是把 compete()
前的一些资源的事件函数都注入到controller 中
func (blder *Builder) doWatch() error { // 调解类型,这也也就是对于For的obj来说,我们需要的是什么结构的,如非结构化数据或metadata-only // metadata-only就是配置成一个GVK schema.GroupVersionKind typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err }&source.Kind{} // 一些准备工作,将对象封装为&source.Kind{} // src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} // 就是包含obj的一个事件队列 allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) // 这里又到之前说过的controller watch了 // 将一系列的准备动作注入到cache 如 source eventHandler predicate if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // 再重复 ownsInput 动作 for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // 在对 ownsInput 进行重复的操作 for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil }
由于前两部 builder
的操作将 mgr 指针传入到 builder中,并且操作了 complete()
,也就是操作了 build()
,这代表了对 controller
完成了初始化,和事件注入(watch
)的操作,所以 Start(),就是将controller启动
func (cm *controllerManager) Start(ctx context.Context) (err error) { cm.Lock() if cm.started { cm.Unlock() return errors.New("manager already started") } var ready bool defer func() { if !ready { cm.Unlock() } }() // Initialize the internal context. cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // 这个channel代表了controller的停止 stopComplete := make(chan struct{}) defer close(stopComplete) // This must be deferred after closing stopComplete, otherwise we deadlock. defer func() { stopErr := cm.engageStopProcedure(stopComplete) if stopErr != nil { if err != nil { err = kerrors.NewAggregate([]error{err, stopErr}) } else { err = stopErr } } }() // Add the cluster runnable. if err := cm.add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } // 指标类 if cm.metricsListener != nil { cm.serveMetrics() } if cm.healthProbeListener != nil { cm.serveHealthProbes() } if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 等待informer同步完成 if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // 非选举模式,runnable将在cache同步完成后启动 if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { if !errors.Is(err, wait.ErrWaitTimeout) { return err } } // Start the leader election and all required runnables. { ctx, cancel := context.WithCancel(context.Background()) cm.leaderElectionCancel = cancel go func() { if cm.resourceLock != nil { if err := cm.startLeaderElection(ctx); err != nil { cm.errChan <- err } } else { // Treat not having leader election enabled the same as being elected. if err := cm.startLeaderElectionRunnables(); err != nil { cm.errChan <- err } close(cm.elected) } }() } ready = true cm.Unlock() select { case <-ctx.Done(): // We are done return nil case err := <-cm.errChan: // Error starting or running a runnable return err } }
可以看到上面启动了4种类型的runnable,实际上就是对这runnable进行启动,例如 controller,cache等。
回顾一下,我们之前在使用code-generator
生成,并自定义controller时,我们也是通过启动 informer.Start()
,否则会报错。
最后可以通过一张关系图来表示,client-go与controller-manager之间的关系
Reference