2019年8月

延迟队列是我们在业务中经常遇到的场景,例如订单过期,定时发布,定时推送等等。

在 Zset里面,每一个成员都有一个所谓的分数:score,把当前时间作为分数,因为 Zset 是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。

我们还是用订单例子来距离,如果订单超过 24 小时未付款,那么久取消订单,这是在电商业务中最常见的处理模式。

那么在 redis 中的数据个是就如图 1.1。

图 1.1

而程序执行流程如下图

  1. 用户下单后将订单信息插入到数据库中
  2. 将数据插入到 redis 中,结构如图 1.1
  3. 启动死循环,定时从 zset 中获取接近当前时间戳的值,然后逻辑判断是否更新数据。

接下来我们回顾一下 zset相关的命令:

ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

注意排序是按照 score 从小到大排序的,这也是我们想要的,其中 min 和 max 可以是 -inf 和 +inf。

  • -inf 分数的最低值
  • +inf 分数的最高值
ZRANGEBYSCORE order:delay_queue -inf 5000  WINTHSCORES

这代表你不知道最低值是多少,取值范围是 <= 5000

ZRANGEBYSCORE order:delay_queue -inf current_timestamp WINTHSCORES limit 0 10

代表取 10 条最小等于当前时间的所有内容。

我们还是使用命令行脚本来模拟入队操作。

<?php

$orderId = $argv[1] ?? '';
if (empty($orderId)) {
    exit("请填写要压入队列的数据");
}
$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';

$redis->zAdd($queueName, time(), $orderId);

执行代码,向队列中插入四条数据。

# 执行入队操作
➜ php zset_client.php O20190101
➜ php zset_client.php O20190102
➜ php zset_client.php O20190103
➜ php zset_client.php O20190104

# redis 查看数据
127.0.0.1:6379> zrange order:delay_queue 0 -1 withscores
1) "O20190101"
2) "1667213048"
3) "O20190102"
4) "1667213049"
5) "O20190103"
6) "1667213050"
7) "O20190104"
8) "1667213051"

接下来我们来实现消费端的代码:

<?php


$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';

while (true) {

    $orderIds = $redis->zRangeByScore($queueName, "-inf", time(), ['limit' => [0, 10]]);
    $ids = implode(',', $orderIds);

    if (!empty($ids)) {
        //模拟执行 SQL 语句,4 代表已经取消
        $sql = "update `orders` set `status` = 4 where in ({$ids})";
        echo "$sql" . PHP_EOL;
        //执行其他业务逻辑,例如推送通知等操作
    }

    $redis->zRem($queueName, ...$orderIds);
    usleep(500 * 1000);
}

这里需要注意我们在更新数据库的时候不要一条一条的更新,同时这些代码只是用来演示基本原理,还有好多需要处理的内容。

  1. 处理程序的退出,监听信号
  2. 尽量使用异步方法处理业务逻辑,可以尝试使用 swoole,workman 带有多进程管理的框架或者已经封装好的类库。

Redis 本身是用来做缓存的,但是其中有一些特性是我们可以用来完成消息队列的功能,比如如果能够容忍数据的丢失,并且持久化方面要求不是很高的场景下完成任务的分布式处理。

队列的基本实现

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边),而我们取出的时候可以从头部或者尾部来获取数据。

我们可以先头lpush 插入数据,然后再rpop来取数据,来模拟队列的先进先出。

$ redis-cli -h 127.0.0.1
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> select 0
OK
127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> rpop order:queue
"SN20190801"
127.0.0.1:6379> rpop order:queue
"SN20190802"
127.0.0.1:6379> rpop order:queue
"SN20190803"
127.0.0.1:6379> rpop order:queue
(nil)

如果想要批量处理,我们可以使用 LRANGE 命令:

127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> lrange order:queue 0 10
1) "SN20190803"
2) "SN20190802"
3) "SN20190801"

基于这三个命令,我们就可以实现一个异步的消息队列了,我们首先来模拟消息队列的生产,最终我们的消费模型就如图 1.1。

未命名绘图.drawio (1)

图 1.1 消费模型

<?php

// 接收参数
$orderId = $argv[1] ?? '';

if (empty($orderId)) {
    exit("请填写要压入队列的数据");
}

// 新建 redis 连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

try {
    echo $redis->lPush('order:queue', $orderId) . PHP_EOL;
} catch (Exception $exception) {
    //捕获异常,出现异常情况应该进行处理重传或者报警等等
    echo $exception->getMessage() . PHP_EOL;
}
// 释放资源
$redis->close();

这段代码非常简单,通过 $argv 来获取参数,也就是 order_id,通知消费端,该 id 有数据变更,要进行一些异步操作。

处理端我们使用 brPop 来阻塞获进行消费就可以了。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

while (true) {
//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->brPop("order:queue", 10);
    //$result 包含两个返回值
    //$result[0] redis key 的名称 order:queue
    //$result[1] redis value 也就是我们压入队列的数据
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑";
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

首先是使用 while 死循环来模拟一直在消费, brPop 设置一个十秒的阻塞,如果阻塞过了十秒就放弃获取,执行下一次逻辑,在处理完成后,我们要 usleep 500 毫秒,让出 CPU 时间片避免 CPU 资源被该进程一直占用。

这样一来一个简单的 redis 异步消息队列就实现好了,到这一步最好不要直接在生产环境这么写。

我们还需要处理 kill 信号来实现优雅退出,因为 kill 信号是直接杀死进程,这个时候任务可能正在处于执行状态,这种处理方式是不可重入的,因为你无法确定某个操作环节会出错。

利用bRPopLPush命令补救数据丢失

如果你不能容忍消息的丢失或者持久化,我们可以使用比较成熟的 MQ,例如 RocketMQ,RabbitMQ 或者 kafka,这些 MQ 都可以比较好的解决这个问题。

bRPopLPush 是 redis 提供给我们的一个命令,改命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

我们可以利用这个提醒替换 BRPOP ,在获取队列内容的时候将其丢到一个备份 key 中,当消息执行结束后,再将备份的数据从备份 key 中删除。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

$orderQueueKey = 'order:queue';
$backupQueueKey = 'order:queue:backup';

// 程序启动优先处理 backup key 中的数据
while (true) {
    $result = $redis->brPop($backupQueueKey, 1);
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        break;
    }
}

while (true) {

//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->bRPopLPush("order:queue", "order:queue:backup", 10);

    if ($result) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        //处理完成后取出key
        $redis->lPop($backupQueueKey);
        
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

我们首先在真正的处理逻辑前增加了对备份key 的处理,让程序启动后先处理之前没有梳理完成的数据。

在最下方处理队列的地方也做了改动,在处理完成后取出备份 key 中的数据。