2021年3月

你好,我是一笑。

再上一篇文章中,我们使用 fork 的形式完成了高性能网络服务的开发,但是由于 fork 的形式开销比较大,我们需要使用另外一种方式来进行实现,也就是这篇文章中要讲解的 select。

相对来说 select 要比 fork 高效很多,而我们常说的 select 其实就是Linux 为我们提供的异步事件 I/O,当发生事件的时候系统内核会给我们发送通知,与多进程多线程相比,异步 IO最大的优势就是开销小,因为无论是进程、线程在上下文切换的时候都会有一定的开销。同时使用异步 IO,我们就不必再去维护进程或者线程的状态 。

select 可以看做是半自动,处理的并不是特别好,不是完全按照事件进行处理,在应用层,我们还需要使用循环来进行遍历文件句柄的变化。

在 PHP 中为我们提供了 socket_select 函数,我们可以通过这个函数来监听文件句柄的变化。

<?php
//创建服务端的socket套接流,net协议为IPv4,protocol协议为TCP
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
    echo "Failed to set socket options!";
    exit();
}
socket_set_nonblock($socket);

// 绑定接收的套接流主机和端口,与客户端相对应
if (!socket_bind($socket, '0.0.0.0', 8888)) {
    echo 'server bind fail:' . socket_strerror(socket_last_error());
    exit();
}

// 监听套接流
if (!socket_listen($socket, 4)) {
    echo 'server listen fail:' . socket_strerror(socket_last_error());
    exit();
}

$all_sock[] = $socket;
//让服务器无限获取客户端传过来的信息
for (; ;) {
    $read = $write = $except = [];
    $read = $all_sock;

    $changed = socket_select($read, $write, $except, null);

    if (!$changed) {
        print "[error]socket_select() failed
        (" . socket_strerror(socket_last_error()) . ")\n";
    }

    if ($changed < 1) {
        continue;
    }

    foreach ($read as $key => $rsock) {
        // 服务端连接
        if ($rsock === $socket) {

            $client = socket_accept($socket);
            if (!$client) {
                echo "socket_accept error:!" . socket_strerror(socket_last_error());
                exit();
            }
            $all_sock[] = $client;
        } else {
          //客户端连接
            //读socket_read的作用就是读出socket_accept()的资源并把它转化为字符串
                  $string = socket_read($rsock, 1024);
                echo 'server receive is :' . $string . PHP_EOL;//PHP_EOL为php的换行预定义常量
                $return_client = 'server receive is : ' . $string . PHP_EOL;
                socket_write($rsock, $return_client, strlen($return_client));
            }
        }
    }
}
socket_close($socket);

select 有两个不足:

  • select 会受到单个进程最大打开描述符的限制。它只能监听的最大文件描述符个数由__FD_SIZE 决定,它的默认值是 1024;
  • select 返回后,需要遍历文件描述符集才能找到哪些文件描述符就绪了,文件描述符集在内核空间和用户空间频繁拷贝提升了系统的开销。

在下一章节,我们将使用 epoll 来解决这个问题。

你好,我是一笑。

在前面的两章中,我们已经实现了守护进程和基础的 TCP Server,在这个章节我们将实现高性能的网络服务,其实实现高性能网络服务有很多方法,在接下来的几个章节中,我将一一列举。

在我们上一节的代码中,如果你比较细心,就会发现当多个客户端连接服务其的时候是可以连接上,但是只有只有第一个连接到 tcp 的服务才可以进行通讯,其他的客户端必须等待第一个连接关闭后才可以发送请求。

真正的网络服务肯定不能这样实现,今天我们就要解决这个问题,怎么样才能够让 TCP 的服务器能够接收成百上千,甚至是上万的请求。

对于上一章的 TCP 服务器只有一个用户能够通讯,通过 fork 的改造就可以变成成百上千的用户来来连接服务器,每个连接过来我们都 fork 一个子进程来并行处理,就可以实现多人并发处理。

在对服务端进行修改之前,我们先对客户端进行一定的修改,我们先让客户端支持 Ctrl+C 退出程序,并且通知服务端要进行关闭 Scoket 了。

实现起来非常简单,我们只需要对 Ctrl+C 信号进行监听就可以了。

pcntl_signal(SIGINT, function () use ($socket) {
    echo "接收到退出信号,关闭 socket 并且退出进程" . PHP_EOL;
    $message = '.exit';
    socket_write($socket, $message, strlen($message));
    socket_close($socket);
    exit();
});

我们使用 pcntl_signal 来监听 SIGINT 信号,如果接收到信号就向服务端发送 .exit 告诉服务端,客户端要退出了,并且关闭 socket 连接,这么做的好处是,当我们客户端退出后可以主动通知服务端,服务端可以自由的做一些其他处理。

