PHP 利用 Redis List 数据结构实现消息队列

PHP , 实战973 字

Redis 本身是用来做缓存的,但是其中有一些特性是我们可以用来完成消息队列的功能,比如如果能够容忍数据的丢失,并且持久化方面要求不是很高的场景下完成任务的分布式处理。

队列的基本实现

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边),而我们取出的时候可以从头部或者尾部来获取数据。

我们可以先头lpush 插入数据,然后再rpop来取数据,来模拟队列的先进先出。

$ redis-cli -h 127.0.0.1
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> select 0
OK
127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> rpop order:queue
"SN20190801"
127.0.0.1:6379> rpop order:queue
"SN20190802"
127.0.0.1:6379> rpop order:queue
"SN20190803"
127.0.0.1:6379> rpop order:queue
(nil)

如果想要批量处理,我们可以使用 LRANGE 命令:

127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> lrange order:queue 0 10
1) "SN20190803"
2) "SN20190802"
3) "SN20190801"

基于这三个命令,我们就可以实现一个异步的消息队列了,我们首先来模拟消息队列的生产,最终我们的消费模型就如图 1.1。

未命名绘图.drawio (1)

图 1.1 消费模型

<?php

// 接收参数
$orderId = $argv[1] ?? '';

if (empty($orderId)) {
    exit("请填写要压入队列的数据");
}

// 新建 redis 连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

try {
    echo $redis->lPush('order:queue', $orderId) . PHP_EOL;
} catch (Exception $exception) {
    //捕获异常,出现异常情况应该进行处理重传或者报警等等
    echo $exception->getMessage() . PHP_EOL;
}
// 释放资源
$redis->close();

这段代码非常简单,通过 $argv 来获取参数,也就是 order_id,通知消费端,该 id 有数据变更,要进行一些异步操作。

处理端我们使用 brPop 来阻塞获进行消费就可以了。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

while (true) {
//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->brPop("order:queue", 10);
    //$result 包含两个返回值
    //$result[0] redis key 的名称 order:queue
    //$result[1] redis value 也就是我们压入队列的数据
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑";
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

首先是使用 while 死循环来模拟一直在消费, brPop 设置一个十秒的阻塞,如果阻塞过了十秒就放弃获取,执行下一次逻辑,在处理完成后,我们要 usleep 500 毫秒,让出 CPU 时间片避免 CPU 资源被该进程一直占用。

这样一来一个简单的 redis 异步消息队列就实现好了,到这一步最好不要直接在生产环境这么写。

我们还需要处理 kill 信号来实现优雅退出,因为 kill 信号是直接杀死进程,这个时候任务可能正在处于执行状态,这种处理方式是不可重入的,因为你无法确定某个操作环节会出错。

利用bRPopLPush命令补救数据丢失

如果你不能容忍消息的丢失或者持久化,我们可以使用比较成熟的 MQ,例如 RocketMQ,RabbitMQ 或者 kafka,这些 MQ 都可以比较好的解决这个问题。

bRPopLPush 是 redis 提供给我们的一个命令,改命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

我们可以利用这个提醒替换 BRPOP ,在获取队列内容的时候将其丢到一个备份 key 中,当消息执行结束后,再将备份的数据从备份 key 中删除。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

$orderQueueKey = 'order:queue';
$backupQueueKey = 'order:queue:backup';

// 程序启动优先处理 backup key 中的数据
while (true) {
    $result = $redis->brPop($backupQueueKey, 1);
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        break;
    }
}

while (true) {

//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->bRPopLPush("order:queue", "order:queue:backup", 10);

    if ($result) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        //处理完成后取出key
        $redis->lPop($backupQueueKey);
        
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

我们首先在真正的处理逻辑前增加了对备份key 的处理,让程序启动后先处理之前没有梳理完成的数据。

在最下方处理队列的地方也做了改动,在处理完成后取出备份 key 中的数据。

maksim
Maksim(一笑,吡罗),PHPer,Goper
OωO
开启隐私评论,您的评论仅作者和评论双方可见