编写一个 Kubernetes Controller

Overview

根据 Kubernetes 文档对 Controller 的描述,Controller 在 Kubernetes 中是负责协调(reconcile)的组件。按照这一设计模式,Controller 会不断地将你的对象(如 Pod)从当前状态向期望状态同步;为此,Controller 会持续监听对象的实际状态与期望状态。

Writing Controllers

package main

import (
	"flag"
	"fmt"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// maxRetries is how many times a failing key is re-queued before it is dropped.
const maxRetries = 3

// Controller ties together an informer, the indexer it fills, and the work
// queue through which object keys are handed to the workers.
type Controller struct {
	lister     cache.Indexer
	controller cache.Controller
	queue      workqueue.RateLimitingInterface
}

// NewController returns a Controller that reads objects from lister, drives
// the given informer controller, and takes its work items from queue.
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	return &Controller{
		lister:     lister,
		controller: controller,
		queue:      queue,
	}
}

// processItem takes one key off the queue and processes it. It returns false
// only when the queue has been shut down, which tells the worker to stop.
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done must be called for every item returned by Get, whatever the
	// outcome, so that the queue allows the key to be processed again.
	defer c.queue.Done(item)
	fmt.Println(item)

	key, ok := item.(string)
	if !ok {
		// A non-string item can never be processed; forget it instead of
		// retrying, and do not let a bad assertion panic the worker.
		c.queue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string key in queue, got %T", item))
		return true
	}

	if err := c.processWrapper(key); err != nil {
		utilruntime.HandleError(fmt.Errorf("processing %q: %v", key, err))
		c.handleError(key)
		return true
	}

	// Success: clear the rate limiter's failure history for this key so an
	// earlier failure does not count against later, unrelated events.
	c.queue.Forget(key)
	return true
}

// handleError re-queues key with rate limiting until it has been retried
// maxRetries times, after which the key is dropped from the queue.
func (c *Controller) handleError(key string) {
	if c.queue.NumRequeues(key) < maxRetries {
		c.queue.AddRateLimited(key)
		return
	}
	c.queue.Forget(key)
	klog.Infof("Drop Object %s in queue", key)
}

// processWrapper looks key up in the indexer and prints the name of the Pod
// stored under it. A key that is no longer in the cache is logged and treated
// as success. It returns an error when the lookup fails or when the cached
// object is not a *v1.Pod.
func (c *Controller) processWrapper(key string) error {
	item, exists, err := c.lister.GetByKey(key)
	if err != nil {
		return fmt.Errorf("fetching %q from cache: %v", key, err)
	}
	if !exists {
		// item is nil in this branch, so report the key that was looked up.
		klog.Infof("item %s not exists in cache", key)
		return nil
	}

	pod, ok := item.(*v1.Pod)
	if !ok {
		return fmt.Errorf("expected *v1.Pod for key %q, got %T", key, item)
	}
	fmt.Println(pod.GetName())
	return nil
}

// runWorker processes queue items until the queue is shut down.
func (c *Controller) runWorker() {
	for c.processItem() {
	}
}

// Run starts the informer, waits for its cache to sync, and then starts
// threadiness workers. It blocks until stopCh is closed; the queue is shut
// down when Run returns.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")

	go c.controller.Run(stopCh)

	// Workers read from the indexer, so they must not start before the
	// initial list has been delivered to it.
	if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("sync failed"))
		return
	}

	for i := 0; i < threadiness; i++ {
		// wait.Until restarts the worker one second after it returns,
		// until stopCh is closed.
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
	klog.Info("Stopping custom controller")
}
 func main() { 	var ( 		k8sconfig  *string //使用kubeconfig配置文件进行集群权限认证 		restConfig *rest.Config 		err        error 	) 	if home := homedir.HomeDir(); home != "" { 		k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config") 	} 	k8sconfig = k8sconfig 	flag.Parse() 	if _, err := os.Stat(*k8sconfig); err != nil { 		panic(err) 	}  	if restConfig, err = rest.InClusterConfig(); err != nil { 		// 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一 		restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig) 		if err != nil { 			panic(err) 		} 	} 	restset, err := kubernetes.NewForConfig(restConfig) 	lister := cache.NewListWatchFromClient(restset.CoreV1().RESTClient(), "pods", "default", fields.Everything()) 	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) 	indexer, controller := cache.NewIndexerInformer(lister, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ 		AddFunc: func(obj interface{}) { 			fmt.Println("add ", obj.(*v1.Pod).GetName()) 			key, err := cache.MetaNamespaceKeyFunc(obj) 			if err == nil { 				queue.Add(key) 			}  		}, 		UpdateFunc: func(oldObj, newObj interface{}) { 			fmt.Println("update", newObj.(*v1.Pod).GetName()) 			if newObj.(*v1.Pod).Status.Conditions[0].Status == "True" { 				fmt.Println("update: the Initialized Status", newObj.(*v1.Pod).Status.Conditions[0].Status) 			} else { 				fmt.Println("update: the Initialized Status ", newObj.(*v1.Pod).Status.Conditions[0].Status) 				fmt.Println("update: the Initialized Reason ", newObj.(*v1.Pod).Status.Conditions[0].Reason) 			}  			if len(newObj.(*v1.Pod).Status.Conditions) > 1 { 				if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" { 					fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status) 				} else { 					fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status) 					fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason) 				}  				
if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" { 					fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status) 				} else { 					fmt.Println("update: the PodCondition Status ", newObj.(*v1.Pod).Status.Conditions[2].Status) 					fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason) 				}  				if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" { 					fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status) 				} else { 					fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status) 					fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason) 				} 			}  		}, 		DeleteFunc: func(obj interface{}) { 			fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase) 			// 上面是事件函数的处理,下面是对workqueue的操作 			key, err := cache.MetaNamespaceKeyFunc(obj) 			if err == nil { 				queue.Add(key) 			} 		}, 	}, cache.Indexers{})  	c := NewController(indexer, controller, queue) 	stopCh := make(chan struct{}) 	stopCh1 := make(chan struct{}) 	c.Run(1, stopCh) 	defer close(stopCh) 	<-stopCh1 }  

通过日志可以看出,Pod create后的步骤大概为4步:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition(对应日志中原因显示为 ContainersNotReady 的那一项,即 ContainersReady 条件)
  • Ready
add  netbox default/netbox netbox update netbox status Pending to Pending update: the Initialized Status True update netbox status Pending to Pending update: the Initialized Status True update: the Ready Status  False update: the Ready Reason  ContainersNotReady update: the PodCondition Status  False update: the PodCondition Reason  ContainersNotReady update: the PodScheduled Status True   update netbox status Pending to Running update: the Initialized Status True update: the Ready Status True update: the PodCondition Status True update: the PodScheduled Status True 

大致上与 kubectl describe pod 看到的内容也相似

default-scheduler  Successfully assigned default/netbox to master-machine   Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"   Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"   Normal  Created    30s   kubelet            Created container netbox   Normal  Started    30s   kubelet            Started container netbox 

Reference

controllers.md

发表评论

相关文章