SIGINT:程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出,用于通知前台进程组终止进程。

同时,我们在循环发送的数据中增加 pcntl_signal_dispatch()函数检测是否有新的信号等待 dispatching。

while (true) {
      // 检测是否有新的信号等待 dispatching
    pcntl_signal_dispatch();
    // 向服务端写入字符串信息
    $ret = socket_write($socket, $message, strlen($message));
    if (!$ret) {
        echo 'fail to write' . socket_strerror(socket_last_error());
        exit();
    }
    // 读取服务端返回来的套接流信息
    $callback = socket_read($socket, 1024);
    echo 'server return message is:' . PHP_EOL . $callback;
    sleep(1);
}

接下来,我们来实现今天的主题,使用 fork 来实现高性能的网络服务器。它的基本原理非常简单,首先主进程进行监听连接,每收到一个连接就创建一个子进程进行数据包的收发。

    while ($string = socket_read($accept_resource, 1024)) {
        echo 'server receive is :' . $string . PHP_EOL;//PHP_EOL为php的换行预定义常量
        $return_client = 'server receive is : ' . $string . PHP_EOL;
        //Socket_write的作用是向socket_create的套接流写入信息,或者向socket_accept的套接流写入信息
        socket_write($accept_resource, $return_client, strlen($return_client));
    }

在上一章,我们使用 while 死循环来接收客户端传来的数据,主进程就会被阻塞在这里,其他的连接虽然可以连接但是无法传输数据,我们只需要将这段代码放到子进程中执行就可以了。

<?php
cli_set_process_title("tcp-master-process");
pcntl_async_signals(true);

//创建服务端的socket套接流,net协议为IPv4,protocol协议为TCP
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
    echo "Failed to set socket options!";
    exit();
}

// 绑定接收的套接流主机和端口,与客户端相对应
if (!socket_bind($socket, '0.0.0.0', 8888)) {
    echo 'server bind fail:' . socket_strerror(socket_last_error());
    exit();
}

// 监听套接流
if (!socket_listen($socket, 4)) {
    echo 'server listen fail:' . socket_strerror(socket_last_error());
    exit();
}

// 处理子进程退出
pcntl_signal(SIGCHLD, function ($id) {

    fprintf(STDOUT, "我接收到一个信号,编号为:%d \n", $id);

    $pid = pcntl_waitpid(-1, $status, WNOHANG);

    if ($pid > 0) {
        fprintf(STDOUT, "PID=%d 子进程退出了", $pid);
    }
});

//让服务器无限获取客户端传过来的信息
for (; ;) {
    // socket_accept的作用就是接受socket_bind()所绑定的主机发过来的套接流
    $accept_resource = socket_accept($socket);
    if (!$accept_resource) {
        echo "socket_accept error:!" . socket_strerror(socket_last_error());
        exit();
    }

    $pid = pcntl_fork();
    if ($pid == 0) {
        cli_set_process_title("tcp-worker-process");
        //读socket_read的作用就是读出socket_accept()的资源并把它转化为字符串

        while ($string = socket_read($accept_resource, 1024)) {
            if ($string == '.exit') {
                break;
            }
            echo 'server receive is :' . $string . PHP_EOL;//PHP_EOL为php的换行预定义常量
            $return_client = 'server receive is : ' . $string . PHP_EOL;
            //Socket_write的作用是向socket_create的套接流写入信息,或者向socket_accept的套接流写入信息
            socket_write($accept_resource, $return_client, strlen($return_client));
        }

        //关闭客户端连接
        socket_close($accept_resource);
        echo "没有数据了,关闭连接" . PHP_EOL;
        exit();
    } else {
        //关闭连接
        pcntl_signal_dispatch();
    }
}

if ($pid != 0) {
    //关闭连接
    socket_close($socket);
}

我们在接收到新连接后 fork 了子进程,在子进程中处理 tcp 的收发包,同时设置了进程名称方便我们在命令行中查看。

 cli_set_process_title("tcp-worker-process");

这样一来,我们就实现了一个比较高性能的服务了,通过 fork 的方式虽然可以实现高性能网络服务,但是同样也带来了很多问题。首先是资源被长期占用,只要客户端与服务端一直建立长连接,那么子进程一直被占用,进程在Linux 中是非常重要的资源,占用一个就少一个,他是有一定的限量的,几千几万被占用对 Linux 系统来说是灾难性的。

