当前位置: 首页 > 工具软件 > TARS-PHP > 使用案例 >

tars-php 改造 invoke 支持多个tcp请求

梁浩
2023-12-01

现在“微服务”越来越少流行, 公司内部很多接口原来是CGI, 或者直接读DB,都改成用微服务接口,
不用CGI接口,其实挻好的,不好暴露接口到公网,网络延时也少,不需要做鉴权,直接调用微服务接口就可以。
但不能读DB,也调用接口,就稍微有些不便。
打个比方
有一张表,里面有3万条的数据,我要取ID和名字,两个字段,
“ select id,name from table ”就可以 了,
但接口每次返回1000条, 我就要连续查询30次,假如每个查询200ms, 全部查询出来都要6s,
这个结果肯定不能接受, 但tars-php 官方每次查询只支持一个, 所以我们就只能动手去改造了。

支持多个TCP客户端连接

在Tars\client\Communicator 添加 tcpMutipleConnect方法

 /**
     * 返回多个TCP连接
     * @param int $connection_quantity 连接数量
     * @param string $sIp               目标IP
     * @param int $iPort                机架端口
     * @param int $timeout_ms_max       连接超时时间
     * @return array                返回连接
     * @throws Exception
     */
    public function tcpMutipleConnect(int $connection_quantity, $sIp = '', $iPort = 0,$timeout_ms_max=100)
    {
        $connection_array=[];

        //循环执行连接
        for($i=0;$i<$connection_quantity;$i++)
        {
            $socket = stream_socket_client("tcp://{$sIp}:{$iPort}",$errno,$errstr,1,STREAM_CLIENT_ASYNC_CONNECT);
            stream_set_blocking($socket,0);
            $connection_array[] = array(
                'socket' => $socket
            );
        }

        //记录开始时间和剩下未成功的连接
        $time_start = microtime(true);
        $connection_array_no_ok = $connection_array;

        //逐一判断是否可用,如果出现不可用则返回false
        while(1)
        {
            //如果是第一次或者仍然需要处理,则休息一段时间
            usleep(5 * 1000);

            //逐一判断是否可用,如果出现不可用则返回false
            foreach($connection_array_no_ok as $key1 => $value1)
            {
                //尝试发送,如果成功则从数组中去掉
                if(@stream_socket_sendto($value1['socket'],'') >= 0)
                {
                    unset($connection_array_no_ok[$key1]);
                    continue;
                }
            }

            //如果已经处理完则退出
            if(empty($connection_array_no_ok))
            {
                break;
            }

            //如果已经超时也退出
            if((microtime(true) - $time_start) * 1000 >= $timeout_ms_max)
            {
                break;
            }
        }

        //如果仍然有不OK的连接则返回false
        if(empty($connection_array_no_ok) === false)
        {
            throw new Exception("TCP 多连接不成功");
        }
        //逐一设置阻塞
        foreach($connection_array as $key1 => $value1)
        {
            stream_set_blocking($value1['socket'],1);
        }
        //返回数据
        return $connection_array;
    }

异步的多个TCP请求处理

在Tars\client\Communicator 添加 invokeTcpMutiple 方法


