Redis 本身是用来做缓存的,但是其中有一些特性是我们可以用来完成消息队列的功能,比如如果能够容忍数据的丢失,并且持久化方面要求不是很高的场景下完成任务的分布式处理。
队列的基本实现
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边),而我们取出的时候可以从头部或者尾部来获取数据。
我们可以先头lpush 插入数据,然后再rpop来取数据,来模拟队列的先进先出。
$ redis-cli -h 127.0.0.1
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> select 0
OK
127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> rpop order:queue
"SN20190801"
127.0.0.1:6379> rpop order:queue
"SN20190802"
127.0.0.1:6379> rpop order:queue
"SN20190803"
127.0.0.1:6379> rpop order:queue
(nil)
如果想要批量处理,我们可以使用 LRANGE
命令:
127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> lrange order:queue 0 10
1) "SN20190803"
2) "SN20190802"
3) "SN20190801"
基于这三个命令,我们就可以实现一个异步的消息队列了,我们首先来模拟消息队列的生产,最终我们的消费模型就如图 1.1。
<?php
// 接收参数
$orderId = $argv[1] ?? '';
if (empty($orderId)) {
exit("请填写要压入队列的数据");
}
// 新建 redis 连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);
try {
echo $redis->lPush('order:queue', $orderId) . PHP_EOL;
} catch (Exception $exception) {
//捕获异常,出现异常情况应该进行处理重传或者报警等等
echo $exception->getMessage() . PHP_EOL;
}
// 释放资源
$redis->close();
这段代码非常简单,通过 $argv
来获取参数,也就是 order_id
,通知消费端,该 id 有数据变更,要进行一些异步操作。
处理端我们使用 brPop
来阻塞获进行消费就可以了。
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);
while (true) {
//阻塞获取,10 秒后没有获取到证明超时
$result = $redis->brPop("order:queue", 10);
//$result 包含两个返回值
//$result[0] redis key 的名称 order:queue
//$result[1] redis value 也就是我们压入队列的数据
if ($result && $result[0]) {
//业务处理逻辑
echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑";
usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
} else {
continue;
}
}
首先是使用 while
死循环来模拟一直在消费, brPop
设置一个十秒的阻塞,如果阻塞过了十秒就放弃获取,执行下一次逻辑,在处理完成后,我们要 usleep
500 毫秒,让出 CPU 时间片避免 CPU 资源被该进程一直占用。
这样一来一个简单的 redis 异步消息队列就实现好了,到这一步最好不要直接在生产环境这么写。
我们还需要处理 kill 信号来实现优雅退出,因为 kill 信号是直接杀死进程,这个时候任务可能正在处于执行状态,这种处理方式是不可重入的,因为你无法确定某个操作环节会出错。
利用bRPopLPush命令补救数据丢失
如果你不能容忍消息的丢失或者持久化,我们可以使用比较成熟的 MQ,例如 RocketMQ,RabbitMQ 或者 kafka,这些 MQ 都可以比较好的解决这个问题。
bRPopLPush 是 redis 提供给我们的一个命令,改命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
我们可以利用这个提醒替换 BRPOP ,在获取队列内容的时候将其丢到一个备份 key 中,当消息执行结束后,再将备份的数据从备份 key 中删除。
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);
$orderQueueKey = 'order:queue';
$backupQueueKey = 'order:queue:backup';
// 程序启动优先处理 backup key 中的数据
while (true) {
$result = $redis->brPop($backupQueueKey, 1);
if ($result && $result[0]) {
//业务处理逻辑
echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑" . PHP_EOL;
sleep(3); // 模拟耗时操作
echo "处理完成" . PHP_EOL;
usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
} else {
break;
}
}
while (true) {
//阻塞获取,10 秒后没有获取到证明超时
$result = $redis->bRPopLPush("order:queue", "order:queue:backup", 10);
if ($result) {
//业务处理逻辑
echo "接收到订单号为 [" . $result . "] 的订单,开始执行业务逻辑" . PHP_EOL;
sleep(3); // 模拟耗时操作
echo "处理完成" . PHP_EOL;
//处理完成后取出key
$redis->lPop($backupQueueKey);
usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
} else {
continue;
}
}
我们首先在真正的处理逻辑前增加了对备份key 的处理,让程序启动后先处理之前没有梳理完成的数据。
在最下方处理队列的地方也做了改动,在处理完成后取出备份 key 中的数据。