分配子进程花费时间长,因为创建子进程的成本是非常高的,如果很多连接突然过来,你会发现创建子进程的时间会很长,性能非常的满,这就是 fork 带来的问题,在下一节我们将利用 Select 异步I/O 解决这个问题。

同时在使用 fork 子进程这种方式的时候,我们还需要对子进程的状态进行维护,如果子进程先退出,而父进程没有处理的话就会变成僵尸进程,最终耗尽我们的进程号,导致无法启动新的进程,这是非常恐怖的事情。

处理僵尸进程的方法也很简单,我们只需要在父进程中接受子进程的退出信号即可。

// 处理子进程退出
pcntl_signal(SIGCHLD, function ($id) {
    fprintf(STDOUT, "我接收到一个信号,编号为:%d \n", $id);
    $pid = pcntl_waitpid(-1, $status, WNOHANG);
    if ($pid > 0) {
        fprintf(STDOUT, "PID=%d 子进程退出了", $pid);
    }
});

我们启动服务后,我们分别启动两个client,将其中的代码进行稍微的更改,第二个的编号设置为#2,就会发现都可以进行通信了。

server receive is :hello echo server #2 !
server receive is :hello echo server #1 !
server receive is :hello echo server #2 !
server receive is :hello echo server #1 !

同时我们使用 ps 命令来查看。

$ ps ajx | grep process
501 53843 98750   0  1:46下午 ttys004    0:00.03 tcp master process
501 53854 53843   0  1:46下午 ttys004    0:00.00 tcp tcp process
501 53907 53843   0  1:46下午 ttys004    0:00.00 tcp tcp process

我们一共是启动了两个 tcp client,在后台也就 fork 出了两个子进程,我们可以在多启动一个 #3 的 client 来试验一下。

你好,我是一笑。

在本节我们来介绍一些 socket 基础编程,其实 socket 基础编程非常的简单,你可以把它理解为文件描述符,每一个 socket 连接都是一个文件描述符,与我们打开文件、关闭文件是很类似的,只不过在其中增加了一些跟网络相关的 API 而已。

但是对于底层协议和基本原理还是很复杂的,我会放在后面的章节中进行讲解,在前面还是以使用为主,对于 socket 编程其实并不是很复杂,只需要通过几个 API 就可以实现了。

如何实现一个 TCP Server

实现一个 TCP Server 的基本步骤如下:

  1. 创建 scoket ,指定使用 TCP 协议,对应的是函数为 socket_create;
  2. 将 scoket 与地址和端口绑定,对应的函数为 socket_bind, 相当于打开了一个文件句柄,这个文件要与你的网络地址和端口进行绑定,用户在发送网络连接的时候,在系统的底层就会把事件抛给 socket。
  3. 侦听端口,对应的函数 socket_listen
  4. 接收创建新的socket 对应函数 socket_accept
  5. 使用 read 接收数据 对应函数 socket_read
  6. 使用 send 发送数据对应函数 socket_end
  7. 使用 close 关闭连接对应函数 scoket_close;

我们通过这七个步骤就可以实现一个 TCP Server。

在我们实现 TCP Server 的时候还可以使用 socket_set_option 给 TCP 设置一些选项,其中最常见的选项如下:

  • SO_REUSEADDR 端口处于 WAIT_TIME 仍然可以启动,通过这个选项我们可以对地址进行重用,当我们的网络服务器开启的时候,实际上已经打开了很多 TCP 连接,当 TCP 连接关闭的时候会发起 WAIT_TIME 等待 FINISH ,只有 FINISH 过来的时候才会关闭。当我们启用这个参数后,不用等待 FINISH 状态,直接可以复用地址和端口,对于服务端而言特别有效,服务端关闭了连接,最长可能会等待两分钟。
  • SO_RECVBUF,可以通过这个选项来调整Buffer 的大小,当数据来的时候没能及时处理可以在系统中将数据缓冲,一般设置 4M~8M。
  • SO_SNDBUF,发送 Buffer,如果在即时通信的时候设置较大的值,会产生一定的延迟。所以需要按照实际应用来设置大小。

下面我们编写一个类似于 echo 的服务,客户端与服务单保持长连接, 客户端不断向的服务端发送 hello echo server #1 !,然后服务端返回该值。

<?php
//创建服务端的socket套接流,net协议为IPv4,protocol协议为TCP
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
    echo "Failed to set socket options!";
    exit();
}

// 绑定接收的套接流主机和端口,与客户端相对应
if (!socket_bind($socket, '0.0.0.0', 8888)) {
    echo 'server bind fail:' . socket_strerror(socket_last_error());
    exit();
}

