深入解析kubernetes controller-runtime

Overview

controller-runtime 是 Kubernetes 社区提供可供快速搭建一套 实现了controller 功能的工具,无需自行实现Controller的功能了;在 KubebuilderOperator SDK 也是使用 controller-runtime 。本文将对 controller-runtime 的工作原理以及在不同场景下的使用方式进行简要的总结和介绍。

controller-runtime structure

controller-runtime 主要组成是需要用户创建的 ManagerReconciler 以及 Controller Runtime 自己启动的 CacheController

  • Manager:是用户在初始化时创建的,用于启动 Controller Runtime 组件
  • Reconciler:是用户需要提供来处理自己的业务逻辑的组件(即在通过 code-generator 生成的api-like而实现的controller中的业务处理部分)。
  • Cache:一个缓存,用来建立 InformerApiServer 的连接来监听资源并将被监听的对象推送到queue中。
  • Controller: 一方面向 Informer 注册 eventHandler,另一方面从队列中获取数据。controller 将从队列中获取数据并执行用户自定义的 Reconciler 功能。

深入解析kubernetes controller-runtime

图:controller-runtime structure

深入解析kubernetes controller-runtime

图:controller-runtime flowchart

由图可知,Controller会向 Informer 注册一些列eventHandler;然后Cache启动Informer(informer属于cache包中),与ApiServer建立监听;当Informer检测到资源变化时,将对象加入queue,Controller 将元素取出并在用户端执行 Reconciler。

Controller引入

我们从 controller-rumtime项目的 example 进行引入看下,整个架构都是如何实现的。

可以看到 example 下的实际上实现了一个 reconciler 的结构体,实现了 Reconciler 抽象和 Client 结构体

type reconciler struct { 	client.Client 	scheme *runtime.Scheme } 

那么来看下 抽象的 Reconciler 是什么,可以看到就是抽象了 Reconcile 方法,这个是具体处理的逻辑过程

type Reconciler interface { 	Reconcile(context.Context, Request) (Result, error) } 

下面在看下谁来实现了这个 Reconciler 抽象

