延迟队列是我们在业务中经常遇到的场景,例如订单过期,定时发布,定时推送等等。
在 Zset里面,每一个成员都有一个所谓的分数:score,把当前时间作为分数,因为 Zset 是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。
我们还是用订单例子来距离,如果订单超过 24 小时未付款,那么久取消订单,这是在电商业务中最常见的处理模式。
那么在 redis 中的数据个是就如图 1.1。

而程序执行流程如下图

- 用户下单后将订单信息插入到数据库中
- 将数据插入到 redis 中,结构如图 1.1
- 启动死循环,定时从 zset 中获取接近当前时间戳的值,然后逻辑判断是否更新数据。
接下来我们回顾一下 zset相关的命令:
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
注意排序是按照 score 从小到大排序的,这也是我们想要的,其中 min 和 max 可以是 -inf 和 +inf。
- -inf 分数的最低值
- +inf 分数的最高值
ZRANGEBYSCORE order:delay_queue -inf 5000 WINTHSCORES
这代表你不知道最低值是多少,取值范围是 <= 5000
ZRANGEBYSCORE order:delay_queue -inf current_timestamp WINTHSCORES limit 0 10
代表取 10 条最小等于当前时间的所有内容。
我们还是使用命令行脚本来模拟入队操作。
<?php
$orderId = $argv[1] ?? '';
if (empty($orderId)) {
exit("请填写要压入队列的数据");
}
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';
$redis->zAdd($queueName, time(), $orderId);
执行代码,向队列中插入四条数据。
# 执行入队操作
➜ php zset_client.php O20190101
➜ php zset_client.php O20190102
➜ php zset_client.php O20190103
➜ php zset_client.php O20190104
# redis 查看数据
127.0.0.1:6379> zrange order:delay_queue 0 -1 withscores
1) "O20190101"
2) "1667213048"
3) "O20190102"
4) "1667213049"
5) "O20190103"
6) "1667213050"
7) "O20190104"
8) "1667213051"
接下来我们来实现消费端的代码:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';
while (true) {
$orderIds = $redis->zRangeByScore($queueName, "-inf", time(), ['limit' => [0, 10]]);
$ids = implode(',', $orderIds);
if (!empty($ids)) {
//模拟执行 SQL 语句,4 代表已经取消
$sql = "update `orders` set `status` = 4 where in ({$ids})";
echo "$sql" . PHP_EOL;
//执行其他业务逻辑,例如推送通知等操作
}
$redis->zRem($queueName, ...$orderIds);
usleep(500 * 1000);
}
这里需要注意我们在更新数据库的时候不要一条一条的更新,同时这些代码只是用来演示基本原理,还有好多需要处理的内容。
- 处理程序的退出,监听信号
- 尽量使用异步方法处理业务逻辑,可以尝试使用 swoole,workman 带有多进程管理的框架或者已经封装好的类库。