当前位置: 首页 > 工具软件 > Apache bRPC > 使用案例 >

brpc访问MySQL_brpc学习笔记之client基础功能

罗绪
2023-12-01

https://github.com/apache/incubator-brpc/blob/master/docs/cn/client.md

1. Client端示例程序

//Licensed to the Apache Software Foundation (ASF) under one//or more contributor license agreements. See the NOTICE file//distributed with this work for additional information//regarding copyright ownership. The ASF licenses this file//to you under the Apache License, Version 2.0 (the//"License"); you may not use this file except in compliance//with the License. You may obtain a copy of the License at//

// http://www.apache.org/licenses/LICENSE-2.0

//

//Unless required by applicable law or agreed to in writing,//software distributed under the License is distributed on an//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY//KIND, either express or implied. See the License for the//specific language governing permissions and limitations//under the License.//A client sending requests to server every 1 second.

#include#include#include#include#include"echo.pb.h"DEFINE_string(attachment,"", "Carry this along with requests");

DEFINE_string(protocol,"baidu_std", "Protocol type. Defined in src/brpc/options.proto");

DEFINE_string(connection_type,"", "Connection type. Available values: single, pooled, short");

DEFINE_string(server,"0.0.0.0:8000", "IP Address of server");

DEFINE_string(load_balancer,"", "The algorithm for load balancing");

DEFINE_int32(timeout_ms,100, "RPC timeout in milliseconds");

DEFINE_int32(max_retry,3, "Max retries(not including the first RPC)");

DEFINE_int32(interval_ms,1000, "Milliseconds between consecutive requests");int main(int argc, char*argv[]) {//Parse gflags. We recommend you to use gflags as well.

GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);//A Channel represents a communication line to a Server. Notice that//Channel is thread-safe and can be shared by all threads in your program.

brpc::Channel channel;//Initialize the channel, NULL means using default options.

brpc::ChannelOptions options;

options.protocol=FLAGS_protocol;

options.connection_type=FLAGS_connection_type;

options.timeout_ms= FLAGS_timeout_ms/*milliseconds*/;

options.max_retry=FLAGS_max_retry;if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {

LOG(ERROR)<< "Fail to initialize channel";return -1;

}//Normally, you should not call a Channel directly, but instead construct//a stub Service wrapping it. stub can be shared by all threads as well.

example::EchoService_Stub stub(&channel);//Send a request and wait for the response every 1 second.

int log_id = 0;while (!brpc::IsAskedToQuit()) {//We will receive response synchronously, safe to put variables//on stack.

example::EchoRequest request;

example::EchoResponse response;

brpc::Controller cntl;

request.set_message("hello world");

cntl.set_log_id(log_id++); //set by user//Set attachment which is wired to network directly instead of//being serialized into protobuf messages.

cntl.request_attachment().append(FLAGS_attachment);//Because `done'(last parameter) is NULL, this function waits until//the response comes back or error occurs(including timedout).

stub.Echo(&cntl, &request, &response, NULL);if (!cntl.Failed()) {

LOG(INFO)<< "Received response from" <

<< cntl.response_attachment() << ")"

<< "latency=" << cntl.latency_us() << "us";

}else{

LOG(WARNING)<

}

usleep(FLAGS_interval_ms* 1000L);

}

LOG(INFO)<< "EchoClient is going to quit";return 0;

}

2. 事实速查

Channel.Init()是线程不安全的。

Channel.CallMethod()是线程安全的,一个Channel可以被所有线程同时使用。

Channel可以分配在栈上。

Channel在发送异步请求后可以析构。

没有brpc::Client这个类。

3. Channel

Client指发起请求的一端,在brpc中没有对应的实体,取而代之的是brpc::Channel,它代表和一台或一组服务器的交互通道,Client和Channel在角色上的差异在实践中并不重要,你可以把Channel视作Client。

吐槽一下:那为什么不使用Client的概念,以减少大家的学习成本呢?

Channel可以被所有线程共用,你不需要为每个线程创建独立的Channel,也不需要用锁互斥。不过Channel的创建和Init并不是线程安全的,请确保在Init成功后再被多线程访问,在没有线程访问后再析构。