type Controller interface { 	reconcile.Reconciler // 协调的具体步骤,通过ns/name     // 通过predicates来评估来源数据,并加入queue中(放入队列的是reconcile.Requests) 	Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error     // 启动controller,类似于自定义的Run() 	Start(ctx context.Context) error 	GetLogger() logr.Logger } 

controller structure

controller-runtimepkginternalcontrollercontroller.go 中实现了这个 Controller

type Controller struct { 	Name string // controller的标识      	MaxConcurrentReconciles int // 并发运行Reconciler的数量,默认1 	// 实现了reconcile.Reconciler的调节器, 默认DefaultReconcileFunc 	Do reconcile.Reconciler 	// makeQueue会构建一个对应的队列,就是返回一个限速队列 	MakeQueue func() workqueue.RateLimitingInterface 	// MakeQueue创造出来的,在出入队列就是操作的这个 	Queue workqueue.RateLimitingInterface  	// 用于注入其他内容     // 已弃用 	SetFields func(i interface{}) error  	mu sync.Mutex 	// 标识开始的状态 	Started bool 	// 在启动时传递的上下文,用于停止控制器 	ctx context.Context 	// 等待缓存同步的时间 默认2分钟 	CacheSyncTimeout time.Duration  	// 维护了eventHandler predicates,在控制器启动时启动 	startWatches []watchDescription  	// 日志构建器,输出入日志 	LogConstructor func(request *reconcile.Request) logr.Logger  	// RecoverPanic为是否对reconcile引起的panic恢复 	RecoverPanic bool } 

看完了controller的structure,接下来看看controller是如何使用的

injection

Controller.Watch 实现了注入的动作,可以看到 watch() 通过参数将 对应的事件函数传入到内部

func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error { 	c.mu.Lock() 	defer c.mu.Unlock()  	// 使用SetFields来完成注入操作 	if err := c.SetFields(src); err != nil { 		return err 	} 	if err := c.SetFields(evthdler); err != nil { 		return err 	} 	for _, pr := range prct { 		if err := c.SetFields(pr); err != nil { 			return err 		} 	}  	// 如果Controller还未启动,那么将这些动作缓存到本地 	if !c.Started { 		c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct}) 		return nil 	}  	c.LogConstructor(nil).Info("Starting EventSource", "source", src) 	return src.Start(c.ctx, evthdler, c.Queue, prct...) } 

启动操作实际上为informer注入事件函数

type Source interface { 	// start 是Controller 调用,用以向 Informer 注册 EventHandler, 将 reconcile.Requests(一个入队列的动作) 排入队列。 	Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error }  func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface, 	prct ...predicate.Predicate) error { 	// Informer should have been specified by the user. 	if is.Informer == nil { 		return fmt.Errorf("must specify Informer.Informer") 	}  	is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) 	return nil } 

我们知道对于 eventHandler,实际上应该是一个 onAddonUpdate 这种类型的函数,queue则是workqueue,那么 Predicates 是什么呢?

通过追踪可以看到定义了 Predicate 抽象,可以看出Predicate 是Watch到的事件时什么类型的,当对于每个类型的事件,对应的函数就为 true,在 eventHandler 中,这些被用作,事件的过滤。

// Predicate filters events before enqueuing the keys. type Predicate interface { 	// Create returns true if the Create event should be processed 	Create(event.CreateEvent) bool  	// Delete returns true if the Delete event should be processed 	Delete(event.DeleteEvent) bool  	// Update returns true if the Update event should be processed 	Update(event.UpdateEvent) bool  	// Generic returns true if the Generic event should be processed 	Generic(event.GenericEvent) bool } 

在对应的动作中,可以看到这里作为过滤操作

func (e EventHandler) OnAdd(obj interface{}) { 	c := event.CreateEvent{}  	// Pull Object out of the object 	if o, ok := obj.(client.Object); ok { 		c.Object = o 	} else { 		log.Error(nil, "OnAdd missing Object", 			"object", obj, "type", fmt.Sprintf("%T", obj)) 		return 	}  	for _, p := range e.Predicates { 		if !p.Create(c) { 			return 		} 	}  	// Invoke create handler 	e.EventHandler.Create(c, e.Queue) } 

上面就看到了,对应是 EventHandler.Create 进行添加的,那么这些动作具体是在做什么呢?

在代码 pkg/handler ,可以看到这些操作,类似于create,这里将ns/name放入到队列中。

func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { 	if evt.Object == nil { 		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) 		return 	} 	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ 		Name:      evt.Object.GetName(), 		Namespace: evt.Object.GetNamespace(), 	}}) } 

unqueue

上面看到了,入队的动作实际上都是将 ns/name 加入到队列中,那么出队列时又做了些什么呢?

通过 controller.Start() 可以看到controller在启动后都做了些什么动作

func (c *Controller) Start(ctx context.Context) error { 	c.mu.Lock() 	if c.Started { 		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") 	}  	c.initMetrics()  	// Set the internal context. 	c.ctx = ctx  	c.Queue = c.MakeQueue() // 初始化queue 	go func() { // 退出时,让queue关闭 		<-ctx.Done() 		c.Queue.ShutDown() 	}()  	wg := &sync.WaitGroup{} 	err := func() error { 		defer c.mu.Unlock() 		defer utilruntime.HandleCrash()  		// 启动informer前,将之前准备好的 evnetHandle predictates source注册 		for _, watch := range c.startWatches { 			c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src)) 				// 上面我们看过了,start就是真正的注册动作 			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { 				return err 			} 		}  		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches 		c.LogConstructor(nil).Info("Starting Controller") 		 // startWatches上面我们也看到了,是evnetHandle predictates source被缓存到里面,         // 这里是拿出来将其启动 		for _, watch := range c.startWatches { 			syncingSource, ok := watch.src.(source.SyncingSource) 			if !ok { 				continue 			}  			if err := func() error { 				// use a context with timeout for launching sources and syncing caches. 				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) 				defer cancel()  				// WaitForSync waits for a definitive timeout, and returns if there 				// is an error or a timeout 				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { 					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) 					c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync") 					return err 				}  				return nil 			}(); err != nil { 				return err 			} 		}  		// which won't be garbage collected if we hold a reference to it. 		c.startWatches = nil  		// Launch workers to process resources 		c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles) 		wg.Add(c.MaxConcurrentReconciles)         // 启动controller消费端的线程 		for i := 0; i < c.MaxConcurrentReconciles; i++ { 			go func() { 				defer wg.Done() 				for c.processNextWorkItem(ctx) { 				} 			}() 		}  		c.Started = true 		return nil 	}() 	if err != nil { 		return err 	}  	<-ctx.Done() // 阻塞,直到上下文关闭 	c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish") 	wg.Wait() // 等待所有线程都关闭 	c.LogConstructor(nil).Info("All workers finished") 	return nil } 

通过上面的分析,可以看到,每个消费的worker线程,实际上调用的是 processNextWorkItem 下面就来看看他究竟做了些什么?

func (c *Controller) processNextWorkItem(ctx context.Context) bool { 	obj, shutdown := c.Queue.Get() // 从队列中拿取数据 	if shutdown { 		return false 	}  	defer c.Queue.Done(obj) 	// 下面应该是prometheus指标的一些东西 	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1) 	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1) 	// 获得的对象通过reconcileHandler处理 	c.reconcileHandler(ctx, obj) 	return true } 

那么下面看看 reconcileHandler 做了些什么

func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) { 	// Update metrics after processing each item 	reconcileStartTS := time.Now() 	defer func() { 		c.updateMetrics(time.Since(reconcileStartTS)) 	}()  	// 检查下取出的数据是否为reconcile.Request,在之前enqueue时了解到是插入的这个类型的值 	req, ok := obj.(reconcile.Request) 	if !ok { 		// 如果错了就忘记 		c.Queue.Forget(obj) 		c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj) 		return 	}  	log := c.LogConstructor(&req)  	log = log.WithValues("reconcileID", uuid.NewUUID()) 	ctx = logf.IntoContext(ctx, log)  	// 这里调用了自己在实现controller实现的Reconcile的动作 	result, err := c.Reconcile(ctx, req) 	switch { 	case err != nil: 		c.Queue.AddRateLimited(req) 		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc() 		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc() 		log.Error(err, "Reconciler error") 	case result.RequeueAfter > 0: 		c.Queue.Forget(obj) 		c.Queue.AddAfter(req, result.RequeueAfter) 		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc() 	case result.Requeue: 		c.Queue.AddRateLimited(req) 		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc() 	default: 		c.Queue.Forget(obj) 		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc() 	} } 

通过对example中的 Reconcile 查找其使用,可以看到,调用他的就是上面我们说道的 reconcileHandler ,到这里我们就知道了,controller 的运行流为 Controller.Start() > Controller.processNextWorkItem > Controller.reconcileHandler > Controller.Reconcile 最终到达了我们自定义的业务逻辑处理 Reconcile

深入解析kubernetes controller-runtime

Manager

在上面学习 controller-runtime 时了解到,有一个 Manager 的组件,这个组件是做什么呢?我们来分析下。

Manager 是用来创建与启动 controller 的(允许多个 controller 与 一个 manager 关联),Manager会启动分配给他的所有controller,以及其他可启动的对象。

example 看到,会初始化一个 ctrl.NewManager

func main() {    ctrl.SetLogger(zap.New())     mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})    if err != nil {       setupLog.Error(err, "unable to start manager")       os.Exit(1)    }     // in a real controller, we'd create a new scheme for this    err = api.AddToScheme(mgr.GetScheme())    if err != nil {       setupLog.Error(err, "unable to add scheme")       os.Exit(1)    }     err = ctrl.NewControllerManagedBy(mgr).       For(&api.ChaosPod{}).       Owns(&corev1.Pod{}).       Complete(&reconciler{          Client: mgr.GetClient(),          scheme: mgr.GetScheme(),       })    if err != nil {       setupLog.Error(err, "unable to create controller")       os.Exit(1)    }     err = ctrl.NewWebhookManagedBy(mgr).       For(&api.ChaosPod{}).       Complete()    if err != nil {       setupLog.Error(err, "unable to create webhook")       os.Exit(1)    }     setupLog.Info("starting manager")    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {       setupLog.Error(err, "problem running manager")       os.Exit(1)    } } 

这个 manager 就是 controller-runtimepkgmanagermanager.go 下的 Manager, Manager 通过初始化 Caches 和 Clients 等共享依赖,并将它们提供给 Runnables。

type Manager interface { 	// 提供了与APIServer交互的方式,如incluster,indexer,cache等 	cluster.Cluster      // Runnable 是任意可允许的cm中的组件,如 webhook,controller,Caches,在new中调用时,     // 可以看到是传入的是一个controller,这里可以启动的是带有Start()方法的,通过调用Start()     // 来启动组件     Add(Runnable) error          // 实现选举方法。当elected关闭,则选举为leader 	Elected() <-chan struct{}  	// 这为一些列健康检查和指标的方法,和我们关注的没有太大关系 	AddMetricsExtraHandler(path string, handler http.Handler) error 	AddHealthzCheck(name string, check healthz.Checker) error 	AddReadyzCheck(name string, check healthz.Checker) error  	// Start将启动所有注册进来的控制器,直到ctx取消。如果有任意controller报错,则立即退出     // 如果使用了 LeaderElection,则必须在此返回后立即退出二进制文件, 	Start(ctx context.Context) error  	// GetWebhookServer returns a webhook.Server 	GetWebhookServer() *webhook.Server  	// GetLogger returns this manager's logger. 	GetLogger() logr.Logger  	// GetControllerOptions returns controller global configuration options. 	GetControllerOptions() v1alpha1.ControllerConfigurationSpec } 

controller-manager

controllerManager 则实现了这个manager的抽象

type controllerManager struct { 	sync.Mutex 	started bool  	stopProcedureEngaged *int64 	errChan              chan error 	runnables            *runnables 	 	cluster cluster.Cluster  	// recorderProvider 用于记录eventhandler source predictate 	recorderProvider *intrec.Provider  	// resourceLock forms the basis for leader election 	resourceLock resourcelock.Interface  	// 在退出时是否关闭选举租约 	leaderElectionReleaseOnCancel bool 	// 一些指标性的,暂时不需要关注 	metricsListener net.Listener 	metricsExtraHandlers map[string]http.Handler 	healthProbeListener net.Listener 	readinessEndpointName string 	livenessEndpointName string 	readyzHandler *healthz.Handler 	healthzHandler *healthz.Handler  	// 有关controller全局参数 	controllerOptions v1alpha1.ControllerConfigurationSpec  	logger logr.Logger  	// 用于关闭 LeaderElection.Run(...) 的信号 	leaderElectionStopped chan struct{}      // 取消选举,在失去选举后,必须延迟到gracefulShutdown之后os.exit() 	leaderElectionCancel context.CancelFunc  	// leader取消选举 	elected chan struct{}  	port int 	host string 	certDir string 	webhookServer *webhook.Server 	webhookServerOnce sync.Once 	// 非leader节点强制leader的等待时间 	leaseDuration time.Duration 	// renewDeadline is the duration that the acting controlplane will retry 	// refreshing leadership before giving up. 	renewDeadline time.Duration 	// LeaderElector重新操作的时间 	retryPeriod time.Duration 	// gracefulShutdownTimeout 是在manager停止之前让runnables停止的持续时间。 	gracefulShutdownTimeout time.Duration  	// onStoppedLeading is callled when the leader election lease is lost. 	// It can be overridden for tests. 	onStoppedLeading func()  	shutdownCtx context.Context 	internalCtx    context.Context 	internalCancel context.CancelFunc 	internalProceduresStop chan struct{} } 

workflow

了解完ControllerManager之后,我们通过 example 来看看 ControllerManager 的workflow

func main() {    ctrl.SetLogger(zap.New())    // New一个manager    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})    if err != nil {       setupLog.Error(err, "unable to start manager")       os.Exit(1)    }     // in a real controller, we'd create a new scheme for this    err = api.AddToScheme(mgr.GetScheme())    if err != nil {       setupLog.Error(err, "unable to add scheme")       os.Exit(1)    }     err = ctrl.NewControllerManagedBy(mgr).       For(&api.ChaosPod{}).       Owns(&corev1.Pod{}).       Complete(&reconciler{          Client: mgr.GetClient(),          scheme: mgr.GetScheme(),       })    if err != nil {       setupLog.Error(err, "unable to create controller")       os.Exit(1)    }     err = ctrl.NewWebhookManagedBy(mgr).       For(&api.ChaosPod{}).       Complete()    if err != nil {       setupLog.Error(err, "unable to create webhook")       os.Exit(1)    }     setupLog.Info("starting manager")    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {       setupLog.Error(err, "problem running manager")       os.Exit(1)    } } 
  • 通过 manager.New() 初始化一个manager,这里面会初始化一些列的manager的参数
  • 通过 ctrl.NewControllerManagedBy 注册 controller 到manager中
    • ctrl.NewControllerManagedBy 是 builder的一个别名,构建出一个builder类型的controller
    • builder 中的 ctrl 就是 controller
  • 启动manager

builder

下面看来看下builder在构建时做了什么

// Builder builds a Controller. type Builder struct { 	forInput         ForInput 	ownsInput        []OwnsInput 	watchesInput     []WatchesInput 	mgr              manager.Manager 	globalPredicates []predicate.Predicate 	ctrl             controller.Controller 	ctrlOptions      controller.Options 	name             string } 

我们看到 example 中是调用了 For() 动作,那么这个 For() 是什么呢?

通过注释,我们可以看到 For() 提供了 调解对象类型,ControllerManagedBy 通过 reconciling object 来相应对应create/delete/update 事件。调用 For() 相当于调用了 Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})

func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder { 	if blder.forInput.object != nil { 		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation") 		return blder 	} 	input := ForInput{object: object} 	for _, opt := range opts { 		opt.ApplyToFor(&input) //最终把我们要监听的对象每个 opts注册进去 	}  	blder.forInput = input 	return blder } 

接下来是调用的 Owns()Owns() 看起来和 For() 功能是类似的。只是说属于不同,是通过Owns方法设置的

func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder { 	input := OwnsInput{object: object} 	for _, opt := range opts { 		opt.ApplyToOwns(&input) 	}  	blder.ownsInput = append(blder.ownsInput, input) 	return blder } 

最后到了 Complete(),Complete 是完成这个controller的构建

// Complete builds the Application Controller. func (blder *Builder) Complete(r reconcile.Reconciler) error { 	_, err := blder.Build(r) 	return err }  // Build 创建控制器并返回 func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { 	if r == nil { 		return nil, fmt.Errorf("must provide a non-nil Reconciler") 	} 	if blder.mgr == nil { 		return nil, fmt.Errorf("must provide a non-nil Manager") 	} 	if blder.forInput.err != nil { 		return nil, blder.forInput.err 	} 	// Checking the reconcile type exist or not 	if blder.forInput.object == nil { 		return nil, fmt.Errorf("must provide an object for reconciliation") 	}  	// Set the ControllerManagedBy 	if err := blder.doController(r); err != nil { 		return nil, err 	}  	// Set the Watch 	if err := blder.doWatch(); err != nil { 		return nil, err 	}  	return blder.ctrl, nil } 

这里面可以看到,会完成 doController 和 doWatch

doController会初始化好这个controller并返回

func (blder *Builder) doController(r reconcile.Reconciler) error { 	globalOpts := blder.mgr.GetControllerOptions()  	ctrlOptions := blder.ctrlOptions 	if ctrlOptions.Reconciler == nil { 		ctrlOptions.Reconciler = r 	}  	// 通过检索GVK获得默认的名称 	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) 	if err != nil { 		return err 	}  	// 设置并发,如果最大并发为0则找到一个     // 追踪下去看似是对于没有设置时,例如会根据 app group中的 ReplicaSet设定     // 就是在For()传递的一个类型的数量来确定并发的数量 	if ctrlOptions.MaxConcurrentReconciles == 0 { 		groupKind := gvk.GroupKind().String()  		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { 			ctrlOptions.MaxConcurrentReconciles = concurrency 		} 	}  	// Setup cache sync timeout. 	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { 		ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout 	} 	// 给controller一个name,如果没有初始化传递,则使用Kind做名称 	controllerName := blder.getControllerName(gvk)  	// Setup the logger. 	if ctrlOptions.LogConstructor == nil { 		log := blder.mgr.GetLogger().WithValues( 			"controller", controllerName, 			"controllerGroup", gvk.Group, 			"controllerKind", gvk.Kind, 		)  		lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]  		ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger { 			log := log 			if req != nil { 				log = log.WithValues( 					lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name), 					"namespace", req.Namespace, "name", req.Name, 				) 			} 			return log 		} 	}  	// 这里就是构建一个新的控制器了,也就是前面说到的  manager.New() 	blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions) 	return err } 

manager.New()

start Manager

接下来是manager的启动,也就是对应的 start()doWatch()

通过下述代码我们可以看出来,对于 doWatch() 就是把 compete() 前的一些资源的事件函数都注入到controller 中

func (blder *Builder) doWatch() error { 	// 调解类型,这也也就是对于For的obj来说,我们需要的是什么结构的,如非结构化数据或metadata-only     // metadata-only就是配置成一个GVK schema.GroupVersionKind 	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) 	if err != nil { 		return err     }&source.Kind{}     // 一些准备工作,将对象封装为&source.Kind{}     //  	src := &source.Kind{Type: typeForSrc} 	hdler := &handler.EnqueueRequestForObject{} // 就是包含obj的一个事件队列 	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) 	// 这里又到之前说过的controller watch了     // 将一系列的准备动作注入到cache 如 source eventHandler predicate     if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { 		return err 	}  	// 再重复 ownsInput 动作 	for _, own := range blder.ownsInput { 		typeForSrc, err := blder.project(own.object, own.objectProjection) 		if err != nil { 			return err 		} 		src := &source.Kind{Type: typeForSrc} 		hdler := &handler.EnqueueRequestForOwner{ 			OwnerType:    blder.forInput.object, 			IsController: true, 		} 		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) 		allPredicates = append(allPredicates, own.predicates...) 		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { 			return err 		} 	}  	// 在对 ownsInput 进行重复的操作 	for _, w := range blder.watchesInput { 		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) 		allPredicates = append(allPredicates, w.predicates...)  		// If the source of this watch is of type *source.Kind, project it. 		if srckind, ok := w.src.(*source.Kind); ok { 			typeForSrc, err := blder.project(srckind.Type, w.objectProjection) 			if err != nil { 				return err 			} 			srckind.Type = typeForSrc 		}  		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { 			return err 		} 	} 	return nil } 

由于前两部 builder 的操作将 mgr 指针传入到 builder中,并且操作了 complete() ,也就是操作了 build() ,这代表了对 controller 完成了初始化,和事件注入(watch)的操作,所以 Start(),就是将controller启动

func (cm *controllerManager) Start(ctx context.Context) (err error) { 	cm.Lock() 	if cm.started { 		cm.Unlock() 		return errors.New("manager already started") 	} 	var ready bool 	defer func() { 		if !ready { 			cm.Unlock() 		} 	}()  	// Initialize the internal context. 	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)  	// 这个channel代表了controller的停止 	stopComplete := make(chan struct{}) 	defer close(stopComplete) 	// This must be deferred after closing stopComplete, otherwise we deadlock. 	defer func() { 		stopErr := cm.engageStopProcedure(stopComplete) 		if stopErr != nil { 			if err != nil { 				err = kerrors.NewAggregate([]error{err, stopErr}) 			} else { 				err = stopErr 			} 		} 	}()  	// Add the cluster runnable. 	if err := cm.add(cm.cluster); err != nil { 		return fmt.Errorf("failed to add cluster to runnables: %w", err) 	}     // 指标类 	if cm.metricsListener != nil { 		cm.serveMetrics() 	} 	if cm.healthProbeListener != nil { 		cm.serveHealthProbes() 	} 	if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil { 		if !errors.Is(err, wait.ErrWaitTimeout) { 			return err 		} 	}  	// 等待informer同步完成 	if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil { 		if !errors.Is(err, wait.ErrWaitTimeout) { 			return err 		} 	}  	// 非选举模式,runnable将在cache同步完成后启动 	if err := cm.runnables.Others.Start(cm.internalCtx); err != nil { 		if !errors.Is(err, wait.ErrWaitTimeout) { 			return err 		} 	}  	// Start the leader election and all required runnables. 	{ 		ctx, cancel := context.WithCancel(context.Background()) 		cm.leaderElectionCancel = cancel 		go func() { 			if cm.resourceLock != nil { 				if err := cm.startLeaderElection(ctx); err != nil { 					cm.errChan <- err 				} 			} else { 				// Treat not having leader election enabled the same as being elected. 				if err := cm.startLeaderElectionRunnables(); err != nil { 					cm.errChan <- err 				} 				close(cm.elected) 			} 		}() 	}  	ready = true 	cm.Unlock() 	select { 	case <-ctx.Done(): 		// We are done 		return nil 	case err := <-cm.errChan: 		// Error starting or running a runnable 		return err 	} } 

可以看到上面启动了4种类型的runnable,实际上就是对这runnable进行启动,例如 controller,cache等。

回顾一下,我们之前在使用code-generator 生成,并自定义controller时,我们也是通过启动 informer.Start() ,否则会报错。

最后可以通过一张关系图来表示,client-go与controller-manager之间的关系

深入解析kubernetes controller-runtime

Reference

diving controller runtime

发表评论

相关文章