PHP 原生 Socket 网络编程:实现 Reactor 模型开发 Echo 服务

Socket 编程721 字

更新日志:

  • 2023-3-20 增加文字描述

你好,我是一笑,在前面的章节中,我们实现了基于 select socket 开发,在这里,我们将利用 select 实现 Reactor模型。

Reactor 翻译过来的意思是「反应堆」,可能大家会联想到物理学里的核反应堆,实际上并不是的这个意思。

这里的反应指的是「对事件反应」,也就是来了一个事件,Reactor 就有相对应的反应/响应。

事实上,Reactor 模式也叫 Dispatcher 模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听事件,收到事件后,根据事件类型分配(Dispatch)给某个进程 / 线程。

具体流程如下:

  1. 监听事件
  2. 捕获事件
  3. 判断事件类型

    • write Handler 写事件
    • read handler 读时间
    • accept handler

EventLoop,它是整个事件驱动的核心,它管理者事件表,不断循环处理就绪的文件事件,在这里我们可以理解为就是一个死循环。


function LoopEventRun() {
    $eventLoop->stop = 0;
    while (!$eventLoop->stop) {
        $eventLoop->processEvents(AE_ALL_EVENTS);
    }
}

AE_ALL_EVENTS 代表着所有事件。在这里我们设计三个常量。

  • const AE_FILE_EVENTS = 1;: 文件事件,其实Socket 也是一种文件事件,socket_create返回的是文件描述符。
  • const AE_TIME_EVENTS = 2;: 时间事件。
  • const AE_ALL_EVENTS = (AE_FILE_EVENTS | AE_TIME_EVENTS); 代表着所有事件

我们首先来实现 tcp server。

<?php

class TcpSocket
{
    protected string $address;
    protected int $port;

    public function __construct(string $address, int $port)
    {
        $this->address = $address;
        $this->port = $port;
    }

    /**
     * @return Socket
     * @author 国柱<[email protected]>
     */
    public function getTcpServer(): Socket

    {
        //创建服务端的socket套接流,net协议为IPv4,protocol协议为TCP
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if (!$socket) {
            echo "Failed to create socket";
            exit();
        }

        if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
            echo "Failed to set socket options!";
            exit();
        }
        socket_set_nonblock($socket);

        // 绑定接收的套接流主机和端口,与客户端相对应
        if (!socket_bind($socket, $this->address, $this->port)) {
            echo 'server bind fail:' . socket_strerror(socket_last_error());
            exit();
        }
        // 监听套接流
        if (!socket_listen($socket, 4)) {
            echo 'server listen fail:' . socket_strerror(socket_last_error());
            exit();
        }
        echo "create socket success: $this->address:$this->port" . PHP_EOL;
        return $socket;
    }

}

这段代码非常简单,就是标准的创建 socket。

<?php

const AE_FILE_EVENTS = 1;
const AE_TIME_EVENTS = 2;
const AE_ALL_EVENTS = (AE_FILE_EVENTS | AE_TIME_EVENTS);

const AE_DONT_WAIT = 4;

const AE_READABLE = 1;
const AE_WRITABLE = 2;
const AE_EXCEPTION = 4;

class EventLoop
{

    public int $stop = 0;

    // 存储所有事件
    public array $eventContainer = [];

    public function addEvent(SocketEvent $event): void
    {
        $this->eventContainer[] = $event;
    }

    /**
     * @param $flags
     * @return int
     * @author 国柱<[email protected]>
     */
    public function processEvents($flags): int
    {
        $read = $write = $except = [];
        $number = 0;
        $processed = 0;
        if ($flags & AE_FILE_EVENTS) {
            // 遍历链表
            foreach ($this->eventContainer as $event) {
                //如果是读事件就将 fd 放入到 rfds 中
                if ($event->mask & AE_READABLE) {
                    $read[] = $event->fd;
                }
                // 如果是写时间爱你就放进 $write 中
                if ($event->mask & AE_WRITABLE) {
                    $write[] = $event->fd;
                }
                // 如果是屏障事件就放进  $except 中
                if ($event->mask & AE_EXCEPTION) {
                    $except[] = $event->fd;
                }
                $number++;
            }
        }

        if ($number || (($flags & AE_TIME_EVENTS) && !($flags & AE_DONT_WAIT))) {
            $retrieval = socket_select($read, $write, $except, null);
            if ($retrieval > 0) {
                foreach ($this->eventContainer as $event) {
                    $socketFd = $event->fd;
                    if (($event->mask & AE_READABLE && in_array($socketFd, $read)) || ($event->mask & AE_WRITABLE && in_array($socketFd, $write)) || ($event->mask & AE_EXCEPTION && in_array($socketFd, $except))) {
                        $mask = 0;

                        if ($event->mask & AE_READABLE && in_array($socketFd, $read)) {
                            $mask |= AE_READABLE;
                        }
                        if ($event->mask & AE_WRITABLE && in_array($socketFd, $write)) {
                            $mask |= AE_WRITABLE;
                        }
                        if ($event->mask & AE_EXCEPTION && in_array($socketFd, $except)) {
                            $mask |= AE_EXCEPTION;
                        }
                        // 执行闭包
                        call_user_func($event->fileProc, $this, $socketFd, $event->clientData, $mask);
                        $processed++;
                        // 清理 read
                        if ($event->mask & AE_READABLE) {
                            $index = array_search($socketFd, $read);
                            unset($read[$index]);
                        }
                        if ($event->mask & AE_WRITABLE) {
                            $index = array_search($socketFd, $write);
                            unset($write[$index]);
                        }
                        if ($event->mask & AE_EXCEPTION) {
                            $index = array_search($socketFd, $except);
                            unset($except[$index]);
                        }
                    }
                }
            }
            return $processed;
        }
        return $processed;
    }