一些RPC实现中有ClientManager的概念,包含了Client端的配置信息和资源管理。brpc不需要这些,以往在ClientManager中配置的线程数、长短连接等等要么被加入了brpc::ChannelOptions,要么可以通过gflags全局配置,这样做的好处:

方便。不需要在创建Channel时传入ClientManager,也不需要存储ClientManager。否则不少代码需要一层层地传递ClientManager,很麻烦。gflags使一些全局行为的配置更加简单。

共用资源。比如server和channel可以共用后台线程(bthread的工作线程)。 ???

生命周期。析构ClientManager的过程很容易出错,现在由框架负责,则不会有问题。

就像大部分类那样,Channel必须在Init之后才能使用,options为NULL时所有参数取默认值,如果你要使用非默认值,这么做就行了:

brpc::ChannelOptions options; //包含了默认值

options.xxx =yyy;

...

channel.Init(...,&options);

注意Channel不会修改options,Init结束后不会再访问options。所以options一般就像上面代码中那样放栈上。Channel.options()可以获得channel在使用的所有选项。

Init函数分为连接一台服务器和连接服务集群。

4. 连接一台服务器

//options为NULL时取默认值

int Init(EndPoint server_addr_and_port, const ChannelOptions*options);int Init(const char* server_addr_and_port, const ChannelOptions*options);int Init(const char* server_addr, int port, const ChannelOptions* options);

这类Init连接的服务器往往有固定的ip地址,不需要命名服务和负载均衡,创建起来相对轻量。但是请勿频繁创建使用域名的Channel。这需要查询dns,可能最多耗时10秒(查询DNS的默认超时)。重用它们。

合法的“server_addr_and_port”:

127.0.0.1:80

www.foo.com:8765

localhost:9000

5. 连接服务集群

int Init(const char*naming_service_url,const char*load_balancer_name,const ChannelOptions* options);

这类Channel需要定期从naming_service_url指定的命名服务中获得服务器列表,并通过load_balancer_name指定的负载均衡算法选择出一台机器发送请求。

你不应该在每次请求前动态地创建此类(连接服务集群的)Channel。因为创建和析构此类Channel牵涉到较多的资源,比如在创建时得访问一次命名服务,否则便不知道有哪些服务器可选。由于Channel可被多个线程共用,一般也没有必要动态构建。

当load_balanver_name为NULL或空时,此Init等同于连接单台server的Init,naming_service_url应该是“ip:port”或“域名:port”。你可以通过这个Init函数统一Channel的初始化方式。比如你可以把naming_service_url和load_balancer_name放在配置文件中,要连接单台server时把load_balancer_name置空,要连接服务集群时则设置一个有效的算法名称。

5.1 命名服务

命名服务把一个名字映射为可修改的机器列表,在client端的位置如下:

有了命名服务后client记录的是一个名字,而不是每一台下游机器。而当下游机器变化时,就只需要修改命名服务中的列表,而不需要逐台修改每个上游。这个过程也常被称为“解耦上下游”。当然在具体实现上,上游会记录每一台下游机器,并定期向命名服务请求或被推送最新的列表,以避免在RPC请求时采取访问命名服务。使用命名服务一般不会对性能造成影响,对命名服务的压力也很小。

naming_service_url的一般形式是“protocol://service_name”。

作者给了几种例子,这里仅列举出来,不展开了:

bns://

file://

list://,...

http://

https://

consul://

更多命名服务

用户可以通过实现brpc::NamingService来对接更多命名服务,可以参考 《brpc学习笔记之负载均衡》

命名服务中的tag

每个地址可以附带一个tag,在常见的命名服务中,如果地址后有空格,则空格之后的内容均为tag。相同地址配合不同的tag,被认为是不同的实例,brpc会建立不同的连接。用户可利用这个特性更灵活地控制与单个地址的连接方式。如果你需要“带权重的轮询”,你应当优先考虑使用wrr(weighted round roubin)算法,而不是用tag来模拟。

VIP相关的问题

TODO

命名服务过滤器

当命名服务获得机器列表后,可以自定义一个过滤器进行筛选,最后把结果传递给负载均衡:

过滤器接口如下:

//naming_service_filter.h

classNamingServiceFilter {public://Return true to take this `server' as a candidate to issue RPC//Return false to filter it out

virtual bool Accept(const ServerNode& server) const = 0;

};//naming_service.h

structServerNode {

butil::EndPoint addr;

std::stringtag;

};

常见的业务策略如根据server的tag进行过滤。

自定义的过滤器配置在ChannelOptions中,默认为NULL(不过滤)。

class MyNamingServiceFilter : publicbrpc::NamingServiceFilter {public:bool Accept(const brpc::ServerNode& server) const{return server.tag == "main";

}

};intmain() {

...

MyNamingServiceFilter my_filter;

...

brpc::ChannelOptions options;

options.ns_filter= &my_filter;

...

}

5.2 负载均衡

当下游机器超过一台时,我们需要分割流量,此过程一般称为负载均衡,在client端的位置如下图所示:

理想的算法是每个请求都得到及时的处理,且任意机器crash对全局影响较小。但由于client端无法及时获得server端的延迟或拥塞,而且负载均衡算法不能耗费太多的cpu,一般来说用户得根据具体的场景选择合适的算法,目前rpc提供的算法有(通过load_balancer_name制定):

rr

即round robin,总是选择列表中的下一台服务器,结尾的下一台是开头,无需其他设置。比如有3台机器a,b,c,那么brpc会一次想a,b,c,a,b,c,...发送请求。注意这个算法的前提是服务器的配置、网络条件、负载都是类似的。

wrr

即weighted round robin,根据服务器列表配置的权重值来选择服务器。服务器被选到的机会正比于其权重值,并且该算法能保证同一服务器被选到的结果较均衡的散开。

random

随机从列表中选择一台服务器,无需其他设置。和round robin类似,这个算法的前提也是服务器都是类似的。

la

locality-aware,优先选择延时低的下游,直到其延时高于其他机器,无需其他设置。实现原理请查看《brpc学习笔记之负载均衡》

c_murmurhash or c_md5

一致性哈希,与简单hash的不同之处在于增加和删除机器时不会使分桶结果剧烈变化,特别适合cache类服务。 ???

发起RPC前需要设置Controller.set_request_code(),否则RPC会失败。request_code一般是请求中主键部分的32位哈希值,不需要和负载均衡使用的哈希算法一致。比如用c_murmurhash算法也可以用md5计算哈希值。

src/brpc/policy/hasher.h中包含了常用的hash函数。如果用std::string key代表请求的主键,controller.set_request_code(brpc::policy::MurmurHash32(key.data(), key.size()))就正确设置了request_code。

注意甄别请求中的“主键”部分和“属性”部分,不要为了偷懒或通用,就把请求的所有内容一股脑儿计算出哈希值,属性的变化会使请求的目的地 发生剧烈的变化。另外也要注意padding的问题,比如struct Foo { int32_t a; int64_t b; }在64位机器上a和b之间有4个字节的空隙,内容未定义,如果像hash(&foo, sizeof(foo))这样计算哈希值,结果就是未定义的,得把内容紧密排列或序列化后再算。

从集群宕机后恢复时的客户端限流

集群宕机指的是集群中所有server都处于不可用的状态。由于健康检查机制,当集群恢复正常后,server会间隔性地上线。当某一个server上线后,所有的流量都会发送过去,可能导致服务再次过载。若熔断开启,则可能导致其它server上线前该server再次熔断,集群永远无法恢复。作为解决方案,brpc提供了在集群宕机后恢复时的限流机制:当集群中没有可用server时,集群进入恢复状态,假设正好能服务所有请求的server数量为min_working_instances,当前集群可用的server数量为q,则在恢复状态时,client接受请求的概率为q/min_working_instances,否则丢弃;若一段时间hold_seconds内q保持不变,则把流量重新发送到全部可用的server上,并离开恢复状态。

在恢复阶段时,可以通过判断controller.ErrorCode()是否等于brpc::ERJECT来判断该次请求是否被拒绝,被拒绝的请求不会被框架重试。

此恢复机制要求下游server的能力是类似的,所以目前只针对rr和random有效,开启方式是在load_balancer_name后面加上min_working_instances和hold_seconds参数的值,例如

channel.Init("http://...", "random:min_working_instances=6 hold_seconds=10", &options);

5.3 健康检查

连接断开的server会被暂时隔离而不会被负载均衡算法选中,brpc会定期连接被隔离的server,以检查他们是否恢复正常,间隔由参数-health_check_interval控制。

在默认的配置下,一旦server被连接上,它会恢复为可用状态;brpc还提供了应用层健康检查的机制,框架会发送一个HTTP GET请求到该server,请求路径通过-health_check_path设置(默认为空),只有当server返回200时,它才会恢复。在两种健康检查机制下,都可通过-health_check_timeout_ms设置超时(默认500ms)。如果在隔离过程中,server从命名服务中删除了,brpc也会停止连接尝试。

6. 发起访问

一般来说,我们不直接调用Channel.CallMethod,而是通过protobuf生成的桩XXX_Stub,过程更像是“调用函数”。stub内没什么成员变量,建议在栈上创建和使用,而不必new,当然你也可以把stub存下来复用。Channel::CallMethod和stub访问都是线程安全的,可以被所有线程同时访问。比如:

XXX_Stub stub(&channel);

stub.some_method(controller, request, response, done);

甚至

XXX_Stub(&channel).some_method(controller, request, response, done);

一个例外是http/h2 client。访问http服务和protobuf没什么关系,直接调用CallMethod即可,除了Controller和done均为NULL,详见 《brpc学习笔记之http client》。

6.1 同步访问

指的是:CallMethod会阻塞到收到server端返回response或发生错误(包括超时)。

同步访问中的response/controller不会在CallMethod后被框架使用,它们都可以分配在栈上。注意,如果request/response字段特别多、字节数特别大的话,还是更适合分配在堆上。

MyRequest request;

MyResponse response;

brpc::Controller cntl;

XXX_Stub stub(&channel);

request.set_foo(...);

cntl.set_timeout_ms(...);

stub.some_method(&cntl, &request, &response, NULL);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。

} else{//RPC成功了,response里有我们想要的回复数据。

}

6.2 异步访问

指的是:给CallMethod传递一个额外的回调对象done,CallMethod在发出request就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后。

由于CallMethod结束并不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上,并在done->Run()中删除。如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存。

你可以独立地创建这些对象,并使用NewCallback生成done,也可以把Response和Controller作为done的成员变量,一起new出来,一般使用前一种方法。

发起异步请求后Request和Channel也可以立刻析构。这两样和response/controller是不同的。注意:这是说Channel的析构可以立刻发生在CallMethod之后,并不是说析构可以和CallMethod同时发生,删除正被另一个线程使用的Channel是未定义行为(很可能crash)。

使用NewCallback

static void OnRPCDone(MyResponse* response, brpc::Controller*cntl) {//unique_ptr会帮助我们在return时自动删掉response/cntl,防止忘记。gcc 3.4下的unique_ptr是模拟版本。

std::unique_ptrresponse_guard(response);

std::unique_ptr<:controller>cntl_guard(cntl);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。

} else{//RPC成功了,response里有我们想要的数据。开始RPC的后续处理。

}//NewCallback产生的Closure会在Run结束后删除自己,不用我们做。

}

MyResponse* response = newMyResponse;

brpc::Controller* cntl = newbrpc::Controller;

MyService_Stub stub(&channel);

MyRequest request;//你不用new request,即使在异步访问中.

request.set_foo(...);

cntl->set_timeout_ms(...);

stub.some_method(cntl,&request, response, google::protobuf::NewCallback(OnRPCDone, response, cntl));

由于protobuf 3把NewCallback设置为私有,r32035后brpc把NewCallback独立于src/brpc/callback.h(并增加了一些重载)。如果你的程序出现NewCallback相关的编译错误,把google::protobuf::NewCallback替换为brpc::NewCallback就行了。

继承google::protobuf::Closure

使用NewCallback的缺点是要分配三次内存:response,controller,done。如果profiler证明这儿的内存分配有瓶颈,可以考虑自己继承Closure,把response/controller作为成员变量,这样可以把三次new合并为一次。但缺点就是代码不够美观,如果内存分配不是瓶颈,别用这种方法。

class OnRPCDone: publicgoogle::protobuf::Closure {public:voidRun() {//unique_ptr会帮助我们在return时自动delete this,防止忘记。gcc 3.4下的unique_ptr是模拟版本。

std::unique_ptr self_guard(this);if (cntl->Failed()) {//RPC失败了. response里的值是未定义的,勿用。

} else{//RPC成功了,response里有我们想要的数据。开始RPC的后续处理。

}

}

MyResponse response;

brpc::Controller cntl;

}

OnRPCDone* done = newOnRPCDone;

MyService_Stub stub(&channel);

MyRequest request;//你不用new request,即使在异步访问中.

request.set_foo(...);

done->cntl.set_timeout_ms(...);

stub.some_method(&done->cntl, &request, &done->response, done);

如果异步访问中的回调函数特别复杂会有什么影响吗?

没有特别的影响,回调会运行在独立的bthread中,不会阻塞其他的逻辑。你可以在回调中做各种阻塞操作。

rpc发送处的代码和回调函数是在同一个线程里执行吗?

一定不在同一个线程里运行,即使该次rpc调用刚进去就失败了,回调也会在另一个bthread中运行。这可以在加锁进行rpc(不推荐)的代码中避免死锁。???

6.3 等待RPC完成

注意:当你需要发起多个并发操作时,可能ParallelChannel更方便。

如下代码发起两个异步RPC后等待他们。

const brpc::CallId cid1 = controller1->call_id();const brpc::CallId cid2 = controller2->call_id();

...

stub.method1(controller1, request1, response1, done1);

stub.method2(controller2, request2, response2, done2);

...

brpc::Join(cid1);

brpc::Join(cid2);

在发起RPC前调用Controller.call_id()获得一个id,发起RPC调用后Join那个id。

Join()的行为是等到RPC结束且done->Run()运行后,一些Join的性质如下:

如果对应的RPC已经结束,Join将立刻返回。

多个线程可以Join同一个id,它们都会醒来。

同步RPC也可以在另一个线程中被Join,但一般不会这么做。

Join()在之前的版本叫做JoinResponse(),如果你在编译时被提示deprecated之类的,修改为Join()。

在RPC调用后Join(controller->call_id())是错误的行为,一定要先把call_id保存下来。因为RPC调用后controller可能被随时开始运行的done删除。下面代码的Join方式是错误的。

static void on_rpc_done(Controller* controller, MyResponse*response) {

... Handle response ...deletecontroller;deleteresponse;

}

Controller* controller1 = newController;

Controller* controller2 = newController;

MyResponse* response1 = newMyResponse;

MyResponse* response2 = newMyResponse;

...

stub.method1(controller1,&request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));

stub.method2(controller2,&request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));

...

brpc::Join(controller1->call_id()); //错误,controller1可能被on_rpc_done删除了

brpc::Join(controller2->call_id()); //错误,controller2可能被on_rpc_done删除了

6.4 半同步

Join可用来实现“半同步”访问:即等待多个异步访问完成。由于调用处的代码会等到所有RPC都结束后再醒来,所以controller和response都可以放栈上。

brpc::Controller cntl1;

brpc::Controller cntl2;

MyResponse response1;

MyResponse response2;

...

stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());

stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());

...

brpc::Join(cntl1.call_id());

brpc::Join(cntl2.call_id());

brpc::DoNothing()可获得一个什么都不干的done,专门用于半同步访问。它的声明周期由框架管理,用户不用关心。

注意在上面的代码中,我们在RPC结束后又访问了controller.call_id(),这是没有问题的,因为DoNothing中并不会像上节中的on_rpc_done中那样删除Controller。

6.5 取消RPC

brpc::StartCancel(call_id)可取消对应的RPC,call_id必须在发起RPC前通过Controller.call_id()获得,其他时刻都可能有race condition。

注意:是brpc::StartCancel(call_id),不是controller->StartCancel(),后者被禁用,没有效果。后者是protobuf默认提供的接口,但是在controller对象的生命周期上有严重的竞争问题。

顾名思义,StartCancel调用完成后RPC并未立刻结束,你不应该碰触Controller的任何字段或删除任何资源,它们自然会在RPC结束时被done中对应逻辑处理。如果你一定要在原地等到RPC结束(一般不需要),则可通过Join(call_id)。

关于StartCancel的一些事实:

call_id在发起RPC前就可以被取消,RPC会直接结束(done仍会被调用)。

call_id可以在另一个线程中被取消。

取消一个已经取消的call_id不会有任何效果。推论:同一个call_id可以被多个线程同时取消,但最多一次有效果。

这里的取消是纯client端的功能,server端未必会取消对应的操作,server cancelation是另一个功能。

6.6 获取Server的地址和端口

remote_side()方法可知道request被送向了哪个server,返回值类型是butil::EndPoint,包含一个ip4地址和端口。在RPC结束前调用这个方法都是没有意义的。

打印方式:

LOG(INFO) << "remote_side=" << cntl->remote_side();

printf("remote_side=%s\n", butil::endpoint2str(cntl->remote_side()).c_str());

6.7 获取Client的地址和端口

r31384后通过local_side()方法可在RPC结束后获得发起RPC的地址和端口。

打印方式:

LOG(INFO) << "local_side=" << cntl->local_side();

printf("local_side=%s\n", butil::endpoint2str(cntl->local_side()).c_str());

6.8 应该重用brpc::Controller吗?

不用刻意地重用,但Controller是个大杂烩,可能会包含一些缓存,Reset()可以避免反复地创建这些缓存。

在大部分场景下,构造Controller(snippet1)和重置Controller(snippet2)的性能差异不大。

//snippet1

for (int i = 0; i < n; ++i) {

brpc::Controller controller;

...

stub.CallSomething(...,&controller);

}//snippet2

brpc::Controller controller;for (int i = 0; i < n; ++i) {

controller.Reset();

...

stub.CallSomething(...,&controller);

}

但如果snippet1中的Controller是new出来的,那么snippet1就会多出“内存分配”的开销,在一些情况下可能会慢一些。

7. 设置

7.1 线程数

7.2 超时

7.3 重试

7.4 熔断

7.5 协议

7.6 连接方式

7.7 关闭连接池中的闲置连接

7.8 延迟关闭连接

7.9 连接的缓冲区大小

7.10 log_id

7.11 附件

7.12 开启SSL

7.13 认证

7.14 重置

7.15 压缩

8. FAQ

Q:brpc能用unix domain socket吗

Q:Fail to connect to xx.xx.xx.xx:xxxx, Connection refused

Q:经常遇到至另一个机房的Connection timedout

Q:为什么同步方式是好的,异步就crash了

Q:怎么确保请求只被处理一次

Q:Invalid address=`bns://group.user-persona.dumi.nj03'

Q:两端都用protobuf,为什么不能互相访问

Q:为什么C++ client/server能够互相通信,和其他语言的client/server通信会报序列化失败的错误

9. Client端基本流程

主要步骤:

创建一个bthread_id作为本次RPC的correlation_id。

根据Channel的创建方式,从进程级的SocketMap中或从LoadBalancer中选择一台下游server作为本次RPC发送的目的地。

根据连接方式(单连接、连接池、短连接),选择一个Socket。

如果开启验证且当前Socket没有被验证过,第一个请求进入验证分支,其余请求会阻塞直到第一个包含认证信息的请求写入Socket。server端只对第一个请求进行验证。

根据Channel的协议,选择对应的序列化函数把request序列化至IOBuf。

如果配置了超时,设置定时器。从这个点开始要避免使用Controller对象,因为在设定定时器后随时可能触发超时 -> 调用到用户的超时回调 -> 用户在回调中析构Controller。

发送准备阶段结束,若上述任何步骤出错,会调用Channel::HandleSendFailed。

将之前序列化好的IOBuf写出到Socket上,同时传入回调Channel::HandleSocketFailed,当连接断开、写失败等错误发生时会调用此回调。

如果是同步发送,Join correlation_id;否则至此CallMethod结束。

网络上发消息+收消息。

收到response后,提取出其中的correlation_id,在O(1)时间内找到对应的Controller。这个过程中不需要查找全局哈希表,有良好的多核扩展性。

根据协议格式反序列化response。

调用Controller::OnRPCReturned,可能会根据错误码判断是否需要重试,或让RPC结束。如果是异步发送,调用用户回调。最后摧毁correlation_id唤醒Join着的线程。

 类似资料: