分类 PHP 下的文章

本文转载于学而思网校技术团队,2020.10月我加入了好未来励步事业部,我们事业部开始大面积应用 swoole,所以对 swoole 进行了深入学习。本文作者:陈曹奇昊

初次接触Swoole的PHP开发者多少都会有点雾里看花的感觉,看不清本质。一部分PHP开发者并不清楚Swoole是什么,只是觉得很牛掰就想用了,这种行为无异于写作文的时候总想堆砌一些华丽的辞藻或是引经据典来提升文章逼格,却背离了文章的主题,本末倒置,每一种技术的诞生都有它的原因,异步或是协程不是万能的银弹,你需要它的时候再去用它,而不是想用它而用它,毕竟编程世界的惯性是巨大的,这天下还是同步阻塞的天下。还有一部分开发者是对Swoole有了一些自己的见解,但对错参半,写出来的程序能跑,甚至也能上生产,但不是最优的,其中大部分问题都源于开发者无法将惯有的思维方式灵活转变。

协程

首先协程的最简定义是用户态线程,它不由操作系统而是由用户创建,跑在单个线程(核心)上,比进程或是线程都更加轻量化,通常创建它只有内存消耗:假如你的配置允许你开几千个进程或线程,那么开几万个几十万个协程也是很轻松的事情,只要内存足够大,你可以几乎无止境地创建新的协程。在Swoole下,协程的切换实现是依靠双栈切换,即C栈和PHP栈同时切换,由于有栈协程的上下文总是足够的小,且在用户态便能完成切换,它的切换速度也总是远快于进程、线程,一般只需要纳秒级的CPU时间,对于实际运行的逻辑代码来说这点开销总是可以忽略不计(尤其是在一个重IO的程序中,通过调用分析可以发现协程切换所占的CPU时间非常之低)。

对于Swoole这样的有栈协程,你完全可以简单地将其看做是一个栈切换器,你可以在运行的子程序中随意切换到另一个子程序,底层会保存好被切走的协程的执行位置,回来时可以从原先的位置继续往下运行。

2023-03-25T15:29:53.png

Swoole多进程模型下的进程、线程、协程关系图
但这篇文章我们要谈的并不只是单单「协程」这一个概念,还隐含了关于异步网络IO一系列的东西,光有协程是什么也做不了的,因为Swoole的协程永远运行在一个线程中,想用它做并行计算是不可能的,运行速度只会因为创建开销而更慢,没有异步网络IO支持,你只能在不同协程间切来切去玩。

实际上PHP早就实现了协程,yield关键字就是允许你从一个函数中让出执行权,需要的时候能重新回到让出的位置继续往下执行,但它没有流行起来也有多种原因,一个是它的传染性,每一层调用都需要加关键字,另一个就是PHP没有高效可靠的异步IO支持,让其食之无味。

异步

注:本文中提到的异步IO并非全为严格定义上的异步IO,更多的是日常化的表达
简单了解了协程,再让我们来理解一下什么是异步IO吧。严格来说,在Unix下我们常说的异步并不是真异步,而是同步非阻塞,但是其效果和异步非常相近,所以我们日常中还是以异步相称。同步非阻塞和真异步区别在于:真异步是你提交读写请求后直接检查读写是否已完成即可,所以在Win下这样的技术被叫做「完成端口」,而同步非阻塞仅是操作不会长时间地陷入内核,但你需要在检查到可读或可写后,调用API同步地去拷贝数据,这会不可避免地陷入内核态,但read/write通常并不会阻塞太多的时间,从宏观上整个程序仍可以看作是全异步的。

阻塞非阻塞
同步read, write(read, write) + poll/ select / epoll / kqueue
异步-aio_read, aio_write, IOCP(windows)

在实际使用中,「伪异步」的Reactor模型并不比Windows下IOCP的Proactor逊色,并且我更喜欢Reactor的可控性,当然为了追求极致的性能和解决网络和文件异步IO统一的问题,未来Linux的io_uring可能会成为新的趋势。
2023-03-25T15:30:12.png

Reactor运行流程简图
我们可以通过上面的图片简单理解Reactor模型的运行流程,所谓的「异步」不过是多路复用带来的观感效果,你的程序不会阻塞在一个IO上,而是在无事可干的时候再阻塞在一堆IO上,即IO操作不在你需要CPU的时候阻塞你,你就不会感受到IO阻塞的存在。

结合现实情景来说,以前你要买饭(IO操作),你得下楼去买,还得排队等饭店大厨做完才能取回家吃(IO阻塞),到了下一餐,你又得重复之前的操作,很是麻烦,而且越是繁忙的时候等的时间越长(慢速IO),你觉得一天到晚净排队了,极大地浪费了你写代码的时间(CPU时间)。现在有了外卖,你直接下单(异步请求)就可以继续专心写代码(非阻塞),你还可以一次定三份饭(多路IO),饭到了骑手打电话让你下楼取(事件触发),前后只花了不到几分钟(同步读写,如果是Proactor连取餐都省了,直接给你送上楼),周六晚上的九点,你终于合上电脑,觉得充实极了,因为你几乎一整周都在写代码(CPU利用率高)。

协程+异步=同步非阻塞编程

现在我们有了协程和异步,我们可以做什么呢?那就是异步的同步化。这时候有的开发者就会说了,诶呀好不容易习惯异步了,怎么又退回到同步了呢。这就是为什么有些开发者始终写不出最优的协程代码的原因,异步由于操作的完成不是立即的,所以我们需要回调,而回调总是反人类的,嵌套的回调更是如此。

