0%

算法之延迟队列

[TOC]

基本概念

延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

应用场景

延迟队列的应用场景非常的广泛,比如说以下的场景:

  • 新建的订单,如果用户在15分钟内未支付,则自动取消。
  • 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
  • 安全工单超过24小时未处理,则自动拉企业微信群提醒相关责任人。
  • 用户下单外卖以后,距离超时时间还有10分钟时提醒外卖小哥即将超时。
  • 买家收到订单后, 一天内没有评价,向买家发消息,要求评价

总体来说就是,一个事件后,一段时间内没有发什么 A 事件,那么就需要发什么 B 事件。

几种实现方式

redis zset

那么我们可以通过以下这几个操作使用Redis的ZSet来实现一个延迟队列:

  • 入队操作:ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为Score加入到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素个数,因此我们能相对比较高效的进行入队操作。
  • 起一个进程定时(比如每隔一秒)通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素,具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:
  • 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
  • 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明ZSet中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;

同样的,ZRANGEBYSCORE操作的时间复杂度为O(logN + M),其中N为ZSet中元素个数,M为查询的元素个数,因此我们定时查询操作也是比较高效的。

rabitmq 死信队列

RabbitMQ本身并不直接提供对延迟队列的支持,我们依靠RabbitMQ的TTL以及死信队列功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ的死信队列以及TTL功能。

死信队列实际上是一种RabbitMQ的消息处理机制,当RabbmitMQ在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:

  • 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
  • 消息超时未消费,也就是TTL过期了
  • 消息队列到达最大长度

消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

时间轮算法(TimeWheel)

如上图所示,时间轮是一个存储延迟消息的环形队列,其底层采用数组实现,可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表,这个列表是一个双向环形链表,链表中每一项都代表一个需要执行的延迟任务。

时间轮会有表盘指针,表示时间轮当前所指时间,随着时间推移,该指针会不断前进,并处理对应位置上的延迟任务列表。

多层时间轮

到现在为止一切都非常棒,但是细心的同学可能发现了,上面的时间轮的大小是固定的,只有12秒。如果此时我们有一个需要延迟200秒的任务,我们应该怎么处理呢?直接扩充整个时间轮的大小吗?这显然不可取,因为这样做的话我们就需要维护一个非常非常大的时间轮,内存是不可接受的,而且底层数组大了之后寻址效率也会降低,影响性能。

为此,Kafka引入了多层时间轮的概念。其实多层时间轮的概念和我们的机械表上时针、分针、秒针的概念非常类似,当仅使用秒针无法表示当前时间时,就使用分针结合秒针一起表示。同样的,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

go-zero 中的堆多层时间轮算法的实现

时间轮结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
TimingWheel struct {
interval time.Duration
ticker timex.Ticker
slots []*list.List // 时间轮,每一个slot里是一个任务双向链表
timers *SafeMap
tickedPos int
numSlots int
execute Execute
setChannel chan timingEntry
moveChannel chan baseEntry
removeChannel chan interface{}
drainChannel chan func(key, value interface{})
stopChannel chan lang.PlaceholderType
}
设置延迟任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 计算在 slots 中的位置,以及需要转几圈(使用圈层下降的方法来实现多层时间轮)
pos, circle := tw.getPositionAndCircle(task.delay)

newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}


timer.item.circle = circle
newItem := &timingEntry{
baseEntry: task,
value: timer.item.value,
}
tw.slots[pos].PushBack(newItem)
// 向时间轮中设置 task
tw.setTimerPosition(pos, newItem)
时间推动
1
2
3
4
5
6
7
8
9
10
func NewTicker(d time.Duration) Ticker {
return &realTicker{
Ticker: time.NewTicker(d),
}
}

// 使用 golang 的 Ticker 来实现时间推动, 移动时间轮上的 slots 的 index
func (rt *realTicker) Chan() <-chan time.Time {
return rt.C
}
遍历当前时间(index) 的 slot
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
var tasks []timingTask
// task 双向链表
for e := l.Front(); e != nil; {
task := e.Value.(*timingEntry)

// 构造 tasks 列表
tasks = append(tasks, timingTask{
key: task.key,
value: task.value,
})
next := e.Next()
l.Remove(e)
tw.timers.Del(task.key)
e = next
}

// 执行所有的到期任务
tw.runTasks(tasks)
}
执行任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (tw *TimingWheel) runTasks(tasks []timingTask) {
if len(tasks) == 0 {
return
}

go func() {
for i := range tasks {
// 通过 时间轮注册的执行方法来执行 task
threading.RunSafe(func() {
tw.execute(tasks[i].key, tasks[i].value)
})
}
}()
}

beanstalkd 延迟队列

Beanstalkd 的特性

  • 支持优先级(支持任务插队)
  • 延迟(实现定时任务)
  • 持久化(定时把内存中的数据刷到binlog日志)
  • 预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
  • 任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)

Beanstalkd 的客户端在将一个任务放入到 Beanstalkd 的队列时,可以指定延迟的时间时间, Beanstalkd 的 server 在收到这个带延迟时间的任务时, 会先将等待延迟时间这么长的时间,然后将任务放入队列中。

golang 简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func put() {
c, err := beanstalk.Dial("tcp", addr)
if err != nil {
panic(err)
}
// 告诉 server 延迟 10分钟
id, err := c.Put([]byte("hello"), 1, 10 * time.Minute, 1*time.Second)
fmt.Println("put", id, err)
}

func get(name string, deleteFn func(*beanstalk.Conn, uint64), isReturn bool) {
for {
c, err := beanstalk.Dial("tcp", addr)
if err != nil {
panic(err)
}
id, body, err := c.Reserve(3 * time.Second)
fmt.Println("get", name, id, string(body), err)
if err != nil {
// 消费后删除
deleteFn(c, id)
}
if isReturn {
return
}
time.Sleep(1 * time.Second)
}
}

go-queue 中的封装

可以参考 example : https://github.com/tal-tech/go-queue

参考

主要的参考:你真的知道怎么实现一个延迟队列吗?

有赞延迟队列的设计

Go-Zero如何应对海量定时/延迟任务

go-queue的代码