Writing会监听你的实际状态与期望状态同步的一个过程

概览

根据Kubernetes文档中对Controller的描述,Controller是kubernetes中负责协调的组件。根据设计模式,Controller 会不断的将你的对象(比如 Pods)从当前状态同步到想要的状态。当然,Controller 会听取您的实际和期望的状态。

编写控制器

package main
import (
	"flag"
	"fmt"
	"os"
	"time"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)
type Controller struct {
	lister     cache.Indexer
	controller cache.Controller
	queue      workqueue.RateLimitingInterface
}
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	return &Controller{
		lister:     lister,
		controller: controller,
		queue:      queue,
	}
}
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(item)
	fmt.Println(item)
	err := c.processWrapper(item.(string))
	if err != nil {
		c.handleError(item.(string))
	}
	return true
}
func (c *Controller) handleError(key string) {
	if c.queue.NumRequeues(key) < 3 {
		c.queue.AddRateLimited(key)
		return
	}
	c.queue.Forget(key)
	klog.Infof("Drop Object %s in queue", key)
}
func (c *Controller) processWrapper(key string) error {
	item, exists, err := c.lister.GetByKey(key)
	if err != nil {
		klog.Error(err)
		return err
	}
	if !exists {
		klog.Info(fmt.Sprintf("item %v not exists in cache.n", item))
	} else {
		fmt.Println(item.(*v1.Pod).GetName())
	}
	return err
}
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")
	go c.controller.Run(stopCh)
	if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("sync failed."))
		return
	}
	for i := 0; i < threadiness; i++ {
		go wait.Until(func() {
			for c.processItem() {
			}
		}, time.Second, stopCh)
	}
	 1 {
				if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" {
					fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status)
				} else {
					fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status)
					fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason)
				}
				if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" {
					fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status)
				} else {
					fmt.Println("update: the PodCondition Status ", newObj.(*v1.Pod).Status.Conditions[2].Status)
					fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason)
				}
				if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" {
					fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status)
				} else {
					fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status)
					fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason)
				}
			}
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase)
			// 上面是事件函数的处理,下面是对workqueue的操作
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})
	c := NewController(indexer, controller, queue)
	stopCh := make(chan struct{})
	stopCh1 := make(chan struct{})
	c.Run(1, stopCh)
	defer close(stopCh)
	<-stopCh1
}

从日志可以看出,创建Pod之后的步骤大概是4步:

add  netbox
default/netbox
netbox
update netbox status Pending to Pending
update: the Initialized Status True
update netbox status Pending to Pending
update: the Initialized Status True
update: the Ready Status  False
update: the Ready Reason  ContainersNotReady
update: the PodCondition Status  False
update: the PodCondition Reason  ContainersNotReady
update: the PodScheduled Status True
update netbox status Pending to Running
update: the Initialized Status True
update: the Ready Status True
update: the PodCondition Status True
update: the PodScheduled Status True

与 kubectl describe pod 看到的内容页面大致相似

参考

controllers.md

© 版权声明
THE END
喜欢就支持一下吧
点赞71 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片