而结合协程,消灭回调我们只需要两步:在发出异步请求之后挂起协程,在异步回调触发时恢复协程。

Swoole\Coroutine\run(function(){
    // 1. 创建定时器并挂起协程#1
    Swoole\Coroutine::sleep(1);
    // 3. 协程恢复,继续向下运行退出,再次让出
});
// 2. 协程#1让出,进入事件循环,等待1s后定时器回调触发,恢复协程#1
// 4. 协程#1退出并让出,没有更多事件,事件循环退出,进程结束
短短的一行协程sleep,使用时几乎与同步阻塞的sleep无异,却是异步的。
for ($n = 10; $n--;) {
    Swoole\Coroutine::create(function(){
        Swoole\Coroutine::sleep(1);
    });
}

我们循环创建十个协程并各sleep一秒,但实际运行可以发现整个进程只阻塞了一秒,这就表明在Swoole提供的API下,阻塞操作都由进程级别的阻塞变为了协程级别的阻塞,这样我们可以以很小的开销在进程内通过创建大量协程来处理大量的IO任务。

协程代码编写思路

定时任务

当我们说到定时任务时,很多人第一时间都想到定时器,这没错,但是在协程世界,它不是最佳选择。

$stopTimer = false;
$timerContext = [];
$timerId = Swoole\Timer::tick(1, function () {
    // do something
    global $timerContext;
    global $timerId;
    global $stopTimer;
    $timerContext[] = 'data';
    if ($stopTimer) {
        var_dump($timerContext);
        Swoole\Timer::clear($timerId);
    }
});
// if we want to stop it:
$stopTimer = true;

在异步回调下,我们需要以这样的方式来掌控定时器,每一次定时器回调都会创建一个新的协程,并且我们不得不通过全局变量来维护它的上下文。

如果是协程呢?

Swoole\Coroutine\run(function() {
    $channel = new Swoole\Coroutine\Channel;
    Swoole\Coroutine::create(function () use ($channel) {
        $context = [];
        while (!$channel->pop(0.001)) {
            $context[] = 'data';
        }
        var_dump($context);
    });
    // if we want to stop it, just call:
    $channel->push(true);
});

完全同步的写法,从始至终只在一个协程里,不会丢失上下文,channel->pop在这里的效果相当于毫秒级sleep,并且我们可以通过push数据去停止这个定时器,非常的简单清晰。

Task

由于开发者的强烈要求,Swoole官方曾经做了一个错误的决定,就是在Task进程中支持协程和异步IO。

图片
正如图中所示,Task进程最初被设计为用来处理无法异步化的任务,充当类似于PHP-FPM的角色(半异步半同步模型),这样各司其职,能够将执行效率最大化。

最早期的Swoole开发者,甚至直接将Swoole的Worker进程用于执行同步阻塞任务,这种做法并非没有可取之处,它比PHP-FPM下的效率更高,因为程序是持续运行,常驻内存的,少了一些VM启动和销毁的开销,只是需要自己处理资源的生命周期等问题。

此外就是使用异步API的开发者,他们会开一堆Task进程,将一些暂时无法异步化的同步阻塞任务丢过去处理。

而以上两种都是历史条件下正确并合适的Swoole打开方式。

但是还有一小撮开发者,一股脑地把所有任务都投递给Task进程,以为这样就实现了任务异步化,Worker进程除了接收响应和投递任务什么也不干,殊不知这就相当于每一个任务的处理多了两次数据序列化开销 + 两次数据反序列开销 + 两次IPC开销 + 进程切换开销。

而当协程逐渐成为新的趋势后,又有越来越多的社区呼声要求Task进程也能支持协程和异步IO,这样他们就可以将协程方式编写的任务投递到Task中执行。但异步任务可以很轻量地在本进程被快速处理掉,对Worker整体性能并不会有太大影响,他们这样的行为,也是典型的舍近求远。

Task方式处理协程任务

$server->on('Receive', function(Swoole\Server $server) {
    # 投递任务,序列化任务数据,通过IPC发送给Task进程
    $task_id = $server->task('foo'); 
});
# 切换到Task进程
# 接收并反序列化Worker通过IPC发送来的任务数据
$server->on('Task', function (Swoole\Server $server, $task_id, $from_id, $data) {
    # 使用协程DNS查询
    $result = \Swoole\Coroutine::gethostbyname($data);
    # 序列化数据,通过IPC发送回Worker进程
    $server->finish($result);
});
# 回到Worker进程
# 接收并反序列化Task通过IPC发送来的结果数据
$server->on('Finish', function (Swoole\Server $server, int $task_id, $result) {
    # 需要通过任务id才能确认是哪个任务的结果
    echo "Task#{$task_id} finished";
    # 打印结果
    var_dump($result);
});

协程方式写Task

注:batch方法由swoole/library提供,内置支持需要Swoole-v4.5.2及以上版本,低版本可以自己使用Channel来调度

use Swoole\Coroutine;

Coroutine\run(function () {
    # 并发三个DNS查询任务
    $result = Coroutine\batch([
        '100tal' => function () {
            return Coroutine::gethostbyname('www.100tal.com');
        },
        'xueersi' => function () {
            return Coroutine::gethostbyname('www.xueersi.com');
        },
        'zhiyinlou' => function () {
            return Coroutine::gethostbyname('www.zhiyinlou.com');
        }
    ]);
    var_dump($result);
});