    public function run()
    {
        while (true) {
            if (!$this->stop) {
                $this->processEvents(AE_ALL_EVENTS);
            }
        }
    }

    public
    function createSocketEvent($fd, $mask, $fileProc, $finalizerProc, Client $clientData = null): SocketEvent
    {
        echo "创建 event mask : [$mask]" . PHP_EOL;
        $event = new SocketEvent(
            $fd,
            $mask,
            $fileProc,
            $finalizerProc,
            $clientData);
        $this->addEvent($event);
        return $event;
    }


    public function deleteSocketEventLoop($fd, $mask): void
    {
        foreach ($this->eventContainer as $key => $event) {
            if ($event->fd == $fd && $event->mask == $mask) {
                unset($this->eventContainer[$key]);
            }
        }
    }

}
<?php

class SocketEvent
{
    public Socket $fd;
    public int $mask;
    public $fileProc = null;
    public $finalizerProc = null;
    public ?Client $clientData = null;

    /**
     * @param EventLoop $eventLoop
     * @param Socket $fd
     * @param int $mask
     * @param callable $fileProc
     * @param callable|null $finalizerProc
     * @param Client|null $clientData
     */
    public function __construct(Socket $fd, int $mask, callable $fileProc, callable $finalizerProc = null, Client $clientData = null)
    {
        $this->fd = $fd;
        $this->mask = $mask;
        $this->fileProc = $fileProc;
        $this->finalizerProc = $finalizerProc;
        $this->clientData = $clientData;
    }

}
<?php

class Client
{
    public Socket $fd;
    public string $queryBuf;
}
<?php

ini_set('display_errors', 'On');
error_reporting(E_ALL | E_WARNING);

include "src/TcpSocket.php";
include "src/SocketEvent.php";
include 'src/EventLoop.php';
include "src/Client.php";

$socket = new TcpSocket('127.0.0.1', 8888);
$socketFd = $socket->getTcpServer();

$eventLoop = new EventLoop();
$sendReplyToClient = function (EventLoop $eventLoop, $socketFd, Client $clientData, int $mask) {
    socket_write($socketFd, $clientData->queryBuf, 1024);
    //发送数据完毕后关闭 socket 写事件,要不然就会一直在玄幻
    $eventLoop->deleteSocketEventLoop($socketFd, $mask);
};
$readQueryHandler = function (EventLoop $eventLoop, $socketFd, Client $clientData, int $mask) use ($sendReplyToClient) {
    $data = socket_read($socketFd, 1024);

    if (strlen($data) == 0) {
        echo "Client closed connection" . PHP_EOL;
        $eventLoop->deleteSocketEventLoop($socketFd, AE_READABLE);
        $eventLoop->deleteSocketEventLoop($socketFd, AE_WRITABLE);
        return;
    }

    if ($data) {
        $data = trim($data);
        $clientData->queryBuf = $data;
        $eventLoop->createSocketEvent($socketFd, AE_WRITABLE, $sendReplyToClient, null, $clientData);
    }
    return;
};

$acceptHandler = function (EventLoop $eventLoop, $socketFd) use ($readQueryHandler) {
    $clientFD = socket_accept($socketFd);
    if (!$clientFD) {
        echo "socket_accept_error:" . socket_strerror(socket_last_error()) . PHP_EOL;
        socket_close($socketFd);
        exit();
    }
    socket_set_nonblock($clientFD);
    socket_getpeername($clientFD, $addr, $port);
    echo "Accepted $addr $port" . PHP_EOL;
    $clientData = new Client();
    $clientData->fd = $clientFD;
    $clientData->queryBuf = '';
    $eventLoop->createSocketEvent($clientFD, AE_READABLE, $readQueryHandler, null, $clientData);
};

$eventLoop->createSocketEvent($socketFd, AE_READABLE, $acceptHandler, null, null);
$eventLoop->run();

2023-03-17T13:05:55.png

单 Reactor 单进程的方案因为全部工作都在同一个进程内完成,所以实现起来比较简单,不需要考虑进程间通信,也不用担心多进程竞争。

但是,这种方案存在 2 个缺点:

  • 第一个缺点,因为只有一个进程,无法充分利用 多核 CPU 的性能;
  • 第二个缺点,Handler 对象在业务处理时,整个进程是无法处理其他连接的事件的,如果业务处理耗时比较长,那么就造成响应的延迟;

所以,单 Reactor 单进程的方案不适用计算机密集型的场景,只适用于业务处理非常快速的场景。

Redis 是由 C 语言实现的,在 Redis 6.0 版本之前采用的正是「单 Reactor 单进程」的方案,因为 Redis 业务处理主要是在内存中完成,操作的速度是很快的,性能瓶颈不在 CPU 上,所以 Redis 对于命令的处理是单进程的方案。

由于 PHP 核心包内并没有包含多线程的扩展,所以在下面的一张,我们将实现基于单 Reactor 多进程的方案。

maksim
Maksim(一笑,吡罗),PHPer,Goper
OωO
开启隐私评论,您的评论仅作者和评论双方可见