RPC 通信
Swoole框架提供的RPC服务器支持了单连接并发、PHP-FPM下长连接维持等特性。在车轮互联大规模应用,构建了4层架构的服务化架构。
- 服务器端代码:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Protocol/RPCServer.php
- 客户端代码:http://git.oschina.net/swoole/swoole_framework/blob/master/libs/Swoole/Client/RPC.php
与Http协议对比
很多企业使用Http Rest实现RPC通信,实现简单可以利用到很多现成的工具和方案。但是Http通信协议存在2个严重的缺陷。
- Http不支持单连接并发,如果要同时并发很多请求,必须创建大量TCP连接。如果
php-fpm
开启500个进程,每次需要128个并发,那么就需要创建64000
个TCP连接。 - Http对长连接支持不够好,很多Http程序都是设计为短连接的,在请求时创建TCP连接、请求结束时close,这会带来额外的网络通信消耗
Swoole框架的RPC客户端使用16字节固定包头+包体的通信方式,支持单连接并发、支持在php-fpm
开启长连接。
php-fpm长连接
在php-fpm中维持TCP长连接主要借助swoole扩展提供的SWOOLE_KEEP
选项,客户端设置此选项后,在请求结束时不会关闭连接,新的请求到来后可以复用TCP连接。另外底层内置了长连接检测的能力。
- 在执行
$client->connect()
自动检测连接是否可用,如果复用的连接已经失效,底层会重新创建一个新的TCP长连接。 - 在执行
$client->connect()
自动清理垃圾数据,避免上一次客户端超时残留的数据导致服务异常
$socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, WOOLE_SOCK_SYNC);
$socket->set(array(
'open_length_check' => true,
'package_max_length' => $this->packet_maxlen,
'package_length_type' => 'N',
'package_body_offset' => RPCServer::HEADER_SIZE,
'package_length_offset' => 0,
));
TCP包头
struct
{
uint32_t length;
uint16_t reserved_1; //保留字段1
uint8_t reserved_2; //保留字段2
uint8_t type;
uint32_t uid;
uint32_t serid;
char body[0];
}
- length:包体的长度
- reserved_1 保留的16位字段
- reserved_2 保留的8位字段
- type:包体的打包格式,低4位用于表示包体打包的格式 =1使用PHP串化格式,=2使用JSON格式,其他格式暂未支持,高4位用于保存压缩格式,如gzip
- uid:用户自定义的ID,保留字段
- serid:Request/Response 串号
单连接并发
客户端
请求串号就是单连接并发的秘诀了,客户端即使是同一个连接,也可以同时发出多个Request
,这与Http协议是不同的,Http协议即使启用了Keep-Alive
单个连接只能发出一次Request
,必须等到服务器端发送Response
才能发送下一个Request
。RPC客户端收到Response
会根据其中的串号,将不同的Response
和Request
对应起来。
有些Request
可能会超时,RPC客户端通过对比请求ID可以判断出哪些Response
可能是上次请求超时残留的数据,并进行丢弃处理。
服务器端
在车轮互联的RPC服务器中,大部分使用了同步阻塞模式,小部分使用了异步模式。
同步服务器的实现依赖swoole扩展提供的dispatch_mode=3
选项,并设置worker_num
为128。swoole底层实现了连接与请求分离,同一个连接不同的Request
包会被分配到不同的Worker进程
并发地进行处理。Response
再由swoole底层逐个发送给客户端。服务器端也可以很好低支持单连接并发,即使只有一个TCP连接也可以利用到所有128个Worker进程的处理能力。
$server = new Swoole\Server('0.0.0.0', 8888);
$server->set(array(
'worker_num' => 128,
'max_request' => 5000,
'dispatch_mode' => 3,
'open_length_check' => 1,
'package_max_length' => $AppSvr->packet_maxlen,
'package_length_type' => 'N',
'package_body_offset' => \Swoole\Protocol\RPCServer::HEADER_SIZE,
'package_length_offset' => 0,
));
串行调用
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult(); //如果返回NULL,表示网络调用失败了,请检查$res->code
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();
$res = $service->call('User\Info::unlock', '18958653669', 1);
$result = $res->getResult();
并行调用
$res1 = $service->call('User\Info::unlock', '18958653669', 1);
$res2 = $service->call('User\Info::unlock', '18958653669', 1);
$res3 = $service->call('User\Info::unlock', '18958653669', 1);
//0.5表示500毫秒超时,$n表示成功返回的请求个数。如果少于发起的请求数,证明有个别请求超时了
$n = $service->wait(0.5);
$result1 = $res1->getResult();
$result2 = $res2->getResult();
$result3 = $res3->getResult();
实际上底层对于串行并行的处理方式是相同的,串行调用在执行getResult()
时会自动wait
一次,等待服务器端发送Response
,RPC客户端的wait
操作基于swoole_client_select
实现。
function wait($timeout = 0.5)
{
$st = microtime(true);
$success_num = 0;
while (count($this->waitList) > 0)
{
$write = $error = $read = array();
foreach ($this->waitList as $obj)
{
/**
* @var $obj RPC_Result
*/
if ($obj->socket !== null)
{
$read[] = $obj->socket;
}
}
if (empty($read))
{
break;
}
//去掉重复的socket
Tool::arrayUnique($read);
//等待可读事件
$n = $this->select($read, $write, $error, $timeout);
if ($n > 0)
{
//可读
foreach($read as $connection)
{
$data = $this->recvPacket($connection);
//socket被关闭了
if ($data === "")
{
foreach($this->waitList as $retObj)
{
if ($retObj->socket == $connection)
{
$retObj->code = RPC_Result::ERR_CLOSED;
unset($this->waitList[$retObj->requestId]);
$this->closeConnection($retObj->server_host, $retObj->server_port);
}
}
continue;
}
elseif ($data === false)
{
continue;
}
$header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE));
//不在请求列表中,错误的请求串号
if (!isset($this->waitList[$header['serid']]))
{
trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING);
continue;
}
$retObj = $this->waitList[$header['serid']];
//成功处理
$this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj);
$success_num++;
}
}
//发生超时
if ((microtime(true) - $st) > $timeout)
{
foreach ($this->waitList as $obj)
{
$obj->code = ($obj->socket->isConnected()) ? RPC_Result::ERR_TIMEOUT : RPC_Result::ERR_CONNECT;
//执行after钩子函数
$this->afterRequest($obj);
}
//清空当前列表
$this->waitList = array();
return $success_num;
}
}
//未发生任何超时
$this->waitList = array();
$this->requestIndex = 0;
return $success_num;
}
- $waitList 是所有
Request
的集合 - 多个
Request
使用的TCP连接可能是相同的几个,这里使用了Tool::arrayUnique
进行去重 - 使用
swoole_client_select
等待Socket可读事件 - 在可读事件中调用
recvPacket
收包,并解析包头,收到Response
时读取请求ID自动从waitList
中移除Request
- 循环的默认会进行时间检测,发生超时或全部成功时退出,返回Response的数量