/**
     * 异步的多个TCP请求处理
     * @param $requestPacket_list  请求列表
     * @param int $timeout      超时时间 (ms)
     * @param string $sIp           请求IP
     * @param int $iPort            请求端口
     * @param int $connection_quantity  最多并发量 (同时请求的连接数)
     * @return array                    返回结果
     * @throws Exception
     * @throws \Exception
     */
    public function invokeTcpMutiple($requestPacket_list, $timeout=30000, $sIp = '', $iPort = 0,$connection_quantity=200)
    {
        // 转换成网络需要的timeout
        $timeout = $timeout / 1000;

        $count = count($this->_routeInfo) - 1;
        if ($count === -1) {
            throw new \Exception('Rout fail', Code::ROUTE_FAIL);
        }
        $index = rand(0, $count);
        $ip = empty($sIp) ? $this->_routeInfo[$index]['sIp'] : $sIp;
        $port = empty($iPort) ? $this->_routeInfo[$index]['iPort'] : $iPort;

        $requestPacket_list_count = count($requestPacket_list) ;


        //检查KEY数量,如果超多则返回false
        if($requestPacket_list_count > $connection_quantity)
        {
            throw new \Exception("已超出最大并发请求:".$connection_quantity);
        }

        $connect_info_chunk_array = $this->tcpMutipleConnect($requestPacket_list_count,$ip,$port);

        //逐个拼装发送数据
        foreach($requestPacket_list as $key1 => $requestPacket)
        {
            $socket = $connect_info_chunk_array[$key1]['socket'];

            $requestBuf = $requestPacket->encode();

            //执行发送
            fwrite($socket,$requestBuf, strlen($requestBuf));
        }


        //执行接收
        $read = array();
        $write  = array();
        $except = array();
        $socket_remaining = array();
        $return_result=[];
        foreach($connect_info_chunk_array as $key1 => $value1)
        {
            $socket_remaining[$key1] = array(
                'socket' => $value1['socket'],
                'result' => '',
                'length' => 0,
                'is_finish' => false,
            );
            $read[] = $value1['socket'];
        }


        $time_start = microtime(true);
        while(($num_changed_streams = stream_select($read,$write,$except,0,200 * 1000)) !== false)
        {
            //如果已经超时,则退出
            if(microtime(true) - $time_start >= $timeout)
            {
                break;
            }

            //如果没有变化的则继续等
            if($num_changed_streams == 0)
            {
                $read = array();
                foreach($socket_remaining as $key1 => $value1)
                {
                    if($value1['is_finish'] === false)
                    {
                        $read[] = $value1['socket'];
                    }
                }
                $write = array();
                $except = array();
                continue;
            }

            //接收数据
            foreach($read as $key1 => $value1)
            {
                //找到这个socket对应的key
                $socket_belong_key = null;
                foreach($socket_remaining as $key2 => $value2)
                {
                    if($value1 == $value2['socket'])
                    {
                        $socket_belong_key = $key2;
                        break;
                    }
                }
                if($socket_belong_key === null)
                {
                    throw new \Exception("出现了未知的Socket");
                }

                //如果这个socket已经设置为结束,则跳过
                if($socket_remaining[$socket_belong_key]['is_finish'])
                {
                    continue;
                }
                //获取数据
                $result = fread($value1, 8192);
                if(empty($result))
                {
                    throw new \Exception("数据接收不成功");
                }


                //如果是第一次获取则解析头部
                if($socket_remaining[$socket_belong_key]['length'] == 0)
                {
                    //在这里从第一个包中获取总包长
                    $list = unpack('Nlen', substr($result, 0, 4));
                    $total_length = $list['len'];
                    //记录长度和数据
                    $socket_remaining[$socket_belong_key]['length'] = $total_length;
                    $socket_remaining[$socket_belong_key]['result'] = $result;
                }
                //如果不是第一次收包,则累加并判断长度
                else
                {
                    //记录数据
                    $socket_remaining[$socket_belong_key]['result'] .= $result;
                }


                //如果数据超长则返回错误
                if(strlen($socket_remaining[$socket_belong_key]['result']) > $socket_remaining[$socket_belong_key]['length'])
                {
                    throw new \Exception("服务器返回数据超过预计长度的数据");
                }

                //如果长度已经达到则标记为完成
                if(strlen($socket_remaining[$socket_belong_key]['result']) == $socket_remaining[$socket_belong_key]['length'])
                {
                    $socket_remaining[$socket_belong_key]['is_finish'] = true;
                }
            }
            //本次所有数据都已经处理完成,准备下一次的句柄列表
            $read = array();
            foreach($socket_remaining as $key1 => $value1)
            {
                if($value1['is_finish'] === false)
                {
                    $read[] = $value1['socket'];
                }
            }
            $write = array();
            $except = array();

            //如果已经没有要处理的句柄,则退出
            if(empty($read))
            {
                break;
            }
        }
        //过滤掉未完成的数据
        foreach($socket_remaining as $key1 => $value1)
        {
            if($value1['is_finish'] === false)
            {
                unset($socket_remaining[$key1]);
                continue;
            }
        }

        //如果全部都没有完成则直接返回结果
        if(empty($socket_remaining))
        {
            return $return_result;
        }
        //开始逐一处理
        foreach($socket_remaining as $key1 => $value1) {
            //记录
            $responseBuf = $value1['result'];
            $responsePacket = new ResponsePacket();
            $responsePacket->_responseBuf = $responseBuf;
            $responsePacket->iVersion = $this->_iVersion;
            $sBuffer = $responsePacket->decode();

            //请求统计上报
//            $endTime = $this->militime();

            if(!is_null($this->_locator))
            {
//                $this->_statF->addStat($requestPacket->_servantName,$requestPacket->_funcName, $sIp,$iPort, ($endTime - $startTime), 0, 0);
            }

            $return_result[$key1]=$sBuffer;
        }
        return $return_result;
    }

改造GET方法

(微服务的接口,tars-php根据tars配置文件自动生成)

改造, 让get 支持多个请求,打比方说
select id,name from table 0,1000;
select id,name from table 1000,1000;
select id,name from table 2000,1000;
select id,name from table 3000,1000;

		public function GetMutilple( $TegReqList,GetTegRsp &$rsp) {
		try {
			$requestPacketList =[];
			foreach ($TegReqList  as $key=> $req )
			{
				$requestPacket = new RequestPacket();
				$requestPacket->_iVersion = $this->_iVersion;
				$requestPacket->_funcName = 'Get';
				$requestPacket->_servantName = $this->_servantName;
				$encodeBufs = [];

				$__buffer = TUPAPIWrapper::putStruct("req",1,$req,$this->_iVersion);
				$encodeBufs['req'] = $__buffer;
				$requestPacket->_encodeBufs = $encodeBufs;
				$requestPacketList[$key] = $requestPacket;
			}

			$sBuffer_list = $this->_communicator->invokeTcpMutiple($requestPacketList,$this->_iTimeout);

			$result_data=[];
			foreach ($TegReqList as $key=>$value ) {
				if(empty( $sBuffer_list[$key]))
				{
					$result_data[$key]=[
						'data'=>[],
						'info'=>[],
					];
				}

				$sBuffer = $sBuffer_list[$key];
				$rsp= new GetTegRsp;
				TUPAPIWrapper::getStruct("rsp",2,$rsp,$sBuffer,$this->_iVersion);
				$request =  TUPAPIWrapper::getInt32("",0,$sBuffer,$this->_iVersion);
				$result_data[$key]=[
					'data'=>$rsp,
					'info'=>$request,
				];
			}

			return $result_data;
		}
		catch (\Exception $e) {
			throw $e;
		}
	}

校验结果, 一次请求200个,耗时422ms, 速度大增!

real	0m0.422s
user	0m0.040s
sys	0m0.052s
 类似资料: