[TOC]
基本概念
延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
应用场景
延迟队列的应用场景非常的广泛,比如说以下的场景:
- 新建的订单,如果用户在15分钟内未支付,则自动取消。
- 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
- 安全工单超过24小时未处理,则自动拉企业微信群提醒相关责任人。
- 用户下单外卖以后,距离超时时间还有10分钟时提醒外卖小哥即将超时。
- 买家收到订单后, 一天内没有评价,向买家发消息,要求评价
总体来说就是,一个事件后,一段时间内没有发什么 A 事件,那么就需要发什么 B 事件。
几种实现方式
redis zset
那么我们可以通过以下这几个操作使用Redis的ZSet来实现一个延迟队列:
- 入队操作:ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为Score加入到ZSet中。Redis的ZAdd的时间复杂度是O(logN),N是ZSet中元素个数,因此我们能相对比较高效的进行入队操作。
- 起一个进程定时(比如每隔一秒)通过ZREANGEBYSCORE方法查询ZSet中Score最小的元素,具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:
- 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
- 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明ZSet中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;
同样的,ZRANGEBYSCORE操作的时间复杂度为O(logN + M),其中N为ZSet中元素个数,M为查询的元素个数,因此我们定时查询操作也是比较高效的。
rabitmq 死信队列
RabbitMQ本身并不直接提供对延迟队列的支持,我们依靠RabbitMQ的TTL以及死信队列功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ的死信队列以及TTL功能。
死信队列实际上是一种RabbitMQ的消息处理机制,当RabbmitMQ在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:
- 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
- 消息超时未消费,也就是TTL过期了
- 消息队列到达最大长度
消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。
时间轮算法(TimeWheel)
如上图所示,时间轮是一个存储延迟消息的环形队列,其底层采用数组实现,可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表,这个列表是一个双向环形链表,链表中每一项都代表一个需要执行的延迟任务。
时间轮会有表盘指针,表示时间轮当前所指时间,随着时间推移,该指针会不断前进,并处理对应位置上的延迟任务列表。
多层时间轮
到现在为止一切都非常棒,但是细心的同学可能发现了,上面的时间轮的大小是固定的,只有12秒。如果此时我们有一个需要延迟200秒的任务,我们应该怎么处理呢?直接扩充整个时间轮的大小吗?这显然不可取,因为这样做的话我们就需要维护一个非常非常大的时间轮,内存是不可接受的,而且底层数组大了之后寻址效率也会降低,影响性能。
为此,Kafka引入了多层时间轮的概念。其实多层时间轮的概念和我们的机械表上时针、分针、秒针的概念非常类似,当仅使用秒针无法表示当前时间时,就使用分针结合秒针一起表示。同样的,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
go-zero 中的堆多层时间轮算法的实现
时间轮结构
1 | TimingWheel struct { |
设置延迟任务
1 | // 计算在 slots 中的位置,以及需要转几圈(使用圈层下降的方法来实现多层时间轮) |
时间推动
1 | func NewTicker(d time.Duration) Ticker { |
遍历当前时间(index) 的 slot
1 | func (tw *TimingWheel) scanAndRunTasks(l *list.List) { |
执行任务
1 | func (tw *TimingWheel) runTasks(tasks []timingTask) { |
beanstalkd 延迟队列
Beanstalkd 的特性
- 支持优先级(支持任务插队)
- 延迟(实现定时任务)
- 持久化(定时把内存中的数据刷到binlog日志)
- 预留(把任务设置成预留,消费者无法取出任务,等某个合适时机再拿出来处理)
- 任务超时重发(消费者必须在指定时间内处理任务,如果没有则认为任务失败,重新进入队列)
Beanstalkd 的客户端在将一个任务放入到 Beanstalkd 的队列时,可以指定延迟的时间时间, Beanstalkd 的 server 在收到这个带延迟时间的任务时, 会先将等待延迟时间这么长的时间,然后将任务放入队列中。
golang 简单使用
1 | func put() { |
go-queue 中的封装
可以参考 example : https://github.com/tal-tech/go-queue