输出(API保证返回值顺序与输入顺序一致,不会因为异步而乱序)

array(3) {
  ["100tal"]=>
  string(14) "203.107.33.189"
  ["xueersi"]=>
  string(12) "60.28.226.27"
  ["zhiyinlou"]=>
  string(14) "101.36.129.150"
}

非常的简单易懂,不存在任何序列化或者IPC开销,并且由于程序是完全非阻塞的,大量的Task任务也不会对整体性能造成影响,所以说Task进程中使用协程或异步完全就是个错误,作为一个程序员,思维的僵化是很可怕的。

读到这里大家应该也能明白,我们所谈论的协程化技术实际上可以看做传统同步阻塞和非阻塞技术的超集,非阻塞的技术让程序可以同时处理大量IO,协程技术则是实现了可调度的异步单元,它让异步程序的行为变得更加可控。如果你的程序只有一个协程,那么程序整体就是同步阻塞的;如果你的程序在创建某个协程以后不关心它的内部返回值,它就是异步的。

希望通过本文,大家能够加深对协程和异步IO的理解,写出高质量可维护性强的协程程序。

cover

PHP 是无处不在的,可以说是互联网 Web 应用上使用最广泛的语言。

然而,它的高性能并不为人所知,尤其是在涉及到高并发系统时。这就是为什么对于这样特殊的用例,正在被 Node (是的,我知道,它不是一种语言)、Go 和 Elixir 等语言接管。

也就是说,您可以做很多事情来改进服务器上的 PHP 性能。本文主要关注 php-fpm 方面的内容,如果您使用 Nginx,这是在服务器上的默认配置。

如果你知道 php-fpm 是什么,请直接跳到优化部分。

什么是 php-fpm?

许多开发人员对 DevOps 方面的知识不太感兴趣,即使是那些对此感兴趣的开发人员,也极少有人知道它的底层原理。有趣的是,当浏览器发送一个请求到运行 PHP 的服务器上时,PHP 也不是最先进行处理请求的服务;而是,HTTP 服务器,Apache 和 Nginx 是其中最主要的两个。「web 服务器」决定如何与 PHP 进行通信,然后传递请求的类型,数据和头部信息到 PHP 进程。

php-fpm

上图是 PHP 项目的请求 - 响应生命周期(图片来源: ProinerTech)

在现代 PHP 应用中,「find file」部分即为 index.php 文件,它是在服务器配置文件中配置的用于处理所有请求的代理。

如今,Web 服务器究竟如何连接 PHP 正在进化,如果我们要深入研究所有细节,这篇文章的长度将激增。但粗略来说,在 Apache 作为 Web 服务器首选的时间段,PHP 是作为包含在服务器内部的模块。

所以每当一个请求被接收,服务器将开启一个新的进程,它将自动包含 PHP 和执行请求。这个方法被称作 mod_php,“PHP 作为一个模块” 的缩写。这种方法有其局限性,而 Nginx 和 php-fpm 克服了它。

在 php-fpm 中,管理 PHP 的责任在于服务器内部的 PHP 程序。换言之,Web 服务器 (Nginx, 在本例中), 不在乎 PHP 在哪和怎样运行的,只要它知道如何发送和接收数据即可。如果需要,在这种情况下,您可以将 PHP 视为另一台服务器,它管理传入请求的某些子 PHP 进程(因此,我们将请求送到服务器,该请求由服务器接收并传递到服务器 — — 太疯狂了!:-P)。

如果你用过 Nginx, 你会看到这些代码:

location ~ \.php$ {
    try_files $uri =404;
    fastcgi_split_path_info ^(.+\.php)(/.+)$;
    fastcgi_pass unix:/run/php/php7.2-fpm.sock;
    fastcgi_index index.php;
    include fastcgi_params;
    fastcgi_param  SCRIPT_FILENAME $document_root$fastcgi_script_name;
}

对于这一行:fastcgi_pass unix:/run/php/php7.2-fpm.sock;,它告诉 Nginx 通过 php7.2-fpm.sock 的 socket 与 php
进程通信。因此,对于每个传入的请求,Nginx 都通过这个文件写入数据,在接收到输出后,将其发送回浏览器。

我必须再次强调,对于如何运行这不是最完整或者最准确的,但对于大多数 DevOps 任务是完全准确的。

除此之外,让我们回顾一下到目前为止所学到的东西:

  • PHP 不会直接接收浏览器发送的请求。像 Nginx 这种 Web 服务器首先会拦截它。
  • Web 服务器知道如何连接到 PHP 进程,并将所有请求数据(粘贴所有内容)传递到 PHP 上。
  • PHP 完成其职责后,会将响应发送回 Web 服务器,然后将其发送回客户端(在大多数情况下为浏览器)。

流程图如下:

php nginx php-fpm 流程图

PHP 和 Nginx 如何协同工作? (图片来源:数据狗)

到目前为止都不错,那么关键问题来了:PHP-FPM 到底是什么呢?

PHP 中的 FPM 代表 「快速进程管理器」, 花式解释就是说,在服务器上运行的 PHP 并不是单个进程,而是由这个 FPM 进程管理器派生、控制和终止的一些 PHP 进程。web 服务器将请求传递给的就是这个进程管理器。

PHP-FPM 本身就是一个完整的兔子洞,所以如果您愿意,可以随意探索,但是对于我们的目的,这些解释就足够啦。 ?

为什么要优化 php-fpm?

一般在正常运行的情况下,为什么要考虑优化呢? 为什么不将事物保持原样。

