【知识点】go的channel无法满足kubernetes的应用场景

一般队列

在kubernetes中,go channel的使用不能满足kubernetes的应用场景,比如延迟、限速等;在kubernetes中,队列分为三种,common queue,delay queue delaying queue,rate limiters queue

接口

接口作为所有队列的抽象定义

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}

实施

type Type struct { // 一个work queue
	queue []t // queue用slice做存储
	dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
	processing set // 当前正在处理的元素集合
	cond *sync.Cond
	shuttingDown bool
	metrics queueMetrics
	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}
type empty struct{}
type t interface{} // t queue中的元素
type set map[t]empty // dirty 和 processing中的元素

可以看到核心属性是 queue 、dirty 、processing

延迟队列

在研究优先级队列之前,需要对Heap有一定的了解,因为延迟队列使用堆作为延迟队列

堆是一种基于树属性的特殊数据结构; heap 是一个完整的二叉树类型,有两种类型:

二叉堆的存储规则:

那么在下图中,就是堆

堆的实现

例子:向左添加一个值为42的元素的过程

第 1 步:将新元素放在堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素的值可能大于其父元素。

第二步:如果新元素的值大于父元素,则将新元素与父元素交换,直到新元素到达根,或者当新元素大于等于该值时停止其父元素的

这个过程称为向上再堆化

示例:删除根

步骤1:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后从树中取出最后一个节点。这个元素被称为 out-of-place 。

第 2 步:将错位元素与其最大子元素交换,并返回第 1 步中保存的值。

这个过程称为向下再堆化

优先队列

优先级队列的行为:

它是如何工作的:

实现的代码:

参考

Client-go 的延迟队列

Kubernetes 中延迟队列的设计非常漂亮。通过heap实现的延迟队列和kubernetes中的pass队列,完成了延迟队列的功能。

// 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
type DelayingInterface interface {
	Interface // 继承了workqueue的功能
	AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
}

实现延迟接口的示例

type delayingType struct {
	Interface // 通用的queue 
	clock clock.Clock // 对比的时间 ,包含一些定时器的功能
    	type Clock interface {
            PassiveClock
            		type PassiveClock interface {
                        Now() time.Time
                        Since(time.Time) time.Duration
                    }
            After(time.Duration) 
	stopCh chan struct{} // 停止loop
	stopOnce sync.Once // 保证退出只会触发一次
	heartbeat clock.Ticker // 一个定时器,保证了loop的最大空事件等待时间
	waitingForAddCh chan *waitFor // 普通的chan,用来接收数据插入到延迟队列中
	metrics retryMetrics // 重试的指数
}

那么延迟队列的整个数据结构如下图所示

如前所述,这个延迟队列的核心是一个优先级队列,优先级队列需要满足:

而waitFor就是这个优先级队列的数据结构

type waitFor struct {
	data    t // 数据
	readyAt time.Time // 加入工作队列的时间
	index int // 优先级队列中的索引
}

而waitForPriorityQueue是container/heap/heap.go.Inferface的一个实现,它的数据结构是一个MinHeap,最小的readyAt位于Root中

type Interface interface {
	sort.Interface
	Push(x interface{}) // add x as element Len()
	Pop() interface{}   // remove and return element Len() - 1.
}

而这个的实现是waitForPriorityQueue

type waitForPriorityQueue []*waitFor
func (pq waitForPriorityQueue) Len() int {
	return len(pq)
}
// 这个也是最重要的一个,就是哪个属性是排序的关键,也是heap.down和heap.up中使用的
func (pq waitForPriorityQueue) Less(i, j int) bool {
	return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}
// push 和pop 必须使用heap.push 和heap.pop
func (pq *waitForPriorityQueue) Push(x interface{}) {
	n := len(*pq)
	item := x.(*waitFor)
	item.index = n
	*pq = append(*pq, item)
}
func (pq *waitForPriorityQueue) Pop() interface{} {
	n := len(*pq)
	item := (*pq)[n-1]
	item.index = -1
	*pq = (*pq)[0:(n - 1)]
	return item
}
// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
}

整个延迟队列的核心是waitingLoop。作为延迟队列的主要逻辑,检查waitingForAddCh是否有要延迟的内容,取出延迟的内容放入Heap;并保证最长的阻塞期

func (q *delayingType) waitingLoop() {
	defer utilruntime.HandleCrash()
	never := make(
		now := q.clock.Now()
		for waitingForQueue.Len() > 0 {
			entry := waitingForQueue.Peek().(*waitFor)
			if entry.readyAt.After(now) {
				break // 时间没到则不处理
			}
			entry = heap.Pop(waitingForQueue).(*waitFor) // 从优先级队列中取出一个
			q.Add(entry.data) // 添加到延迟队列中
			delete(waitingEntryByData, entry.data) // 删除map表中的数据
		}
		// 如果存在数据则设置最近一个内容要执行的定时器
		nextReadyAt := never
		if waitingForQueue.Len() > 0 {
			if nextReadyAtTimer != nil {
				nextReadyAtTimer.Stop()
			}
			entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值
			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器
			nextReadyAt = nextReadyAtTimer.C()
		}
		select {
		case <-q.stopCh: // 退出
			return
		case <-q.heartbeat.C(): // 多久没有任何动作时重新一次循环
		case <-nextReadyAt: // 如果有元素时间到了,则继续执行循环,处理上面添加的操作
		case waitEntry := <-q.waitingForAddCh:
			if waitEntry.readyAt.After(q.clock.Now()) { // 时间没到,是用readyAt和now对比time.Now
				// 添加到延迟队列中,有两个 waitingEntryByData waitingForQueue
				insert(waitingForQueue, waitingEntryByData, waitEntry)
			} else {
				q.Add(waitEntry.data)
			}
			drained := false // 保证可以取完q.waitingForAddCh // addafter
			for !drained {
				select {
                // 这里是一个有buffer的队列,需要保障这个队列读完
				case waitEntry := <-q.waitingForAddCh: 
					if waitEntry.readyAt.After(q.clock.Now()) {
						insert(waitingForQueue, waitingEntryByData, waitEntry)
					} else {
						q.Add(waitEntry.data)
					}
				default: // 保证可以退出,但限制于上一个分支的0~n的读取
				// 如果上一个分支阻塞,则为没有数据就是取尽了,走到这个分支
				// 如果上个分支不阻塞则读取到上个分支阻塞为止,代表阻塞,则走default退出
					drained = true
				}
			}
		}
	}
}

限速队列

限速队列RateLimiting是在优先队列和延迟队列的基础上扩展的队列

type RateLimitingInterface interface {
	DelayingInterface // 继承延迟队列
	// 在限速器准备完成后(即合规后)添加条目到队列中
	AddRateLimited(item interface{})
	// drop掉条目,无论成功或失败
	Forget(item interface{})
	// 被重新放入队列中的次数
	NumRequeues(item interface{}) int
}

只要满足 AddRateLimited() 、 Forget() 、 NumRequeues() 的延迟队列是限速队列,就可以看到限速队列的抽象对应关系。了解了规则之后,就需要分析具体的实现了。

type rateLimitingType struct {
	DelayingInterface
	rateLimiter RateLimiter
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))

}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
	return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
	q.rateLimiter.Forget(item)
}

rateLimitingType 是抽象规范 RateLimitingInterface 的实现。可以看到延迟队列中增加了一个限速器RateLimiter。

type RateLimiter interface {
	// when决定等待多长时间
	When(item interface{}) time.Duration
	// drop掉item
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// 重新加入队列中的次数
	NumRequeues(item interface{}) int
}

抽象限速器的实现包括 BucketRateLimiter , ItemBucketRateLimiter , ItemExponentialFailureRateLimiter , ItemFastSlowRateLimiter , MaxOfRateLimiter ,下面对这些限速器进行分析

BucketRateLimiter

BucketRateLimiter 是一个令牌桶,它实现了 rate.Limiter 和抽象的 RateLimiter。通过workqueue.DefaultControllerRateLimiter()初始化。

func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size.  This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

关于令牌桶算法的更多信息,请参考这里

ItemBucketRateLimiter

ItemBucketRateLimiter 是将每个令牌桶存储为一个列表的实现,每个键是一个单独的速率限制器

type ItemBucketRateLimiter struct {
	r     rate.Limit
	burst int
	limitersLock sync.Mutex
	limiters     map[interface{}]*rate.Limiter
}
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
	return &ItemBucketRateLimiter{
		r:        r,
		burst:    burst,
		limiters: make(map[interface{}]*rate.Limiter),
	}
}

ItemExponentialFailureRateLimiter

顾名思义,ItemExponentialFailureRateLimiter 是一个错误索引限速器。根据错误的数量,该指标用于延迟时间。该指数的计算公式为:(baseDelaytimes2^{})。可以看出When决定了流量整形的延迟时间,并根据错误次数延长重试时间。

type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int // 失败的次数
	baseDelay time.Duration // 延迟基数
	maxDelay  time.Duration // 最大延迟
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1
	// The backoff is capped such that 'calculated' value never overflows.
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}
	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {

图片[1]-【知识点】go的channel无法满足kubernetes的应用场景-唐朝资源网

return r.maxDelay } return calculated } func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int { r.failuresLock.Lock() defer r.failuresLock.Unlock() return r.failures[item] } func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) { r.failuresLock.Lock() defer r.failuresLock.Unlock() delete(r.failures, item) }

ItemFastSlowRateLimiter

ItemFastSlowRateLimiter,限速器快速重试一定次数,然后慢速重试

type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int
	maxFastAttempts int // 最大尝试次数
	fastDelay       time.Duration // 快的速度
	slowDelay       time.Duration // 慢的速度
}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
	return &ItemFastSlowRateLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	r.failures[item] = r.failures[item] + 1
	// 当错误次数没超过快速的阈值使用快速,否则使用慢速
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()
	delete(r.failures, item)
}

MaxOfRateLimiter

MaxOfRateLimiter 是返回的限速器列表中延迟最大的限速器

type MaxOfRateLimiter struct {
	limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)

图片[2]-【知识点】go的channel无法满足kubernetes的应用场景-唐朝资源网

for _, limiter := range r.limiters { curr := limiter.When(item) if curr > ret { ret = curr } } return ret } func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter { return &MaxOfRateLimiter{limiters: limiters} } func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int { ret := 0 // 找到列表內所有的NumRequeues(失败的次数),以最多次的为主。 for _, limiter := range r.limiters { curr := limiter.NumRequeues(item) if curr > ret { ret = curr } } return ret } func (r *MaxOfRateLimiter) Forget(item interface{}) { for _, limiter := range r.limiters { limiter.Forget(item) } }

如何使用 Kubernetes 限速器

基于流量控制的限速队列实例可以大量突发,但需要整形。加法运算会根据When()中设计的等待时间进行加法运算。根据不同的队列实现不同的延迟

package main
import (
	"fmt"
	"log"
	"strconv"
	"time"
	"k8s.io/client-go/util/workqueue"
)
func main() {
	stopCh := make(chan string)
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	length := 20 // 一共请求20次
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string) {
			item := "Task-" + taskId + time.Now().Format(timeLayout)
			log.Println(item + " Added.")
            limiter.AddRateLimited(item) // 添加会根据When() 延迟添加到工作队列中
		}(strconv.FormatInt(int64(i), 10), chs[i])
		go func() {
			for {
				key, quit := limiter.Get()
				if quit {
					return
				}
				log.Println(fmt.Sprintf("%s process done", key))
				defer limiter.Done(key)
			}
		}()
	}
	<-stopCh
}

由于默认限速器不支持初始化QPS,将源码修改为(BT(1, 5)),执行结果可以看到,当有大流量突发时,桶中token的数量超过桶中token的数量,将根据token生成的速度释放。

图中,任务的添加是突然的,同时添加了日志打印,但是消费者可以看到添加前的日志输出实际上是延迟的。配置是每秒一个token,实际释放流量也是每秒一个token。

© 版权声明
THE END
喜欢就支持一下吧
点赞119 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片