https://github.com/apache/incubator-brpc/blob/master/docs/cn/client.md
1. Client端示例程序
//Licensed to the Apache Software Foundation (ASF) under one//or more contributor license agreements. See the NOTICE file//distributed with this work for additional information//regarding copyright ownership. The ASF licenses this file//to you under the Apache License, Version 2.0 (the//"License"); you may not use this file except in compliance//with the License. You may obtain a copy of the License at//
// http://www.apache.org/licenses/LICENSE-2.0
//
//Unless required by applicable law or agreed to in writing,//software distributed under the License is distributed on an//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY//KIND, either express or implied. See the License for the//specific language governing permissions and limitations//under the License.//A client sending requests to server every 1 second.
#include#include#include#include#include"echo.pb.h"DEFINE_string(attachment,"", "Carry this along with requests");
DEFINE_string(protocol,"baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type,"", "Connection type. Available values: single, pooled, short");
DEFINE_string(server,"0.0.0.0:8000", "IP Address of server");
DEFINE_string(load_balancer,"", "The algorithm for load balancing");
DEFINE_int32(timeout_ms,100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry,3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms,1000, "Milliseconds between consecutive requests");int main(int argc, char*argv[]) {//Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);//A Channel represents a communication line to a Server. Notice that//Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;//Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol=FLAGS_protocol;
options.connection_type=FLAGS_connection_type;
options.timeout_ms= FLAGS_timeout_ms/*milliseconds*/;
options.max_retry=FLAGS_max_retry;if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR)<< "Fail to initialize channel";return -1;
}//Normally, you should not call a Channel directly, but instead construct//a stub Service wrapping it. stub can be shared by all threads as well.
example::EchoService_Stub stub(&channel);//Send a request and wait for the response every 1 second.
int log_id = 0;while (!brpc::IsAskedToQuit()) {//We will receive response synchronously, safe to put variables//on stack.
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;
request.set_message("hello world");
cntl.set_log_id(log_id++); //set by user//Set attachment which is wired to network directly instead of//being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);//Because `done'(last parameter) is NULL, this function waits until//the response comes back or error occurs(including timedout).
stub.Echo(&cntl, &request, &response, NULL);if (!cntl.Failed()) {
LOG(INFO)<< "Received response from" <
<< cntl.response_attachment() << ")"
<< "latency=" << cntl.latency_us() << "us";
}else{
LOG(WARNING)<
}
usleep(FLAGS_interval_ms* 1000L);
}
LOG(INFO)<< "EchoClient is going to quit";return 0;
}
2. 事实速查
Channel.Init()是线程不安全的。
Channel.CallMethod()是线程安全的,一个Channel可以被所有线程同时使用。
Channel可以分配在栈上。
Channel在发送异步请求后可以析构。
没有brpc::Client这个类。
3. Channel
Client指发起请求的一端,在brpc中没有对应的实体,取而代之的是brpc::Channel,它代表和一台或一组服务器的交互通道,Client和Channel在角色上的差异在实践中并不重要,你可以把Channel视作Client。
吐槽一下:那为什么不使用Client的概念,以减少大家的学习成本呢?
Channel可以被所有线程共用,你不需要为每个线程创建独立的Channel,也不需要用锁互斥。不过Channel的创建和Init并不是线程安全的,请确保在Init成功后再被多线程访问,在没有线程访问后再析构。
一些RPC实现中有ClientManager的概念,包含了Client端的配置信息和资源管理。brpc不需要这些,以往在ClientManager中配置的线程数、长短连接等等要么被加入了brpc::ChannelOptions,要么可以通过gflags全局配置,这样做的好处:
方便。不需要在创建Channel时传入ClientManager,也不需要存储ClientManager。否则不少代码需要一层层地传递ClientManager,很麻烦。gflags使一些全局行为的配置更加简单。
共用资源。比如server和channel可以共用后台线程(bthread的工作线程)。 ???
生命周期。析构ClientManager的过程很容易出错,现在由框架负责,则不会有问题。
就像大部分类那样,Channel必须在Init之后才能使用,options为NULL时所有参数取默认值,如果你要使用非默认值,这么做就行了:
brpc::ChannelOptions options; //包含了默认值
options.xxx =yyy;
...
channel.Init(...,&options);
注意Channel不会修改options,Init结束后不会再访问options。所以options一般就像上面代码中那样放栈上。Channel.options()可以获得channel在使用的所有选项。
Init函数分为连接一台服务器和连接服务集群。
4. 连接一台服务器
//options为NULL时取默认值
int Init(EndPoint server_addr_and_port, const ChannelOptions*options);int Init(const char* server_addr_and_port, const ChannelOptions*options);int Init(const char* server_addr, int port, const ChannelOptions* options);
这类Init连接的服务器往往有固定的ip地址,不需要命名服务和负载均衡,创建起来相对轻量。但是请勿频繁创建使用域名的Channel。这需要查询dns,可能最多耗时10秒(查询DNS的默认超时)。重用它们。
合法的“server_addr_and_port”:
127.0.0.1:80
www.foo.com:8765
localhost:9000
5. 连接服务集群
int Init(const char*naming_service_url,const char*load_balancer_name,const ChannelOptions* options);
这类Channel需要定期从naming_service_url指定的命名服务中获得服务器列表,并通过load_balancer_name指定的负载均衡算法选择出一台机器发送请求。
你不应该在每次请求前动态地创建此类(连接服务集群的)Channel。因为创建和析构此类Channel牵涉到较多的资源,比如在创建时得访问一次命名服务,否则便不知道有哪些服务器可选。由于Channel可被多个线程共用,一般也没有必要动态构建。
当load_balanver_name为NULL或空时,此Init等同于连接单台server的Init,naming_service_url应该是“ip:port”或“域名:port”。你可以通过这个Init函数统一Channel的初始化方式。比如你可以把naming_service_url和load_balancer_name放在配置文件中,要连接单台server时把load_balancer_name置空,要连接服务集群时则设置一个有效的算法名称。
5.1 命名服务
命名服务把一个名字映射为可修改的机器列表,在client端的位置如下:
有了命名服务后client记录的是一个名字,而不是每一台下游机器。而当下游机器变化时,就只需要修改命名服务中的列表,而不需要逐台修改每个上游。这个过程也常被称为“解耦上下游”。当然在具体实现上,上游会记录每一台下游机器,并定期向命名服务请求或被推送最新的列表,以避免在RPC请求时采取访问命名服务。使用命名服务一般不会对性能造成影响,对命名服务的压力也很小。
naming_service_url的一般形式是“protocol://service_name”。
作者给了几种例子,这里仅列举出来,不展开了:
bns://
file://
list://,...
http://
https://
consul://
更多命名服务
用户可以通过实现brpc::NamingService来对接更多命名服务,可以参考 《brpc学习笔记之负载均衡》
命名服务中的tag
每个地址可以附带一个tag,在常见的命名服务中,如果地址后有空格,则空格之后的内容均为tag。相同地址配合不同的tag,被认为是不同的实例,brpc会建立不同的连接。用户可利用这个特性更灵活地控制与单个地址的连接方式。如果你需要“带权重的轮询”,你应当优先考虑使用wrr(weighted round roubin)算法,而不是用tag来模拟。
VIP相关的问题
TODO
命名服务过滤器
当命名服务获得机器列表后,可以自定义一个过滤器进行筛选,最后把结果传递给负载均衡:
过滤器接口如下:
//naming_service_filter.h
classNamingServiceFilter {public://Return true to take this `server' as a candidate to issue RPC//Return false to filter it out
virtual bool Accept(const ServerNode& server) const = 0;
};//naming_service.h
structServerNode {
butil::EndPoint addr;
std::stringtag;
};
常见的业务策略如根据server的tag进行过滤。
自定义的过滤器配置在ChannelOptions中,默认为NULL(不过滤)。
class MyNamingServiceFilter : publicbrpc::NamingServiceFilter {public:bool Accept(const brpc::ServerNode& server) const{return server.tag == "main";
}
};intmain() {
...
MyNamingServiceFilter my_filter;
...
brpc::ChannelOptions options;
options.ns_filter= &my_filter;
...
}
5.2 负载均衡
当下游机器超过一台时,我们需要分割流量,此过程一般称为负载均衡,在client端的位置如下图所示:
理想的算法是每个请求都得到及时的处理,且任意机器crash对全局影响较小。但由于client端无法及时获得server端的延迟或拥塞,而且负载均衡算法不能耗费太多的cpu,一般来说用户得根据具体的场景选择合适的算法,目前rpc提供的算法有(通过load_balancer_name制定):
rr
即round robin,总是选择列表中的下一台服务器,结尾的下一台是开头,无需其他设置。比如有3台机器a,b,c,那么brpc会一次想a,b,c,a,b,c,...发送请求。注意这个算法的前提是服务器的配置、网络条件、负载都是类似的。
wrr
即weighted round robin,根据服务器列表配置的权重值来选择服务器。服务器被选到的机会正比于其权重值,并且该算法能保证同一服务器被选到的结果较均衡的散开。
random
随机从列表中选择一台服务器,无需其他设置。和round robin类似,这个算法的前提也是服务器都是类似的。
la
locality-aware,优先选择延时低的下游,直到其延时高于其他机器,无需其他设置。实现原理请查看《brpc学习笔记之负载均衡》
c_murmurhash or c_md5
一致性哈希,与简单hash的不同之处在于增加和删除机器时不会使分桶结果剧烈变化,特别适合cache类服务。 ???
发起RPC前需要设置Controller.set_request_code(),否则RPC会失败。request_code一般是请求中主键部分的32位哈希值,不需要和负载均衡使用的哈希算法一致。比如用c_murmurhash算法也可以用md5计算哈希值。
src/brpc/policy/hasher.h中包含了常用的hash函数。如果用std::string key代表请求的主键,controller.set_request_code(brpc::policy::MurmurHash32(key.data(), key.size()))就正确设置了request_code。
注意甄别请求中的“主键”部分和“属性”部分,不要为了偷懒或通用,就把请求的所有内容一股脑儿计算出哈希值,属性的变化会使请求的目的地 发生剧烈的变化。另外也要注意padding的问题,比如struct Foo { int32_t a; int64_t b; }在64位机器上a和b之间有4个字节的空隙,内容未定义,如果像hash(&foo, sizeof(foo))这样计算哈希值,结果就是未定义的,得把内容紧密排列或序列化后再算。
从集群宕机后恢复时的客户端限流
集群宕机指的是集群中所有server都处于不可用的状态。由于健康检查机制,当集群恢复正常后,server会间隔性地上线。当某一个server上线后,所有的流量都会发送过去,可能导致服务再次过载。若熔断开启,则可能导致其它server上线前该server再次熔断,集群永远无法恢复。作为解决方案,brpc提供了在集群宕机后恢复时的限流机制:当集群中没有可用server时,集群进入恢复状态,假设正好能服务所有请求的server数量为min_working_instances,当前集群可用的server数量为q,则在恢复状态时,client接受请求的概率为q/min_working_instances,否则丢弃;若一段时间hold_seconds内q保持不变,则把流量重新发送到全部可用的server上,并离开恢复状态。
在恢复阶段时,可以通过判断controller.ErrorCode()是否等于brpc::ERJECT来判断该次请求是否被拒绝,被拒绝的请求不会被框架重试。
此恢复机制要求下游server的能力是类似的,所以目前只针对rr和random有效,开启方式是在load_balancer_name后面加上min_working_instances和hold_seconds参数的值,例如
channel.Init("http://...", "random:min_working_instances=6 hold_seconds=10", &options);
5.3 健康检查
连接断开的server会被暂时隔离而不会被负载均衡算法选中,brpc会定期连接被隔离的server,以检查他们是否恢复正常,间隔由参数-health_check_interval控制。
在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,框架会发送一个HTTP GET请求到该server,请求路径通过-health_check_path设置(默认为空),只有当server返回200时,它才会恢复。在两种健康检查机制下,都可通过-health_check_timeout_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。
6. 发起访问
一般来说,我们不直接调用Channel.CallMethod,而是通过protobuf生成的桩XXX_Stub,过程更像是“调用函数”。stub内没什么成员变量,建议在栈上创建和使用,而不必new,当然你也可以把stub存下来复用。Channel::CallMethod和stub访问都是线程安全的,可以被所有线程同时访问。比如:
XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);
甚至
XXX_Stub(&channel).some_method(controller, request, response, done);
一个例外是http/h2 client。访问http服务和protobuf没什么关系,直接调用CallMethod即可,除了Controller和done均为NULL,详见 《brpc学习笔记之http client》。
6.1 同步访问
指的是:CallMethod会阻塞到收到server端返回response或发生错误(包括超时)。
同步访问中的response/controller不会在CallMethod后被框架使用,它们都可以分配在栈上。注意,如果request/response字段特别多、字节数特别大的话,还是更适合分配在堆上。
MyRequest request;
MyResponse response;
brpc::Controller cntl;
XXX_Stub stub(&channel);
request.set_foo(...);
cntl.set_timeout_ms(...);
stub.some_method(&cntl, &request, &response, NULL);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。
} else{//RPC成功了,response里有我们想要的回复数据。
}
6.2 异步访问
指的是:给CallMethod传递一个额外的回调对象done,CallMethod在发出request就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后。
由于CallMethod结束并不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上,并在done->Run()中删除。如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存。
你可以独立地创建这些对象,并使用NewCallback生成done,也可以把Response和Controller作为done的成员变量,一起new出来,一般使用前一种方法。
发起异步请求后Request和Channel也可以立刻析构。这两样和response/controller是不同的。注意:这是说Channel的析构可以立刻发生在CallMethod之后,并不是说析构可以和CallMethod同时发生,删除正被另一个线程使用的Channel是未定义行为(很可能crash)。
使用NewCallback
static void OnRPCDone(MyResponse* response, brpc::Controller*cntl) {//unique_ptr会帮助我们在return时自动删掉response/cntl,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
std::unique_ptrresponse_guard(response);
std::unique_ptr<:controller>cntl_guard(cntl);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。
} else{//RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}//NewCallback产生的Closure会在Run结束后删除自己,不用我们做。
}
MyResponse* response = newMyResponse;
brpc::Controller* cntl = newbrpc::Controller;
MyService_Stub stub(&channel);
MyRequest request;//你不用new request,即使在异步访问中.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl,&request, response, google::protobuf::NewCallback(OnRPCDone, response, cntl));
由于protobuf 3把NewCallback设置为私有,r32035后brpc把NewCallback独立于src/brpc/callback.h(并增加了一些重载)。如果你的程序出现NewCallback相关的编译错误,把google::protobuf::NewCallback替换为brpc::NewCallback就行了。
继承google::protobuf::Closure
使用NewCallback的缺点是要分配三次内存:response,controller,done。如果profiler证明这儿的内存分配有瓶颈,可以考虑自己继承Closure,把response/controller作为成员变量,这样可以把三次new合并为一次。但缺点就是代码不够美观,如果内存分配不是瓶颈,别用这种方法。
class OnRPCDone: publicgoogle::protobuf::Closure {public:voidRun() {//unique_ptr会帮助我们在return时自动delete this,防止忘记。gcc 3.4下的unique_ptr是模拟版本。
std::unique_ptr self_guard(this);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。
} else{//RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}
}
MyResponse response;
brpc::Controller cntl;
}
OnRPCDone* done = newOnRPCDone;
MyService_Stub stub(&channel);
MyRequest request;//你不用new request,即使在异步访问中.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);
如果异步访问中的回调函数特别复杂会有什么影响吗?
没有特别的影响,回调会运行在独立的bthread中,不会阻塞其他的逻辑。你可以在回调中做各种阻塞操作。
rpc发送处的代码和回调函数是在同一个线程里执行吗?
一定不在同一个线程里运行,即使该次rpc调用刚进去就失败了,回调也会在另一个bthread中运行。这可以在加锁进行rpc(不推荐)的代码中避免死锁。???
6.3 等待RPC完成
注意:当你需要发起多个并发操作时,可能ParallelChannel更方便。
如下代码发起两个异步RPC后等待他们。
const brpc::CallId cid1 = controller1->call_id();const brpc::CallId cid2 = controller2->call_id();
...
stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);
...
brpc::Join(cid1);
brpc::Join(cid2);
在发起RPC前调用Controller.call_id()获得一个id,发起RPC调用后Join那个id。
Join()的行为是等到RPC结束且done->Run()运行后,一些Join的性质如下:
如果对应的RPC已经结束,Join将立刻返回。
多个线程可以Join同一个id,它们都会醒来。
同步RPC也可以在另一个线程中被Join,但一般不会这么做。
Join()在之前的版本叫做JoinResponse(),如果你在编译时被提示deprecated之类的,修改为Join()。
在RPC调用后Join(controller->call_id())是错误的行为,一定要先把call_id保存下来。因为RPC调用后controller可能被随时开始运行的done删除。下面代码的Join方式是错误的。
static void on_rpc_done(Controller* controller, MyResponse*response) {
... Handle response ...deletecontroller;deleteresponse;
}
Controller* controller1 = newController;
Controller* controller2 = newController;
MyResponse* response1 = newMyResponse;
MyResponse* response2 = newMyResponse;
...
stub.method1(controller1,&request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2,&request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
brpc::Join(controller1->call_id()); //错误,controller1可能被on_rpc_done删除了
brpc::Join(controller2->call_id()); //错误,controller2可能被on_rpc_done删除了
6.4 半同步
Join可用来实现“半同步”访问:即等待多个异步访问完成。由于调用处的代码会等到所有RPC都结束后再醒来,所以controller和response都可以放栈上。
brpc::Controller cntl1;
brpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
...
stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
...
brpc::Join(cntl1.call_id());
brpc::Join(cntl2.call_id());
brpc::DoNothing()可获得一个什么都不干的done,专门用于半同步访问。它的声明周期由框架管理,用户不用关心。
注意在上面的代码中,我们在RPC结束后又访问了controller.call_id(),这是没有问题的,因为DoNothing中并不会像上节中的on_rpc_done中那样删除Controller。
6.5 取消RPC
brpc::StartCancel(call_id)可取消对应的RPC,call_id必须在发起RPC前通过Controller.call_id()获得,其他时刻都可能有race condition。
注意:是brpc::StartCancel(call_id),不是controller->StartCancel(),后者被禁用,没有效果。后者是protobuf默认提供的接口,但是在controller对象的生命周期上有严重的竞争问题。
顾名思义,StartCancel调用完成后RPC并未立刻结束,你不应该碰触Controller的任何字段或删除任何资源,它们自然会在RPC结束时被done中对应逻辑处理。如果你一定要在原地等到RPC结束(一般不需要),则可通过Join(call_id)。
关于StartCancel的一些事实:
call_id在发起RPC前就可以被取消,RPC会直接结束(done仍会被调用)。
call_id可以在另一个线程中被取消。
取消一个已经取消的call_id不会有任何效果。推论:同一个call_id可以被多个线程同时取消,但最多一次有效果。
这里的取消是纯client端的功能,server端未必会取消对应的操作,server cancelation是另一个功能。
6.6 获取Server的地址和端口
remote_side()方法可知道request被送向了哪个server,返回值类型是butil::EndPoint,包含一个ip4地址和端口。在RPC结束前调用这个方法都是没有意义的。
打印方式:
LOG(INFO) << "remote_side=" << cntl->remote_side();
printf("remote_side=%s\n", butil::endpoint2str(cntl->remote_side()).c_str());
6.7 获取Client的地址和端口
r31384后通过local_side()方法可在RPC结束后获得发起RPC的地址和端口。
打印方式:
LOG(INFO) << "local_side=" << cntl->local_side();
printf("local_side=%s\n", butil::endpoint2str(cntl->local_side()).c_str());
6.8 应该重用brpc::Controller吗?
不用刻意地重用,但Controller是个大杂烩,可能会包含一些缓存,Reset()可以避免反复地创建这些缓存。
在大部分场景下,构造Controller(snippet1)和重置Controller(snippet2)的性能差异不大。
//snippet1
for (int i = 0; i < n; ++i) {
brpc::Controller controller;
...
stub.CallSomething(...,&controller);
}//snippet2
brpc::Controller controller;for (int i = 0; i < n; ++i) {
controller.Reset();
...
stub.CallSomething(...,&controller);
}
但如果snippet1中的Controller是new出来的,那么snippet1就会多出“内存分配”的开销,在一些情况下可能会慢一些。
7. 设置
7.1 线程数
7.2 超时
7.3 重试
7.4 熔断
7.5 协议
7.6 连接方式
7.7 关闭连接池中的闲置连接
7.8 延迟关闭连接
7.9 连接的缓冲区大小
7.10 log_id
7.11 附件
7.12 开启SSL
7.13 认证
7.14 重置
7.15 压缩
8. FAQ
Q:brpc能用unix domain socket吗
Q:Fail to connect to xx.xx.xx.xx:xxxx, Connection refused
Q:经常遇到至另一个机房的Connection timedout
Q:为什么同步方式是好的,异步就crash了
Q:怎么确保请求只被处理一次
Q:Invalid address=`bns://group.user-persona.dumi.nj03'
Q:两端都用protobuf,为什么不能互相访问
Q:为什么C++ client/server能够互相通信,和其他语言的client/server通信会报序列化失败的错误
9. Client端基本流程
主要步骤:
创建一个bthread_id作为本次RPC的correlation_id。
根据Channel的创建方式,从进程级的SocketMap中或从LoadBalancer中选择一台下游server作为本次RPC发送的目的地。
根据连接方式(单连接、连接池、短连接),选择一个Socket。
如果开启验证且当前Socket没有被验证过,第一个请求进入验证分支,其余请求会阻塞直到第一个包含认证信息的请求写入Socket。server端只对第一个请求进行验证。
根据Channel的协议,选择对应的序列化函数把request序列化至IOBuf。
如果配置了超时,设置定时器。从这个点开始要避免使用Controller对象,因为在设定定时器后随时可能触发超时 -> 调用到用户的超时回调 -> 用户在回调中析构Controller。
发送准备阶段结束,若上述任何步骤出错,会调用Channel::HandleSendFailed。
将之前序列化好的IOBuf写出到Socket上,同时传入回调Channel::HandleSocketFailed,当连接断开、写失败等错误发生时会调用此回调。
如果是同步发送,Join correlation_id;否则至此CallMethod结束。
网络上发消息+收消息。
收到response后,提取出其中的correlation_id,在O(1)时间内找到对应的Controller。这个过程中不需要查找全局哈希表,有良好的多核扩展性。
根据协议格式反序列化response。
调用Controller::OnRPCReturned,可能会根据错误码判断是否需要重试,或让RPC结束。如果是异步发送,调用用户回调。最后摧毁correlation_id唤醒Join着的线程。