client-go之Controller&Processor源码分析
1.controller与Processor概述
Controller
Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生。
Processor
Processor根据Controller的通知,即根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化。
先通过一张informer概要架构图看一下Controller&Processor所处位置与概要功能。
2.Controller初始化与启动分析
2.1 Cotroller初始化-New
New用于初始化Controller,方法比较简单。
// staging/src/k8s.io/client-go/tools/cache/controller.go func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr }
2.2 Controller启动-controller.Run
controller.Run为controller的启动方法,这里主要看到几个点:
(1)调用NewReflector,初始化Reflector;
(2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector(Reflector相关的分析前面的博客已经分析过了,这里不再重复);
(3)调用c.processLoop,开始controller的核心处理;
// staging/src/k8s.io/client-go/tools/cache/controller.go func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) }
3.controller核心处理方法分析
controller.processLoop即为controller的核心处理方法。
controller.processLoop
controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),然后调用c.config.Process
方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent
将对象重新加入到DeltaFIFO中去。
func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } }
根据前面sharedIndexInformer的初始化与启动分析(sharedIndexInformer.Run)可以得知,c.config.Process即为s.HandleDeltas方法,所以接下来看到s.HandleDeltas方法的分析。
c.config.Process/s.HandleDeltas
根据前面分析知道HandleDeltas要处理的是Deltas,是Delta的切片类型。
再来看到HandleDeltas方法的主要逻辑:
(1)循环遍历Deltas,拿到单个Delta;
(2)判断Delta的类型;
(3)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并调用s.processor.distribute方法;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并调用s.processor.distribute方法;
(4)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并调用s.processor.distribute方法;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Added, Updated: isSync := d.Type == Sync s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, isSync) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } type updateNotification struct { oldObj interface{} newObj interface{} } type addNotification struct { newObj interface{} } type deleteNotification struct { oldObj interface{} }
至此,Controller的分析就结束了,用一张图来回忆一下Controller的功能与架构。
4.processor核心处理方法分析
sharedIndexInformer.processor.distribute
接下来分析一下前面提到的s.processor.distribute方法。
可以看到distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中。
sync类型的对象写入到p.syncingListeners中,但informer中貌似没有启动p.syncingListeners或对p.syncingListeners做处理,所以sync类型的对象变化(也即list操作得到的对象所生成的对象变化)会被忽略?有待验证。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
sharedIndexInformer.processor.run
s.processor.run启动了processor,其中注意到listener.run与listener.pop两个核心方法。
这里可以看到processor的run方法中只启动了p.listeners,没有启动p.syncingListeners。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop }
processorListener.pop
分析processorListener的pop方法可以得知,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中。那么谁来处理p.nextCh呢?继续往下看。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } } }
processorListener.run
在processorListener的run方法中,将循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { // this gives us a few quick retries before a long pause and then a few more quick retries err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed return true, nil }) // the only way to get here is if the p.nextCh is empty and closed if err == nil { close(stopCh) } }, 1*time.Minute, stopCh) }
而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了。
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: onUpdate, DeleteFunc: onDelete, })
// staging/src/k8s.io/client-go/tools/cache/controller.go type ResourceEventHandlerFuncs struct { AddFunc func(obj interface{}) UpdateFunc func(oldObj, newObj interface{}) DeleteFunc func(obj interface{}) } func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { if r.AddFunc != nil { r.AddFunc(obj) } } func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { if r.UpdateFunc != nil { r.UpdateFunc(oldObj, newObj) } } func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { if r.DeleteFunc != nil { r.DeleteFunc(obj) } }
至此,Processor的分析也结束了,用一张图来回忆一下Processor的功能与架构。
总结
Controller
Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生:
(1)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并通知Processor;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并通知Processor;
(2)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并通知Processor;
Processor
Processor根据Controller的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的ResourceEventHandler(addFunc、updateFunc、deleteFunc)来处理对象的变化。
informer架构中的Controller&Processor
在对informer中的Controller与Processor分析完之后,接下来将分析informer中的Indexer。