一般队列
在kubernetes中,go channel的使用不能满足kubernetes的应用场景,比如延迟、限速等;在kubernetes中,队列分为三种,common queue,delay queue delaying queue,rate limiters queue
接口
接口作为所有队列的抽象定义
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
实施
type Type struct { // 一个work queue
queue []t // queue用slice做存储
dirty set // 脏位,定义了需要处理的元素,类似于操作系统,表示已修改但为写入
processing set // 当前正在处理的元素集合
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
type empty struct{}
type t interface{} // t queue中的元素
type set map[t]empty // dirty 和 processing中的元素
可以看到核心属性是 queue 、dirty 、processing
延迟队列
在研究优先级队列之前,需要对Heap有一定的了解,因为延迟队列使用堆作为延迟队列
堆
堆是一种基于树属性的特殊数据结构; heap 是一个完整的二叉树类型,有两种类型:
二叉堆的存储规则:
那么在下图中,就是堆
堆的实现
例子:向左添加一个值为42的元素的过程
第 1 步:将新元素放在堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素的值可能大于其父元素。
第二步:如果新元素的值大于父元素,则将新元素与父元素交换,直到新元素到达根,或者当新元素大于等于该值时停止其父元素的
这个过程称为向上再堆化
示例:删除根
步骤1:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后从树中取出最后一个节点。这个元素被称为 out-of-place 。
第 2 步:将错位元素与其最大子元素交换,并返回第 1 步中保存的值。
这个过程称为向下再堆化
优先队列
优先级队列的行为:
它是如何工作的:
实现的代码:
参考
堆
Client-go 的延迟队列
Kubernetes 中延迟队列的设计非常漂亮。通过heap实现的延迟队列和kubernetes中的pass队列,完成了延迟队列的功能。
// 注释中给了一个hot-loop热循环,通过这个loop实现了delaying
type DelayingInterface interface {
Interface // 继承了workqueue的功能
AddAfter(item interface{}, duration time.Duration) // 在time后将内容添加到工作队列中
}
实现延迟接口的示例
type delayingType struct {
Interface // 通用的queue
clock clock.Clock // 对比的时间 ,包含一些定时器的功能
type Clock interface {
PassiveClock
type PassiveClock interface {
Now() time.Time
Since(time.Time) time.Duration
}
After(time.Duration)
stopCh chan struct{} // 停止loop
stopOnce sync.Once // 保证退出只会触发一次
heartbeat clock.Ticker // 一个定时器,保证了loop的最大空事件等待时间
waitingForAddCh chan *waitFor // 普通的chan,用来接收数据插入到延迟队列中
metrics retryMetrics // 重试的指数
}
那么延迟队列的整个数据结构如下图所示
如前所述,这个延迟队列的核心是一个优先级队列,优先级队列需要满足:
而waitFor就是这个优先级队列的数据结构
type waitFor struct {
data t // 数据
readyAt time.Time // 加入工作队列的时间
index int // 优先级队列中的索引
}
而waitForPriorityQueue是container/heap/heap.go.Inferface的一个实现,它的数据结构是一个MinHeap,最小的readyAt位于Root中
type Interface interface {
sort.Interface
Push(x interface{}) // add x as element Len()
Pop() interface{} // remove and return element Len() - 1.
}
而这个的实现是waitForPriorityQueue
type waitForPriorityQueue []*waitFor
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
// 这个也是最重要的一个,就是哪个属性是排序的关键,也是heap.down和heap.up中使用的
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// push 和pop 必须使用heap.push 和heap.pop
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
}
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
}
// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
整个延迟队列的核心是waitingLoop。作为延迟队列的主要逻辑,检查waitingForAddCh是否有要延迟的内容,取出延迟的内容放入Heap;并保证最长的阻塞期
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(
now := q.clock.Now()
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break // 时间没到则不处理
}
entry = heap.Pop(waitingForQueue).(*waitFor) // 从优先级队列中取出一个
q.Add(entry.data) // 添加到延迟队列中
delete(waitingEntryByData, entry.data) // 删除map表中的数据
}
// 如果存在数据则设置最近一个内容要执行的定时器
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor) // 窥视[0]和值
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) // 创建一个定时器
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh: // 退出
return
case <-q.heartbeat.C(): // 多久没有任何动作时重新一次循环
case <-nextReadyAt: // 如果有元素时间到了,则继续执行循环,处理上面添加的操作
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) { // 时间没到,是用readyAt和now对比time.Now
// 添加到延迟队列中,有两个 waitingEntryByData waitingForQueue
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false // 保证可以取完q.waitingForAddCh // addafter
for !drained {
select {
// 这里是一个有buffer的队列,需要保障这个队列读完
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default: // 保证可以退出,但限制于上一个分支的0~n的读取
// 如果上一个分支阻塞,则为没有数据就是取尽了,走到这个分支
// 如果上个分支不阻塞则读取到上个分支阻塞为止,代表阻塞,则走default退出
drained = true
}
}
}
}
}
限速队列
限速队列RateLimiting是在优先队列和延迟队列的基础上扩展的队列
type RateLimitingInterface interface {
DelayingInterface // 继承延迟队列
// 在限速器准备完成后(即合规后)添加条目到队列中
AddRateLimited(item interface{})
// drop掉条目,无论成功或失败
Forget(item interface{})
// 被重新放入队列中的次数
NumRequeues(item interface{}) int
}
只要满足 AddRateLimited() 、 Forget() 、 NumRequeues() 的延迟队列是限速队列,就可以看到限速队列的抽象对应关系。了解了规则之后,就需要分析具体的实现了。
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
rateLimitingType 是抽象规范 RateLimitingInterface 的实现。可以看到延迟队列中增加了一个限速器RateLimiter。
type RateLimiter interface {
// when决定等待多长时间
When(item interface{}) time.Duration
// drop掉item
// or for success, we'll stop tracking it
Forget(item interface{})
// 重新加入队列中的次数
NumRequeues(item interface{}) int
}
抽象限速器的实现包括 BucketRateLimiter , ItemBucketRateLimiter , ItemExponentialFailureRateLimiter , ItemFastSlowRateLimiter , MaxOfRateLimiter ,下面对这些限速器进行分析
BucketRateLimiter
BucketRateLimiter 是一个令牌桶,它实现了 rate.Limiter 和抽象的 RateLimiter。通过workqueue.DefaultControllerRateLimiter()初始化。
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
关于令牌桶算法的更多信息,请参考这里
ItemBucketRateLimiter
ItemBucketRateLimiter 是将每个令牌桶存储为一个列表的实现,每个键是一个单独的速率限制器
type ItemBucketRateLimiter struct {
r rate.Limit
burst int
limitersLock sync.Mutex
limiters map[interface{}]*rate.Limiter
}
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
return &ItemBucketRateLimiter{
r: r,
burst: burst,
limiters: make(map[interface{}]*rate.Limiter),
}
}
ItemExponentialFailureRateLimiter
顾名思义,ItemExponentialFailureRateLimiter 是一个错误索引限速器。根据错误的数量,该指标用于延迟时间。该指数的计算公式为:(baseDelaytimes2^{})。可以看出When决定了流量整形的延迟时间,并根据错误次数延长重试时间。
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int // 失败的次数
baseDelay time.Duration // 延迟基数
maxDelay time.Duration // 最大延迟
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
![图片[1]-【知识点】go的channel无法满足kubernetes的应用场景-唐朝资源网](https://images.43s.cn/wp-content/uploads//2022/06/1655478646428_9.png)
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter,限速器快速重试一定次数,然后慢速重试
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int // 最大尝试次数
fastDelay time.Duration // 快的速度
slowDelay time.Duration // 慢的速度
}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay,
slowDelay: slowDelay,
maxFastAttempts: maxFastAttempts,
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
// 当错误次数没超过快速的阈值使用快速,否则使用慢速
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter
MaxOfRateLimiter 是返回的限速器列表中延迟最大的限速器
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
![图片[2]-【知识点】go的channel无法满足kubernetes的应用场景-唐朝资源网](https://images.43s.cn/wp-content/uploads//2022/06/1655478646428_10.png)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// 找到列表內所有的NumRequeues(失败的次数),以最多次的为主。
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
func (r *MaxOfRateLimiter) Forget(item interface{}) {
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
如何使用 Kubernetes 限速器
基于流量控制的限速队列实例可以大量突发,但需要整形。加法运算会根据When()中设计的等待时间进行加法运算。根据不同的队列实现不同的延迟
package main
import (
"fmt"
"log"
"strconv"
"time"
"k8s.io/client-go/util/workqueue"
)
func main() {
stopCh := make(chan string)
timeLayout := "2006-01-02:15:04:05.0000"
limiter := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
length := 20 // 一共请求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string) {
item := "Task-" + taskId + time.Now().Format(timeLayout)
log.Println(item + " Added.")
limiter.AddRateLimited(item) // 添加会根据When() 延迟添加到工作队列中
}(strconv.FormatInt(int64(i), 10), chs[i])
go func() {
for {
key, quit := limiter.Get()
if quit {
return
}
log.Println(fmt.Sprintf("%s process done", key))
defer limiter.Done(key)
}
}()
}
<-stopCh
}
由于默认限速器不支持初始化QPS,将源码修改为(BT(1, 5)),执行结果可以看到,当有大流量突发时,桶中token的数量超过桶中token的数量,将根据token生成的速度释放。
图中,任务的添加是突然的,同时添加了日志打印,但是消费者可以看到添加前的日志输出实际上是延迟的。配置是每秒一个token,实际释放流量也是每秒一个token。
暂无评论内容