具有讽刺意味的是,一般我为大多数用例提供建议的话。 如果您的设置运行良好,并且没有特殊用例,请使用默认设置。 但是,如果您希望扩展一台机器之外的能力,那么从一台机器中挤出最大的处理能力是必不可少的,因为它可以将您服务器的花费减少一半(甚至更多!)。

要说明的另一件事情是,Nginx 是为处理巨大的工作负载而构建的。 它能够同时处理成千上万的连接,但是如果您的 PHP 设置不合理,那么您将浪费很多资源,因为 Nginx 必须等待 PHP 完成当前处理之后才可以接受下一个请求,最终 Nginx 不能为您的服务提供任何优势!

所以,接下来让我们看看尝试优化 php-fpm 时我们到底要优化什么。

如何优化 PHP-FPM ?

php-fpm 的配置文件在不同服务器上的位置可能不同,因此您需要做一些调查来确定它的位置。在 UNIX 上,你可以使用 find 命令。在我的 Ubuntu 上,它的路径是 /etc/php/7.2/fpm/php-fpm.conf 。当然,7.2 是我正在运行的 PHP 版本。
下面是这个文件的前几行代码:

;;;;;;;;;;;;;;;;;;;;;
; FPM Configuration ;
;;;;;;;;;;;;;;;;;;;;;

; All relative paths in this configuration file are relative to PHP's install
; prefix (/usr). This prefix can be dynamically changed by using the
; '-p' argument from the command line.

;;;;;;;;;;;;;;;;;;
; Global Options ;
;;;;;;;;;;;;;;;;;;

[global]
; Pid file
; Note: the default prefix is /var
; Default Value: none
pid = /run/php/php7.2-fpm.pid

; Error log file
; If it's set to "syslog", log is sent to syslogd instead of being written
; into a local file.
; Note: the default prefix is /var
; Default Value: log/php-fpm.log
error_log = /var/log/php7.2-fpm.log

很明显:这一行 pid = /run/php/php7.2-fpm.pid 告诉我们哪个文件包含了 php-fpm 进程的进程 id。

我们还看到 /var/log/php7.2-fpm.log 是 php-fpm 存储日志的地方。

在这个文件中,像下面这样添加三个变量:

emergency_restart_threshold 10
emergency_restart_interval 1m
process_control_timeout 10s

前两个设置是警告性的,它们告诉 php-fpm 进程,如果 10 个子进程在一分钟内失败,主 php-fpm 进程应该重新启动自己。
这听起来可能不够稳健,但是 PHP 是一个短暂的进程,它会泄漏内存,所以在出现高故障时重新启动主进程可以解决很多问题。
第三个选项是 process_control_timeout,它告诉子进程在执行从父进程接收到的信号之前需要等待这么长的时间。这个设置是非常有用的。例如,当父进程发送终止信号时,子进程正在处理某些事情的时候。十秒的时间,他们会有一个更好的机会完成任务并且优雅地退出。

令人惊讶的是,这 不是 php-fpm 的核心配置!这是因为,为了 web 请求服务,php-fpm 创建了一个新的进程池,它将具有一个单独的配置。在我的例子中,进程池的名称是 www,我想编辑的文件是 /etc/php/7.2/fpm/pool.d/www.conf。
让我们来看看文件的内容:

; Start a new pool named 'www'.
; the variable $pool can be used in any directive and will be replaced by the
; pool name ('www' here)
[www]

; Per pool prefix
; It only applies on the following directives:
; - 'access.log'
; - 'slowlog'
; - 'listen' (unixsocket)
; - 'chroot'
; - 'chdir'
; - 'php_values'
; - 'php_admin_values'
; When not set, the global prefix (or /usr) applies instead.
; Note: This directive can also be relative to the global prefix.
; Default Value: none
;prefix = /path/to/pools/$pool

; Unix user/group of processes
; Note: The user is mandatory. If the group is not set, the default user's group
;       will be used.
user = www-data
group = www-data

快速浏览一下上面代码片段的末尾,您就会明白为什么服务器进程以 www-data 的形式运行了。如果您在设置网站时遇到文件权限问题,您可能要将目录的所有者或组更改为 www-data,从而允许 PHP 进程写入日志文件和上传文档等。
最后,我们到达了问题的根源,流程管理器 (pm) 设置。一般情况下,默认值是这样的:

pm = dynamic
pm.max_children = 5
pm.start_servers = 3
pm.min_spare_servers = 2
pm.max_spare_servers = 4
pm.max_requests = 200

那么,这里的 「dynamic(动态)」是什么意思呢?我认为官方文档最好地解释了这一点 (我的意思是,这应该已经是您正在编辑的文件的一部分,但是我在这里复制了它,以防它不是):

; Choose how the process manager will control the number of child processes.
; Possible Values:
;   static  - a fixed number (pm.max_children) of child processes;
;   dynamic - the number of child processes are set dynamically based on the
;             following directives. With this process management, there will be
;             always at least 1 children.
;             pm.max_children      - the maximum number of children that can
;                                    be alive at the same time.
;             pm.start_servers     - the number of children created on startup.
;             pm.min_spare_servers - the minimum number of children in 'idle'
;                                    state (waiting to process). If the number
;                                    of 'idle' processes is less than this
;                                    number then some children will be created.
;             pm.max_spare_servers - the maximum number of children in 'idle'
;                                    state (waiting to process). If the number
;                                    of 'idle' processes is greater than this
;                                    number then some children will be killed.
;  ondemand - no children are created at startup. Children will be forked when
;             new requests will connect. The following parameter are used:
;             pm.max_children           - the maximum number of children that
;                                         can be alive at the same time.
;             pm.process_idle_timeout   - The number of seconds after which
;                                         an idle process will be killed.
; Note: This value is mandatory.

