现在“微服务”越来越少流行, 公司内部很多接口原来是CGI, 或者直接读DB,都改成用微服务接口,
不用CGI接口,其实挻好的,不好暴露接口到公网,网络延时也少,不需要做鉴权,直接调用微服务接口就可以。
但不能读DB,也调用接口,就稍微有些不便。
打个比方
有一张表,里面有3万条的数据,我要取ID和名字,两个字段,
“ select id,name from table ”就可以 了,
但接口每次返回1000条, 我就要连续查询30次,假如每个查询200ms, 全部查询出来都要6s,
这个结果肯定不能接受, 但tars-php 官方每次查询只支持一个, 所以我们就只能动手去改造了。
在Tars\client\Communicator 添加 tcpMutipleConnect方法
/**
* 返回多个TCP连接
* @param int $connection_quantity 连接数量
* @param string $sIp 目标IP
* @param int $iPort 机架端口
* @param int $timeout_ms_max 连接超时时间
* @return array 返回连接
* @throws Exception
*/
public function tcpMutipleConnect(int $connection_quantity, $sIp = '', $iPort = 0,$timeout_ms_max=100)
{
$connection_array=[];
//循环执行连接
for($i=0;$i<$connection_quantity;$i++)
{
$socket = stream_socket_client("tcp://{$sIp}:{$iPort}",$errno,$errstr,1,STREAM_CLIENT_ASYNC_CONNECT);
stream_set_blocking($socket,0);
$connection_array[] = array(
'socket' => $socket
);
}
//记录开始时间和剩下未成功的连接
$time_start = microtime(true);
$connection_array_no_ok = $connection_array;
//逐一判断是否可用,如果出现不可用则返回false
while(1)
{
//如果是第一次或者仍然需要处理,则休息一段时间
usleep(5 * 1000);
//逐一判断是否可用,如果出现不可用则返回false
foreach($connection_array_no_ok as $key1 => $value1)
{
//尝试发送,如果成功则从数组中去掉
if(@stream_socket_sendto($value1['socket'],'') >= 0)
{
unset($connection_array_no_ok[$key1]);
continue;
}
}
//如果已经处理完则退出
if(empty($connection_array_no_ok))
{
break;
}
//如果已经超时也退出
if((microtime(true) - $time_start) * 1000 >= $timeout_ms_max)
{
break;
}
}
//如果仍然有不OK的连接则返回false
if(empty($connection_array_no_ok) === false)
{
throw new Exception("TCP 多连接不成功");
}
//逐一设置阻塞
foreach($connection_array as $key1 => $value1)
{
stream_set_blocking($value1['socket'],1);
}
//返回数据
return $connection_array;
}
在Tars\client\Communicator 添加 invokeTcpMutiple 方法
/**
* 异步的多个TCP请求处理
* @param $requestPacket_list 请求列表
* @param int $timeout 超时时间 (ms)
* @param string $sIp 请求IP
* @param int $iPort 请求端口
* @param int $connection_quantity 最多并发量 (同时请求的连接数)
* @return array 返回结果
* @throws Exception
* @throws \Exception
*/
public function invokeTcpMutiple($requestPacket_list, $timeout=30000, $sIp = '', $iPort = 0,$connection_quantity=200)
{
// 转换成网络需要的timeout
$timeout = $timeout / 1000;
$count = count($this->_routeInfo) - 1;
if ($count === -1) {
throw new \Exception('Rout fail', Code::ROUTE_FAIL);
}
$index = rand(0, $count);
$ip = empty($sIp) ? $this->_routeInfo[$index]['sIp'] : $sIp;
$port = empty($iPort) ? $this->_routeInfo[$index]['iPort'] : $iPort;
$requestPacket_list_count = count($requestPacket_list) ;
//检查KEY数量,如果超多则返回false
if($requestPacket_list_count > $connection_quantity)
{
throw new \Exception("已超出最大并发请求:".$connection_quantity);
}
$connect_info_chunk_array = $this->tcpMutipleConnect($requestPacket_list_count,$ip,$port);
//逐个拼装发送数据
foreach($requestPacket_list as $key1 => $requestPacket)
{
$socket = $connect_info_chunk_array[$key1]['socket'];
$requestBuf = $requestPacket->encode();
//执行发送
fwrite($socket,$requestBuf, strlen($requestBuf));
}
//执行接收
$read = array();
$write = array();
$except = array();
$socket_remaining = array();
$return_result=[];
foreach($connect_info_chunk_array as $key1 => $value1)
{
$socket_remaining[$key1] = array(
'socket' => $value1['socket'],
'result' => '',
'length' => 0,
'is_finish' => false,
);
$read[] = $value1['socket'];
}
$time_start = microtime(true);
while(($num_changed_streams = stream_select($read,$write,$except,0,200 * 1000)) !== false)
{
//如果已经超时,则退出
if(microtime(true) - $time_start >= $timeout)
{
break;
}
//如果没有变化的则继续等
if($num_changed_streams == 0)
{
$read = array();
foreach($socket_remaining as $key1 => $value1)
{
if($value1['is_finish'] === false)
{
$read[] = $value1['socket'];
}
}
$write = array();
$except = array();
continue;
}
//接收数据
foreach($read as $key1 => $value1)
{
//找到这个socket对应的key
$socket_belong_key = null;
foreach($socket_remaining as $key2 => $value2)
{
if($value1 == $value2['socket'])
{
$socket_belong_key = $key2;
break;
}
}
if($socket_belong_key === null)
{
throw new \Exception("出现了未知的Socket");
}
//如果这个socket已经设置为结束,则跳过
if($socket_remaining[$socket_belong_key]['is_finish'])
{
continue;
}
//获取数据
$result = fread($value1, 8192);
if(empty($result))
{
throw new \Exception("数据接收不成功");
}
//如果是第一次获取则解析头部
if($socket_remaining[$socket_belong_key]['length'] == 0)
{
//在这里从第一个包中获取总包长
$list = unpack('Nlen', substr($result, 0, 4));
$total_length = $list['len'];
//记录长度和数据
$socket_remaining[$socket_belong_key]['length'] = $total_length;
$socket_remaining[$socket_belong_key]['result'] = $result;
}
//如果不是第一次收包,则累加并判断长度
else
{
//记录数据
$socket_remaining[$socket_belong_key]['result'] .= $result;
}
//如果数据超长则返回错误
if(strlen($socket_remaining[$socket_belong_key]['result']) > $socket_remaining[$socket_belong_key]['length'])
{
throw new \Exception("服务器返回数据超过预计长度的数据");
}
//如果长度已经达到则标记为完成
if(strlen($socket_remaining[$socket_belong_key]['result']) == $socket_remaining[$socket_belong_key]['length'])
{
$socket_remaining[$socket_belong_key]['is_finish'] = true;
}
}
//本次所有数据都已经处理完成,准备下一次的句柄列表
$read = array();
foreach($socket_remaining as $key1 => $value1)
{
if($value1['is_finish'] === false)
{
$read[] = $value1['socket'];
}
}
$write = array();
$except = array();
//如果已经没有要处理的句柄,则退出
if(empty($read))
{
break;
}
}
//过滤掉未完成的数据
foreach($socket_remaining as $key1 => $value1)
{
if($value1['is_finish'] === false)
{
unset($socket_remaining[$key1]);
continue;
}
}
//如果全部都没有完成则直接返回结果
if(empty($socket_remaining))
{
return $return_result;
}
//开始逐一处理
foreach($socket_remaining as $key1 => $value1) {
//记录
$responseBuf = $value1['result'];
$responsePacket = new ResponsePacket();
$responsePacket->_responseBuf = $responseBuf;
$responsePacket->iVersion = $this->_iVersion;
$sBuffer = $responsePacket->decode();
//请求统计上报
// $endTime = $this->militime();
if(!is_null($this->_locator))
{
// $this->_statF->addStat($requestPacket->_servantName,$requestPacket->_funcName, $sIp,$iPort, ($endTime - $startTime), 0, 0);
}
$return_result[$key1]=$sBuffer;
}
return $return_result;
}
(微服务的接口,tars-php根据tars配置文件自动生成)
改造, 让get 支持多个请求,打比方说
select id,name from table 0,1000;
select id,name from table 1000,1000;
select id,name from table 2000,1000;
select id,name from table 3000,1000;
public function GetMutilple( $TegReqList,GetTegRsp &$rsp) {
try {
$requestPacketList =[];
foreach ($TegReqList as $key=> $req )
{
$requestPacket = new RequestPacket();
$requestPacket->_iVersion = $this->_iVersion;
$requestPacket->_funcName = 'Get';
$requestPacket->_servantName = $this->_servantName;
$encodeBufs = [];
$__buffer = TUPAPIWrapper::putStruct("req",1,$req,$this->_iVersion);
$encodeBufs['req'] = $__buffer;
$requestPacket->_encodeBufs = $encodeBufs;
$requestPacketList[$key] = $requestPacket;
}
$sBuffer_list = $this->_communicator->invokeTcpMutiple($requestPacketList,$this->_iTimeout);
$result_data=[];
foreach ($TegReqList as $key=>$value ) {
if(empty( $sBuffer_list[$key]))
{
$result_data[$key]=[
'data'=>[],
'info'=>[],
];
}
$sBuffer = $sBuffer_list[$key];
$rsp= new GetTegRsp;
TUPAPIWrapper::getStruct("rsp",2,$rsp,$sBuffer,$this->_iVersion);
$request = TUPAPIWrapper::getInt32("",0,$sBuffer,$this->_iVersion);
$result_data[$key]=[
'data'=>$rsp,
'info'=>$request,
];
}
return $result_data;
}
catch (\Exception $e) {
throw $e;
}
}
校验结果, 一次请求200个,耗时422ms, 速度大增!
real 0m0.422s
user 0m0.040s
sys 0m0.052s