Overview
根据 Kubernetes 文档对 Controller 的描述,Controller 在 Kubernetes 中是负责协调的组件。按照这一设计模式,Controller 会不断地将对象(如 Pod)从当前状态向期望状态同步;为此,Controller 会持续监听对象的实际状态与期望状态。
Writing Controllers
package main import ( "flag" "fmt" "os" "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) type Controller struct { lister cache.Indexer controller cache.Controller queue workqueue.RateLimitingInterface } func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller { return &Controller{ lister: lister, controller: controller, queue: queue, } } func (c *Controller) processItem() bool { item, quit := c.queue.Get() if quit { return false } defer c.queue.Done(item) fmt.Println(item) err := c.processWrapper(item.(string)) if err != nil { c.handleError(item.(string)) } return true } func (c *Controller) handleError(key string) { if c.queue.NumRequeues(key) < 3 { c.queue.AddRateLimited(key) return } c.queue.Forget(key) klog.Infof("Drop Object %s in queue", key) } func (c *Controller) processWrapper(key string) error { item, exists, err := c.lister.GetByKey(key) if err != nil { klog.Error(err) return err } if !exists { klog.Info(fmt.Sprintf("item %v not exists in cache.n", item)) } else { fmt.Println(item.(*v1.Pod).GetName()) } return err } func (c *Controller) Run(threadiness int, stopCh chan struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting custom controller") go c.controller.Run(stopCh) if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) { utilruntime.HandleError(fmt.Errorf("sync failed.")) return } for i := 0; i < threadiness; i++ { go wait.Until(func() { for c.processItem() { } }, time.Second, stopCh) } <-stopCh klog.Info("Stopping custom controller") } func main() { var ( k8sconfig *string //使用kubeconfig配置文件进行集群权限认证 restConfig *rest.Config err error ) if home := 
homedir.HomeDir(); home != "" { k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config") } k8sconfig = k8sconfig flag.Parse() if _, err := os.Stat(*k8sconfig); err != nil { panic(err) } if restConfig, err = rest.InClusterConfig(); err != nil { // 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一 restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig) if err != nil { panic(err) } } restset, err := kubernetes.NewForConfig(restConfig) lister := cache.NewListWatchFromClient(restset.CoreV1().RESTClient(), "pods", "default", fields.Everything()) queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) indexer, controller := cache.NewIndexerInformer(lister, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { fmt.Println("add ", obj.(*v1.Pod).GetName()) key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, UpdateFunc: func(oldObj, newObj interface{}) { fmt.Println("update", newObj.(*v1.Pod).GetName()) if newObj.(*v1.Pod).Status.Conditions[0].Status == "True" { fmt.Println("update: the Initialized Status", newObj.(*v1.Pod).Status.Conditions[0].Status) } else { fmt.Println("update: the Initialized Status ", newObj.(*v1.Pod).Status.Conditions[0].Status) fmt.Println("update: the Initialized Reason ", newObj.(*v1.Pod).Status.Conditions[0].Reason) } if len(newObj.(*v1.Pod).Status.Conditions) > 1 { if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" { fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status) } else { fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status) fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason) } if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" { fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status) } else { fmt.Println("update: the PodCondition Status ", 
newObj.(*v1.Pod).Status.Conditions[2].Status) fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason) } if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" { fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status) } else { fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status) fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason) } } }, DeleteFunc: func(obj interface{}) { fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase) // 上面是事件函数的处理,下面是对workqueue的操作 key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, }, cache.Indexers{}) c := NewController(indexer, controller, queue) stopCh := make(chan struct{}) stopCh1 := make(chan struct{}) c.Run(1, stopCh) defer close(stopCh) <-stopCh1 }
通过日志可以看出,Pod create后的步骤大概为4步:
- Initialized:初始化好后状态为Pending
- PodScheduled:然后调度
- PodCondition
- Ready
add netbox default/netbox netbox update netbox status Pending to Pending update: the Initialized Status True update netbox status Pending to Pending update: the Initialized Status True update: the Ready Status False update: the Ready Reason ContainersNotReady update: the PodCondition Status False update: the PodCondition Reason ContainersNotReady update: the PodScheduled Status True update netbox status Pending to Running update: the Initialized Status True update: the Ready Status True update: the PodCondition Status True update: the PodScheduled Status True
大致上与 kubectl describe pod
看到的内容也相似
default-scheduler Successfully assigned default/netbox to master-machine Normal Pulling 85s kubelet Pulling image "cylonchau/netbox" Normal Pulled 30s kubelet Successfully pulled image "cylonchau/netbox" Normal Created 30s kubelet Created container netbox Normal Started 30s kubelet Started container netbox
Reference