由此可见,有三个可用值:

  • Static: 无论什么情况,都会保持一个固定的 PHP 进程数量。
  • Dynamic: 我们需要指定 php-fpm 在任何给定时间点会保持活动的最小以及最大进程数量。
  • ondemand: 按照需求创建和销毁进程。

那这些设置有什么影响呢?

简而言之,如果你有个小流量的网站,“dynamic” 设置在大多数时间内都是一种资源的浪费。假设你的 pm.min_spare_servers 设置成了 3,那会有三个 PHP 进程会被创建并保持运行,甚至是网站没有流量时。这种情况下,“ondemand” 就是个更好的选择,可以让系统决定何时启动新的进程。

另一方面,大流量 或者必须快速响应的网站将在这种情况下被惩罚。 最好避免创建新的 PHP 进程的额外开销,使其成为池的一部分并对其进行监控。

使用 pm = static 固定子进程的数量,使最大的系统资源用于服务请求而不是管理 PHP。假如你确定走这条路,注意它有其指导方针和陷阱。关于它的一篇相当密集但非常有用的文章是 这篇 。

写在最后

由于有关网络性能的文章可能会引发争论或使人们感到困惑,因此在结束本文之前,我觉得需要讲几句话。 性能调优既涉及系统知识,也涉及猜测和技巧。

即使您完全了解 php-fpm 的所有设置,也无法保证成功。 如果您不了解 php-fpm 的存在,那么您就不必浪费时间担心它。 继续做您已经在做的事情并继续下去。

同时,尽可能不让结果变得很戏剧性。 是的,您可以通过从头开始重新编译 PHP 并删除所有不需要的模块来获得更好的性能,但是这种方法在生产环境中不够明智。 优化某些内容的整个想法是查看您的需求是否与默认值不同(它们很少这样做!),并根据需要进行较小的更改。

在前段时间,我负责的推送中台出现了一个非常严重的 Bug,差点造成线上服务宕机。

我简单来描述一下当时的场景。

  1. 我们使用个推来来推送我们的 APP 通知,以及用户画像,用户分析等等。
  2. 每一次在 app 启动都会访问一个接口来判断用户信息,然后绑定标签,这个动作会触发 curl 函数访问个推的 API。
  3. 结果悲剧了,个推北京机房宕机。

我们当时的主要技术栈是 PHP,我们线上 curl 请求默认超时时间是 60 秒,这就导致我们的数据库链接近乎被打满。

由于 PHP-FPM 在脚本执行完成后才会释放进程,这也包括其中的资源,curl 请求的默认时间是 60 秒,这样也就意味着在 60 秒内,该进程也会被一直阻塞,直到超时为止,而 PDO 的生命周期是跟着 FPM 走的,除非调用手动 close() 这样才会关闭。

MySQL 连接断开也是有默认超时时间的,如果在超时时间范围内,客户端没有主动关闭,该连接会一直存在,但是 MySQL 的连接数是有限的,如果连接被打满那么新的连接将无法建,整个服务 GG 。

如果这个时候再有流量进来,那么就只有宕机一条死路了,于是立即找到运维修改默认配置,然后重启 php-fpm 释放 MYSQL 连接。

处理方式以及断路器

经过这一次的线上事故,总结出了一点,对于任何第三方提供的接口,都不要完全信赖!,都要去考虑如果对方出现问题,我这里应该如何操作!

经过复盘我们当时存在以下问题:

  1. 环境配置不透明,开发基本不关注线上配置。
  2. 没有考虑第三方失败该如何进行处理。
  3. 架构设计不完善,此类功能完全也可以异步化。

第一点是由于工作经验不足或者缺乏沟通导致的,很好解决,当这个事情解决之后,团队对线上的配置情况和运维一起进行了一次梳理。

第二点,我们选择了熔断器,首先说一下什么是熔断器。

第三点,对业务进行了梳理,将强依赖接口改成了消息队列进行异步处理。

de8a07841802fdcd985039e8afb1e58b

图 1.1 熔断器

如果你看过自己家中的电闸开关的话就会看到上图这个东西, 当电流正常情况下熔断器是关闭状态电流就可以正常通过,如果电流负载过大为了保护家用电器就会开启,然后关闭电源保证电器安全。

在我们的代码中熔断器其实就是一个设计模式,我们可以通过很简单的代码就可以实现熔断器。

图 1.2 无熔断器流程图

图 1.2 中是在没有熔断器我们在开发这类业务的时候,可能会对第三方服务进行 try catch 来判断异常,如果有异常在反复几次调用,多次尝试失败后,可能就直接返回报错,提供有损的服务,像上面我提到的那场事故中这个问题就会被放大,因为 60 秒后才会触发超时,也就意味着,如果重试三次那么该接口的耗时就会增加到 3 分钟,正常情况下是不会有业务接口能够接收三分钟这样耗时的。

当我们加上熔断器后,那么流程就会变成图1.3 那样。

图 1.3

引入断路器后,不再是 app server 直接访问第三方服务,而是将访问委托给 fuse 熔断器来进行,断路器判断当前的开闭状态来判断是否要发起请求。

  1. 如果是处于关闭状态,也就是第三方服务可以正常访问,那就直接访问第三方服务,
  2. 如果熔断器处于开启状态,那么正面目前第三方服务不可用,则直接调用备用函数,这也是微服务中一个重要的概念——服务降级

利用 Redis Sorted Set 实现简单的熔断器

由于我们使用的 PHP-FPM,这也就导致我们的 PHP 代码是无法常驻内存的,这个时候,就需要使用 Redis 来进行记数,在数据结构方面我们选择使用有序集合。

keyscoremember
circuit10pushMessage2User
circuit1pushMessage2List

score 用来存放失败的次数,member 用来存放失败的函数。

搭建骨架

我们先来按照上一小节的内容先将熔断器的挂架代码打起来。

<?php

class CircuitBreaker
{
    private $zSetKey = 'circuit';

    public function invoke(object $class, string $method, array $params, callable $fallback)
    {
        try {
            return $class->$method(...$params);

        } catch (Throwable $exception) {
            return $fallback(); //函数降级
        }
    }
}

CircuitBreaker 就是我们的熔断器类。

其中设计了一个 invoke 方法,这个方法就是上面讲到的代理请求,我们来通过 invoke 方法来让熔断器代理我们的请求。

  • $class: 我们首先要将类实例化后传入到 invoke中,这样的好处是进行依赖翻转,让熔断器可以通用。
  • $method: 是该类需要执行的方法。
  • $params: 是执行该方法时候需要传入的参数。
  • $callback:是一个闭包,当熔断器被打开,或者出现异常的时候,我们需要对服务进行降级。

下面是如何调用。

<?php

require 'CircuitBreaker.php';

class Pusher
{
    public function pushMessage2User($username, $message): bool
    {
        echo "使用 Pusher 进行发送 $username [$message]";
        return true;
    }
}

$c = new CircuitBreaker();
$messageData = ['maksim', 'hello world!'];
$pusher = new Pusher();
$c->invoke($pusher, 'pushMessage2User', $messageData, function () use ($messageData) {
    echo "使用站内信进行发送 $messageData[0] $messageData[1]";
    return true;
});

这个时候就相当于是一个简单的代理模式,由 CircuitBreaker 去执行 Pusher,并且捕获异常,如果出现异常就用降级函数。

在这里,我们模拟发送通知的业务场景,如果 Pusher 类发送失败了,那么我们就降级使用站内信的方式进行发送,当然也可以扔到一个队列里面,等业务恢复后在开启异步消息队列将消息推送出去。

为熔断器添加计数器

接下里,我们对 CircuitBreaker 进行修改,增加错误计数。

public function invoke(object $class, string $method, array $params, callable $fallback)
    {
        try {
            return $class->$method(...$params);
            
        } catch (Throwable $exception) {
            $member = get_class($class) . '_' . $method;
            $this->redis->zIncrBy($this->zSetKey, 1, $member);

            return $fallback(); //函数降级
        }
    }

这里的代码非常简单,当代码发生异常后,我们直接增加 redis 计数器,下面我们使用 Redis 的延迟队列来实现熔断器开关状态的切换。

首先,我们需要设置失败的阀值,当失败次数达到阀值后不再调用原函数。

public $failCount = 3;

private function isFail($member) {
    if ($this->redis->zScore($this->zSetKey, $member) >= $this->failCount ) {
        return true;
    }
    return false;
}
public function invoke(object $class, string $method, array $params, callable $fallback)
    {
        $member = get_class($class) . '_' . $method;
        try {
            if ($this->isFail($member)) {
                return $fallback();
            }
            return $class->$method(...$params);
            
        } catch (Throwable $exception) {
   
            $this->redis->zIncrBy($this->zSetKey, 1, $member);

            return $fallback(); //函数降级
        }
    }

实现熔断器的三种状态

现在我们就实现了熔断器的打开,接下来,我们需要实现熔断器的关闭,但是只有这两种状态熔断器是无法正常运作的,在日常生活中,当熔断器跳闸后,我们判断没有问题后会手动去尝试开启熔断器,同样在代码中,我们也需要一个这个样的过程,我们需要让熔断器再去尝试着去访问原来的方法,如果满足可关闭状态,那么把熔断器关掉。

而在这个试探的这个状态就是半开状态。

  1. 关:调用原函数,失败后计数器+1,并返回降级函数,到达一定阀值后进入“开”状态;
  2. 开:也就是直接返回降级函数,不调用原函数,但是我们可以给失败函数一些机会,于是出现半开状态,不过需要设定一定的定时器,一定时间后进入半开状态。
  3. 半开状态:处理半开妆容是熔断器的核心,调用函数时候有一定数量的调用时进入降级函数,有一定数量进入的是真实函数。如果函数已经稳定,那么就进入关状态。

状态值的定义如下:

define("BreakerStateOpen" , 1);         // 开
define("BreakerStateClose" , 2);         // 关,这是默认值
define("BreakerStateHalfOpen" , 3);     // 半开

private funcion getState($member) {
    $getSocre = $this->redis->zScore($this->zSetKey, $member);
    if ($getScore >= $this->failCOunt {
        return BreakerStateOpen;
    }
    if ($getScore < 0) {
        return BreakerStateHalfOpen; // 如果值是 -1 则代表是半开状态
    }
   return BreakerStateClose;
}

接下来,利用定时器来完成进入半开状态,在 PHP-FPM 模式下定时器我们可以通过 Redis 实现,实现的方式有两种:

第一种:利用一个带有过期时间的 key 来完成,一旦进入“开”状态后,插入一个 key,设置过期时间,然后利用过期通知的方式来完成。

第二种:延迟队列,依然使用 Sorted Set 来完成,key 设置为 circuit_open,插入时间戳,编写一个死循环程序,反复监听。

通过这两种方式,监听过期时间,到期后把 circuit 对应的 member 的 score 设置为 -1,即代表是半开状态。

在这里我们通过延迟队列的方式来进行完成,如果不了解延迟队列可以通过《PHP 利用 Redis Sorted Set 实现延迟队列》这片文章来了解其原理。

<?php
  
while(true) {
    $members = $redis->zRangeByScore("circuit_open", "-inf", time(), ['limit' => [0, 10]]);
    
    if (count($members) > 0) {
        foreach ($members as $member) {
              $redis->zAdd('circuit', -1, $member);
        }
        
        //删掉 open key
          $redis->zRem("circuit_open", ...$members);
    }
    
    usleep(500 * 1000);//休眠 500 毫秒
}

这样一来,我们就可以使用死循环程序来不断的设置半开状态了。

接下来我们在 invoke 方法来处理,当熔断器进入到开状态时那么就设置一个定时器。

public function invoke(object $class, string $method, array $params, callable $fallback)
    {
        $member = get_class($class) . '_' . $method;
        $currentState = $this->getState($member); //获取当前状态
        try {
            if ($currentState == BreakerStateOpen)  {
                return $fallback();
            } else if ($currentState == BreakerStateHalfOpen) {
                //如果是半开状态
                if (rand(0, 100) % 2 == 0) {
                    return $fallback();
                } else {
                    $result = $class->$method(...$params);
                    // 半开状态下仍然要加计数器,目的是为了让他归 0
                    $this->redis->zIncrBy($this->zSetKey, 1, $member); 
                    return $result;
                }
            }
            return $class->$method(...$params);
            
        } catch (Throwable $exception) {
            if ($currentState == BreakerStateClose) {
                 //切换到开
                   $socre = $this->redis->zIncrBy($this->zSetKey, 1, $member);
                if ($score == $this->failCount) {
                    //增加队列
                       $this->redis->zAdd($this->zSetKey_open, time() + $this->openTime, $member);
                }
            }
            
            if ($currentState == BreakerStateHalfOpen) {
                $this->redis->zIncrBy($this->zSetKey, $this->failCount, $member); // 将熔断器重新设置为开状态
                $this->redis->zAdd($this->zSetKey_open, time() + $this->openTime, $member); //再次开启定时器
            }   
            return $fallback(); //函数降级
        }
    }

在这段代码中,我们重点关注第 10 行到第 16 行,这里“随机”让其访问真实函数,如果函数正常,则将计数器归 0,让其进入到关闭状态。

在第 22 行开始增加了对状态的判断,如果当前处于关闭状态,那么久打开,如果现在是半开状态,那证明半开尝试失败了,需要重新进入打开状态。

这里的半开状态的处理很简单,其实正常的熔断器还需要包括采样分析,来判断当前是否满足开合条件,同时上面的“随机”也无法控制到底有多少流量进入到正常函数,所以我们还需要增加真正的概率算法,将通过的流量控制在一个我们可以接受的范围内。

延迟队列是我们在业务中经常遇到的场景,例如订单过期,定时发布,定时推送等等。

在 Zset里面,每一个成员都有一个所谓的分数:score,把当前时间作为分数,因为 Zset 是有序的,时间越小的排名越靠前。所以使用Zset作为延时队列就充分利用了score。

我们还是用订单例子来距离,如果订单超过 24 小时未付款,那么久取消订单,这是在电商业务中最常见的处理模式。

那么在 redis 中的数据个是就如图 1.1。

图 1.1

而程序执行流程如下图

  1. 用户下单后将订单信息插入到数据库中
  2. 将数据插入到 redis 中,结构如图 1.1
  3. 启动死循环,定时从 zset 中获取接近当前时间戳的值,然后逻辑判断是否更新数据。

接下来我们回顾一下 zset相关的命令:

ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

注意排序是按照 score 从小到大排序的,这也是我们想要的,其中 min 和 max 可以是 -inf 和 +inf。

  • -inf 分数的最低值
  • +inf 分数的最高值
ZRANGEBYSCORE order:delay_queue -inf 5000  WINTHSCORES

这代表你不知道最低值是多少,取值范围是 <= 5000

ZRANGEBYSCORE order:delay_queue -inf current_timestamp WINTHSCORES limit 0 10

代表取 10 条最小等于当前时间的所有内容。

我们还是使用命令行脚本来模拟入队操作。

<?php

$orderId = $argv[1] ?? '';
if (empty($orderId)) {
    exit("请填写要压入队列的数据");
}
$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';

$redis->zAdd($queueName, time(), $orderId);

执行代码,向队列中插入四条数据。

# 执行入队操作
➜ php zset_client.php O20190101
➜ php zset_client.php O20190102
➜ php zset_client.php O20190103
➜ php zset_client.php O20190104

# redis 查看数据
127.0.0.1:6379> zrange order:delay_queue 0 -1 withscores
1) "O20190101"
2) "1667213048"
3) "O20190102"
4) "1667213049"
5) "O20190103"
6) "1667213050"
7) "O20190104"
8) "1667213051"

接下来我们来实现消费端的代码:

<?php


$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(0);
$queueName = 'order:delay_queue';

while (true) {

    $orderIds = $redis->zRangeByScore($queueName, "-inf", time(), ['limit' => [0, 10]]);
    $ids = implode(',', $orderIds);

    if (!empty($ids)) {
        //模拟执行 SQL 语句,4 代表已经取消
        $sql = "update `orders` set `status` = 4 where in ({$ids})";
        echo "$sql" . PHP_EOL;
        //执行其他业务逻辑,例如推送通知等操作
    }

    $redis->zRem($queueName, ...$orderIds);
    usleep(500 * 1000);
}

这里需要注意我们在更新数据库的时候不要一条一条的更新,同时这些代码只是用来演示基本原理,还有好多需要处理的内容。

  1. 处理程序的退出,监听信号
  2. 尽量使用异步方法处理业务逻辑,可以尝试使用 swoole,workman 带有多进程管理的框架或者已经封装好的类库。

Redis 本身是用来做缓存的,但是其中有一些特性是我们可以用来完成消息队列的功能,比如如果能够容忍数据的丢失,并且持久化方面要求不是很高的场景下完成任务的分布式处理。

队列的基本实现

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边),而我们取出的时候可以从头部或者尾部来获取数据。

我们可以先头lpush 插入数据,然后再rpop来取数据,来模拟队列的先进先出。

$ redis-cli -h 127.0.0.1
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> select 0
OK
127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> rpop order:queue
"SN20190801"
127.0.0.1:6379> rpop order:queue
"SN20190802"
127.0.0.1:6379> rpop order:queue
"SN20190803"
127.0.0.1:6379> rpop order:queue
(nil)

如果想要批量处理,我们可以使用 LRANGE 命令:

127.0.0.1:6379> lpush order:queue SN20190801
(integer) 1
127.0.0.1:6379> lpush order:queue SN20190802
(integer) 2
127.0.0.1:6379> lpush order:queue SN20190803
(integer) 3
127.0.0.1:6379> lrange order:queue 0 10
1) "SN20190803"
2) "SN20190802"
3) "SN20190801"

基于这三个命令,我们就可以实现一个异步的消息队列了,我们首先来模拟消息队列的生产,最终我们的消费模型就如图 1.1。

未命名绘图.drawio (1)

图 1.1 消费模型

<?php

// 接收参数
$orderId = $argv[1] ?? '';

if (empty($orderId)) {
    exit("请填写要压入队列的数据");
}

// 新建 redis 连接
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

try {
    echo $redis->lPush('order:queue', $orderId) . PHP_EOL;
} catch (Exception $exception) {
    //捕获异常,出现异常情况应该进行处理重传或者报警等等
    echo $exception->getMessage() . PHP_EOL;
}
// 释放资源
$redis->close();

这段代码非常简单,通过 $argv 来获取参数,也就是 order_id,通知消费端,该 id 有数据变更,要进行一些异步操作。

处理端我们使用 brPop 来阻塞获进行消费就可以了。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

while (true) {
//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->brPop("order:queue", 10);
    //$result 包含两个返回值
    //$result[0] redis key 的名称 order:queue
    //$result[1] redis value 也就是我们压入队列的数据
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑";
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

首先是使用 while 死循环来模拟一直在消费, brPop 设置一个十秒的阻塞,如果阻塞过了十秒就放弃获取,执行下一次逻辑,在处理完成后,我们要 usleep 500 毫秒,让出 CPU 时间片避免 CPU 资源被该进程一直占用。

这样一来一个简单的 redis 异步消息队列就实现好了,到这一步最好不要直接在生产环境这么写。

我们还需要处理 kill 信号来实现优雅退出,因为 kill 信号是直接杀死进程,这个时候任务可能正在处于执行状态,这种处理方式是不可重入的,因为你无法确定某个操作环节会出错。

利用bRPopLPush命令补救数据丢失

如果你不能容忍消息的丢失或者持久化,我们可以使用比较成熟的 MQ,例如 RocketMQ,RabbitMQ 或者 kafka,这些 MQ 都可以比较好的解决这个问题。

bRPopLPush 是 redis 提供给我们的一个命令,改命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

我们可以利用这个提醒替换 BRPOP ,在获取队列内容的时候将其丢到一个备份 key 中,当消息执行结束后,再将备份的数据从备份 key 中删除。

<?php

$redis = new Redis();

$redis->connect('127.0.0.1', 6379);
$redis->auth('123456');
$redis->select(1);

$orderQueueKey = 'order:queue';
$backupQueueKey = 'order:queue:backup';

// 程序启动优先处理 backup key 中的数据
while (true) {
    $result = $redis->brPop($backupQueueKey, 1);
    if ($result && $result[0]) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result[1] . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        break;
    }
}

while (true) {

//阻塞获取,10 秒后没有获取到证明超时
    $result = $redis->bRPopLPush("order:queue", "order:queue:backup", 10);

    if ($result) {
        //业务处理逻辑
        echo "接收到订单号为 [" . $result . "] 的订单,开始执行业务逻辑" . PHP_EOL;
        sleep(3); // 模拟耗时操作
        echo "处理完成" . PHP_EOL;
        //处理完成后取出key
        $redis->lPop($backupQueueKey);
        
        usleep(500 * 1000); // 休眠500 毫秒,让出 CPU 时间片
    } else {
        continue;
    }
}

我们首先在真正的处理逻辑前增加了对备份key 的处理,让程序启动后先处理之前没有梳理完成的数据。

在最下方处理队列的地方也做了改动,在处理完成后取出备份 key 中的数据。