RPC 通信

优质
小牛编辑
136浏览
2023-12-01

Swoole框架提供的RPC服务器支持了单连接并发、PHP-FPM下长连接维持等特性。在车轮互联大规模应用,构建了4层架构的服务化架构。

与Http协议对比

很多企业使用Http Rest实现RPC通信,实现简单可以利用到很多现成的工具和方案。但是Http通信协议存在2个严重的缺陷。

  • Http不支持单连接并发,如果要同时并发很多请求,必须创建大量TCP连接。如果php-fpm开启500个进程,每次需要128个并发,那么就需要创建64000个TCP连接。
  • Http对长连接支持不够好,很多Http程序都是设计为短连接的,在请求时创建TCP连接、请求结束时close,这会带来额外的网络通信消耗

Swoole框架的RPC客户端使用16字节固定包头+包体的通信方式,支持单连接并发、支持在php-fpm开启长连接

php-fpm长连接

在php-fpm中维持TCP长连接主要借助swoole扩展提供的SWOOLE_KEEP选项,客户端设置此选项后,在请求结束时不会关闭连接,新的请求到来后可以复用TCP连接。另外底层内置了长连接检测的能力。

  • 在执行$client->connect()自动检测连接是否可用,如果复用的连接已经失效,底层会重新创建一个新的TCP长连接。
  • 在执行$client->connect()自动清理垃圾数据,避免上一次客户端超时残留的数据导致服务异常
$socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, WOOLE_SOCK_SYNC);
$socket->set(array(
    'open_length_check' => true,
    'package_max_length' => $this->packet_maxlen,
    'package_length_type' => 'N',
    'package_body_offset' => RPCServer::HEADER_SIZE,
    'package_length_offset' => 0,
));

TCP包头

struct
{
    uint32_t length;
    uint16_t reserved_1; //保留字段1
    uint8_t reserved_2;  //保留字段2
    uint8_t type;
    uint32_t uid;
    uint32_t serid;
    char body[0];
}
  • length:包体的长度
  • reserved_1 保留的16位字段
  • reserved_2 保留的8位字段
  • type:包体的打包格式,低4位用于表示包体打包的格式 =1使用PHP串化格式,=2使用JSON格式,其他格式暂未支持,高4位用于保存压缩格式,如gzip
  • uid:用户自定义的ID,保留字段
  • serid:Request/Response 串号

单连接并发

客户端

请求串号就是单连接并发的秘诀了,客户端即使是同一个连接,也可以同时发出多个Request,这与Http协议是不同的,Http协议即使启用了Keep-Alive单个连接只能发出一次Request,必须等到服务器端发送Response才能发送下一个Request。RPC客户端收到Response会根据其中的串号,将不同的ResponseRequest对应起来。

有些Request可能会超时,RPC客户端通过对比请求ID可以判断出哪些Response可能是上次请求超时残留的数据,并进行丢弃处理。

服务器端

在车轮互联的RPC服务器中,大部分使用了同步阻塞模式,小部分使用了异步模式。

同步服务器的实现依赖swoole扩展提供的dispatch_mode=3选项,并设置worker_num为128。swoole底层实现了连接与请求分离,同一个连接不同的Request包会被分配到不同的Worker进程并发地进行处理。Response再由swoole底层逐个发送给客户端。服务器端也可以很好低支持单连接并发,即使只有一个TCP连接也可以利用到所有128个Worker进程的处理能力。

$server = new Swoole\Server('0.0.0.0', 8888);
$server->set(array(
    'worker_num' => 128,
    'max_request' => 5000,
    'dispatch_mode' => 3,
    'open_length_check' => 1,
    'package_max_length' => $AppSvr->packet_maxlen,
    'package_length_type' => 'N',
    'package_body_offset' => \Swoole\Protocol\RPCServer::HEADER_SIZE,
    'package_length_offset' => 0,
));

串行调用

$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();  //如果返回NULL,表示网络调用失败了,请检查$res->code

$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();  

$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();  

并行调用

$res1 = $service->call('User\Info::unlock', '18958653669', 1);
$res2 = $service->call('User\Info::unlock', '18958653669', 1);
$res3 = $service->call('User\Info::unlock', '18958653669', 1);

//0.5表示500毫秒超时,$n表示成功返回的请求个数。如果少于发起的请求数,证明有个别请求超时了
$n = $service->wait(0.5);

$result1 = $res1->getResult();
$result2 = $res2->getResult();
$result3 = $res3->getResult();

实际上底层对于串行并行的处理方式是相同的,串行调用在执行getResult()时会自动wait一次,等待服务器端发送Response,RPC客户端的wait操作基于swoole_client_select实现。

function wait($timeout = 0.5)
{
    $st = microtime(true);
    $success_num = 0;

    while (count($this->waitList) > 0)
    {
        $write = $error = $read = array();
        foreach ($this->waitList as $obj)
        {
            /**
             * @var $obj RPC_Result
             */
            if ($obj->socket !== null)
            {
                $read[] = $obj->socket;
            }
        }
        if (empty($read))
        {
            break;
        }
        //去掉重复的socket
        Tool::arrayUnique($read);
        //等待可读事件
        $n = $this->select($read, $write, $error, $timeout);
        if ($n > 0)
        {
            //可读
            foreach($read as $connection)
            {
                $data = $this->recvPacket($connection);
                //socket被关闭了
                if ($data === "")
                {
                    foreach($this->waitList as $retObj)
                    {
                        if ($retObj->socket == $connection)
                        {
                            $retObj->code = RPC_Result::ERR_CLOSED;
                            unset($this->waitList[$retObj->requestId]);
                            $this->closeConnection($retObj->server_host, $retObj->server_port);
                        }
                    }
                    continue;
                }
                elseif ($data === false)
                {
                    continue;
                }
                $header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE));
                //不在请求列表中,错误的请求串号
                if (!isset($this->waitList[$header['serid']]))
                {
                    trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING);
                    continue;
                }
                $retObj = $this->waitList[$header['serid']];
                //成功处理
                $this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj);
                $success_num++;
            }
        }
        //发生超时
        if ((microtime(true) - $st) > $timeout)
        {
            foreach ($this->waitList as $obj)
            {
                $obj->code = ($obj->socket->isConnected()) ? RPC_Result::ERR_TIMEOUT : RPC_Result::ERR_CONNECT;
                //执行after钩子函数
                $this->afterRequest($obj);
            }
            //清空当前列表
            $this->waitList = array();
            return $success_num;
        }
    }

    //未发生任何超时
    $this->waitList = array();
    $this->requestIndex = 0;
    return $success_num;
}
  • $waitList 是所有Request的集合
  • 多个Request使用的TCP连接可能是相同的几个,这里使用了Tool::arrayUnique进行去重
  • 使用swoole_client_select等待Socket可读事件
  • 在可读事件中调用recvPacket收包,并解析包头,收到Response时读取请求ID自动从waitList中移除Request
  • 循环的默认会进行时间检测,发生超时或全部成功时退出,返回Response的数量