通用队列
在kubernetes中,使用go的channel无法满足kubernetes的应用场景,如延迟、限速等;在kubernetes中存在三种队列通用队列 common queue
,延迟队列 delaying queue
,和限速队列 rate limiters queue
Inferface
Interface作为所有队列的一个抽象定义
|
|
Implementation
|
|
可以看到其中核心属性就是 queue
, dirty
, processing
延迟队列
在研究优先级队列前,需要对 Heap
有一定的了解,因为delay queue使用了 heap
做延迟队列
Heap
Heap
是基于树属性的特殊数据结构;heap是一种完全二叉树类型,具有两种类型:
- 如:B 是 A 的子节点,则 $key(A) \geq key(B)$ 。这就意味着具有最大Key的元素始终位于根节点,这类Heap称为最大堆 MaxHeap。
- 父节点的值小于或等于其左右子节点的值叫做 MinHeap
二叉堆的存储规则:
- 每个节点包含的元素大于或等于该节点子节点的元素。
- 树是完全二叉树。
那么下列图片中,那个是堆
heap的实现
实例:向左边添加一个值为42的元素的过程
步骤一:将新元素放入堆中的第一个可用位置。这将使结构保持为完整的二叉树,但它可能不再是堆,因为新元素可能具有比其父元素更大的值。
步骤二:如果新元素的值大于父元素,将新元素与父元素交换,直到达到新元素到根,或者新元素大于等于其父元素的值时将停止
这种过程被称为 向上调整 (reheapification upward
)
实例:移除根
步骤一:将根元素复制到用于返回值的变量中,将最深层的最后一个元素复制到根,然后将最后一个节点从树中取出。该元素称为 out-of-place
。
步骤二:而将异位元素与其最大值的子元素交换,并返回在步骤1中保存的值。
这个过程被称为向下调整 (reheapification downward
)
优先级队列
优先级队列的行为:
- 元素被放置在队列中,然后被取出。
- 优先级队列中的每个元素都有一个关联的数字,称为优先级。
- 当元素离开优先级队列时,最高优先级的元素最先离开。
如何实现的:
在优先级队列中,heap的每个节点都包含一个元素以及元素的优先级,并且维护树以便它遵循使用元素的优先级来比较节点的堆存储规则:
- 每个节点包含的元素的优先级大于或等于该节点子元素的优先级。
- 树是完全二叉树。
实现的代码:golang priorityQueue
Client-go 的延迟队列
在Kubernetes中对 delaying queue
的设计非常精美,通过使用 heap
实现的延迟队列,加上kubernetes中的通过队列,完成了延迟队列的功能。
|
|
具体实现了 DelayingInterface
的实例
|
|
那么延迟队列的整个数据结构如下图所示
而上面部分也说到了,这个延迟队列的核心就是一个优先级队列,而优先级队列又需要满足:
- 优先级队列中的每个元素都有一个关联的数字,称为优先级。
- 当元素离开优先级队列时,最高优先级的元素最先离开。
而 waitFor
就是这个优先级队列的数据结构
|
|
而 waitForPriorityQueue
是对 container/heap/heap.go.Inferface
的实现,其数据结构就是使最小 readyAt
位于Root 的一个 MinHeap
|
|
而这个的实现是 waitForPriorityQueue
|
|
而整个延迟队列的核心就是 waitingLoop
,作为了延迟队列的主要逻辑,检查 waitingForAddCh
有没有要延迟的内容,取出延迟的内容放置到 Heap
中;以及保证最大的阻塞周期
|
|
限速队列
限速队列 RateLimiting
是在优先级队列是在延迟队列的基础上进行扩展的一个队列
|
|
可以看到一个限速队列的抽象对应只要满足了 AddRateLimited()
, Forget()
, NumRequeues()
的延迟队列都是限速队列。看了解规则之后,需要对具体的实现进行分析。
|
|
rateLimitingType
则是对抽象规范 RateLimitingInterface
的实现,可以看出是在延迟队列的基础上增加了一个限速器 RateLimiter
|
|
抽象限速器的实现,有 BucketRateLimiter
, ItemBucketRateLimiter
, ItemExponentialFailureRateLimiter
, ItemFastSlowRateLimiter
, MaxOfRateLimiter
,下面对这些限速器进行分析
BucketRateLimiter
BucketRateLimiter
是实现 rate.Limiter
与 抽象 RateLimiter
的一个令牌桶,初始化时通过 workqueue.DefaultControllerRateLimiter()
进行初始化。
|
|
ItemBucketRateLimiter
ItemBucketRateLimiter
是作为列表存储每个令牌桶的实现,每个key都是单独的限速器
|
|
ItemExponentialFailureRateLimiter
如名所知 ItemExponentialFailureRateLimiter
限速器是一个错误指数限速器,根据错误的次数,将指数用于delay的时长,指数的计算公式为:$baseDelay\times2^{
|
|
ItemFastSlowRateLimiter
ItemFastSlowRateLimiter
,限速器先快速重试一定次数,然后慢速重试
|
|
MaxOfRateLimiter
MaxOfRateLimiter
是返回限速器列表中,延迟最大的那个限速器
|
|
如何使用Kubernetes的限速器
基于流量管制的限速队列实例,可以大量突发,但是需要进行整形,添加操作会根据 When()
中设计的需要等待的时间进行添加。根据不同的队列实现不同方式的延迟
|
|
因为默认的限速器不支持初始化 QPS,修改源码内的为 $BT(1, 5)$ ,执行结果可以看出,大突发流量时,超过桶内token数时,会根据token生成的速度进行放行。
图中,任务的添加是突发性的,日志打印的是同时添加,但是在添加前输出的日志,消费端可以看到实际是被延迟了。配置的是每秒一个token,实际上放行流量也是每秒一个token。