// 监听套接流
if (!socket_listen($socket, 4)) {
    echo 'server listen fail:' . socket_strerror(socket_last_error());
    exit();
}
//让服务器无限获取客户端传过来的信息
for (; ;) {
    // socket_accept的作用就是接受socket_bind()所绑定的主机发过来的套接流
    $accept_resource = socket_accept($socket);
    if (!$accept_resource) {
        echo "socket_accept error:!" . socket_strerror(socket_last_error());
        exit();
    }


    //读socket_read的作用就是读出socket_accept()的资源并把它转化为字符串
    while ($string = socket_read($accept_resource, 1024)) {
        echo 'server receive is :' . $string . PHP_EOL;//PHP_EOL为php的换行预定义常量
        $return_client = 'server receive is : ' . $string . PHP_EOL;
        //Socket_write的作用是向socket_create的套接流写入信息,或者向socket_accept的套接流写入信息
        socket_write($accept_resource, $return_client, strlen($return_client));
    }
    socket_close($accept_resource);

}

socket_close($socket);

如何实现 TCP 客户端

相对比服务端,客户端的实现要简单很多:

  1. 创建 scoket,指定使用 TCP
  2. 使用 connect 与服务端进行连接
  3. 收发数据socket_read, socket_write,
  4. close 关闭连接
<?php
//创建一个socket套接流
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

//接收套接流的最大超时时间1秒,后面是微秒单位超时时间,设置为零,表示不管它
socket_set_option($socket, SOL_SOCKET, SO_RCVTIMEO, array("sec" => 1, "usec" => 0));

//发送套接流的最大超时时间为6秒
socket_set_option($socket, SOL_SOCKET, SO_SNDTIMEO, array("sec" => 6, "usec" => 0));

//连接服务端的套接流,这一步就是使客户端与服务器端的套接流建立联系
if (!socket_connect($socket, '127.0.0.1', 8888)) {
    echo 'connect fail massage:' . socket_strerror(socket_last_error());
    exit();
}

$message = 'hello echo server #1 !';
//转为GBK编码,处理乱码问题,这要看你的编码情况而定,每个人的编码都不同
$message = mb_convert_encoding($message, 'GBK', 'UTF-8');

while (true) {
    //向服务端写入字符串信息
    $ret = socket_write($socket, $message, strlen($message));
    if (!$ret) {
        echo 'fail to write' . socket_strerror(socket_last_error());
        exit();
    }
    //读取服务端返回来的套接流信息
    $callback = socket_read($socket, 1024);
    echo 'server return message is:' . PHP_EOL . $callback;
    sleep(1);
}

到目前为止,服务端和客户端的代码就写好了,很简单,你可以自己编写一下。

你好,我是一笑。

目前市面上有很多开源库其中包括 Swoole、Workman、ReactPHP、AmpPHP 都可以非常方便的帮助我们去实现 Socket 或者 WebSocket 编程,但是这些都是开箱即用的工具,在这个系列的文章中,我将从 0 到 1 实现一个网络在线聊天室。

PHP Cli 后台服务守护进程

Socket 编程与 Web 或者 API 编程最大的差异就是我们的程序是要一直跑在我们的服务器后端执行,在我们手动执行一些 PHP 脚本的时候,如果是长时间的耗时操作我们可能会用到 nohup 命令将我们的脚本脱离终端,避免终端异常关闭后导致脚本执行退出。因为在我们正常的情况下如果你是在前端执行你执行的程序是属于当前终端的,当你终端结束时程序会收到 SIGSUP 的信号,收到信号后默认行为就是将程序直接杀死掉。

所以一般服务端的程序一般都是在后端去执行,所谓的后端就是我们的应用程序没有界面,也没有任何输出的信息,只能通过日志输出到一个文件里,而不是在标准的输入、输出和标准错误中看到任何信息。

最简单实现守护进程的方式就是使用 fork 函数来派生子进程,具体的流程如下:

1. 设置文件创建屏蔽字

文件创建屏蔽字是指屏蔽掉文件创建时的对应位(umask() 控制系统文件和目录默认权限)。由于使用fork系统调用新建的子进程继承了父进程的文件创建掩码,这就给该子进程使用文件带来了诸多的不便。因此,把文件创建掩码设置为0,可以大大增强该守护进程的灵活性。

2. fork 子进程,父进程退出,子进程成为孤儿进程被 init 进程接管

fork 子进程的目的是为了执行我们真正的业务逻辑,而主进程在 fork 子进程后就退出,这样一来 子进程就会变成孤儿进程,孤儿进程会自动被系统的 init 进程接管,这个时候其实也就完成了后台运行,而后面的操作是为了让我们的程序更加健壮。

3. 调用 setsid 建立新的进程会话

虽然你的 主进程退出了,但是你的父子关系其实还是存在的,它有一个sessionid,当你没有做改变的时候,他的 sessionid 还会存在,标记着它在哪个 session 中,既然父进程已经退出了,我们被 init 进程接管了,那么就需要重新设置一个进程会话。

4. 将当前工作目录切换到根目录

在 linux 下是可以 mount 很多磁盘的,你可能是在系统磁盘下,也可能是在新 mount 的数据磁盘上,实际上我们应该与这个磁盘脱离关系,所以要切换一下工作目录。

5. 关闭标准输入、标准输出、标准错误将其重定向到 /dev/null

当Linux启动一个进程时,会自动打开三个的端口:标准输入(Standard Input)、标准输出(Standard Output)和标准错误(Standard Error)。进程通常会通过这三个端口进行输入和输出。

后台程序是应该没有标准输入和标准输出以及标准错误的,所以要将其重定向到 /dev/null中。

进程从创建它的父进程那里继承了打开的文件描述符。如不关闭,将会浪费系统资源,造成进程所在的文件系统无法卸下以及引起无法预料的错误。

代码实现

<?php

interface Process
{
    public function run(): void;
}

class WriteFileProcess implements Process
{
    public function run(): void
    {
        while (true) {
            //在这里写你的业务代码,比如每隔1秒钟,往当前目录下的test.txt文件里面添加当前时间
            file_put_contents(dirname(__FILE__) . '/test.txt', date('Y-m-d H:i:s') . PHP_EOL, FILE_APPEND);
            sleep(1);
        }
    }
}

class Daemon
{
    private $pidFile;
    protected $process;

    function __construct(Process $process)
    {
        $this->pidFile = dirname(__FILE__) . '/daemontest.pid';
        $this->process = $process;
    }

    private function startDeamon()
    {
        if (file_exists($this->pidFile)) {
            echo "The file $this->pidFile exists.\n";
            exit();
        }

        $pid = pcntl_fork();
        if ($pid == -1) {
            //这里是创建进程失败
            die('could not fork');
        } else if ($pid) {
            //这里是父进程
            echo 'start ok';
            exit($pid);
        } else {
            umask(0);

            //这里是子进程
            file_put_contents($this->pidFile, getmypid());

            // 建立新的会话
            posix_setsid();

            //设置工作目录
            chdir('/');

            //重定向标准输出,标准输入,标准错误到 /dev/null
            fclose(STDIN);//0
            fclose(STDOUT);//1
            fclose(STDERR);//2

            //当关掉以上标准输出,标准输入,标准错误之后,如果后面要对文件操作(比如打开一个文件,写入,创建)它返回的文件描述符从0开始,这样可能造成未知异常
            //为了避免问题,我们使用输出从定向到 /dev/null 空设备文件解决这个问题,重新设置0,1,2 文件描述符用来代替标准输入,标准输出,标准错误,往 /dev/null 写入数据会被丢弃,这样就不会向终端输出数据了。

            fopen("/dev/null", 'a');
            fopen("/dev/null", 'a');
            fopen("/dev/null", 'a');

            return getmypid();
        }
    }

    private function start()
    {
        $this->startDeamon();
        $this->process->run();
    }

    private function stop()
    {
        if (file_exists($this->pidFile)) {
            $pid = file_get_contents($this->pidFile);
            posix_kill($pid, 9);
            unlink($this->pidFile);
        }
    }


    public function run($argv)
    {
        if ($argv[1] == 'start') {
            $this->start();
        } else if ($argv[1] == 'stop') {
            $this->stop();
        } else if ($argv[1] == 'restart') {
            $this->stop();
            $this->start();
        } else {
            echo 'param error';
        }
    }

}

$daemon = new Daemon(new WriteFileProcess());
$daemon->run($argv);

首先,我们定义了一个 process 的接口,这个接,而实现这个接口的 WriteFileProcess 就是我们需要执行的程序,在这段代码中,我们不断的向一个文件中写入当前时间,让程序一直保持运行状态,为了避免CPU 被这个程序沾满,我们写入的时间间隔为 1 秒。

Daemon 类中 startDeamon 方法就是我们实现守护进程的过程,在 Daemon 中还实现了另外两个方法 runstartstop

run 可以看做我们的程序入口,他解析传递的命令,当为 start 的时候开始记录 pid,然后执行守护进程方法,执行我们要执行的业务逻辑。

# 执行程序
php Daemon.php start
# 关闭程序
php Daemon.php stop
# 重启程序
php Dameon.php restart

你可以执行一下看看执行效果。

参考文档

  1. [PHP 的标准输入与输出-php教程-PHP中文网](