2021SC@SDUSC
在上一章我们重新阅读了RpcExample类的例子描述,用最实际的方法了解了具体都干些什么,是怎么完成的。但在途中对于其中关键启动onStart()方法的使用上,我发现了一些一直被我漏掉没看到的RpcBenchmark类,作为RPC的基准,rpcclient和server的初始化离不开这几个基准类,下面我们就一起来看一下benchmark都做了些什么,他是干什么的。
首先来看一个返回RpcServer 的方法:
public RpcServer rpcServer(@Named("server") Eventloop eventloop, Config config) {
return RpcServer.create(eventloop)
.withStreamProtocol(
config.get(ofMemSize(), "rpc.defaultPacketSize", MemSize.kilobytes(256)),
config.get(ofFrameFormat(), "rpc.compression", null))
.withListenPort(config.get(ofInteger(), "rpc.server.port"))
.withMessageTypes(Integer.class)
.withHandler(Integer.class, req -> Promise.of(req * 2));
}
在这里我们可以看到,通过传入的参数config配置了流协议,并且通过config指定监听端口,指定消息类型为int类型并创建相关的处理程序handler。
config()配置设置了端口号以供使用:
Config config() {
return Config.create()
.with("rpc.server.port", "" + SERVICE_PORT)
.overrideWith(Config.ofSystemProperties("config"));
}
重写一个getModule()方法,同样而后config接通执行相应的配置:
protected Module getModule() {
return combine(
ServiceGraphModule.create(),
ConfigModule.create()
.withEffectiveConfigLogger());
}
然后是和其他创建服务器的过程相似的run()方法和main()方法:
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
RpcBenchmarkServer benchmark = new RpcBenchmarkServer();
benchmark.launch(args);
}
服务器执行awaitShutdown()方法,等待客户端发送信息并阻塞。
首先为返回一个rpcClient的创建方法:
public RpcClient rpcClient(@Named("client") Eventloop eventloop, Config config) {
return RpcClient.create(eventloop)
.withStreamProtocol(
config.get(ofMemSize(), "rpc.defaultPacketSize", MemSize.kilobytes(256)),
config.get(ofFrameFormat(), "rpc.frameFormat", null))
.withMessageTypes(Integer.class)
.withStrategy(server(new InetSocketAddress(config.get(ofInteger(), "rpc.server.port"))));
}
使用传入的config参数,为rpcClient初始化相应的内存大小以及帧格式。指定messtype为int类型的对象。
Config()配置方法:
Config config() {
return Config.create()
.with("rpc.server.port", "" + SERVICE_PORT)
.overrideWith(Config.ofSystemProperties("config"));
}
重写launcher中的getModule()方法:
protected Module getModule() {
return combine(
ServiceGraphModule.create(),
ConfigModule.create()
.withEffectiveConfigLogger());
}
下面是我们分析基准类的源头所在:
private int warmupRounds;
private int benchmarkRounds;
private int totalRequests;
private int activeRequestsMin;
private int activeRequestsMax;
@Override
protected void onStart() {
warmupRounds = config.get(ofInteger(), "benchmark.warmupRounds", WARMUP_ROUNDS);
benchmarkRounds = config.get(ofInteger(), "benchmark.benchmarkRounds", BENCHMARK_ROUNDS);
totalRequests = config.get(ofInteger(), "benchmark.totalRequests", TOTAL_REQUESTS);
activeRequestsMin = config.get(ofInteger(), "benchmark.activeRequestsMin", ACTIVE_REQUESTS_MIN);
activeRequestsMax = config.get(ofInteger(), "benchmark.activeRequestsMax", ACTIVE_REQUESTS_MAX);
}
我们可以看到,其实这一步就是在为配置client内置的一些默认变量,将其设置为输入的int类型,指定轮次、请求等等。在这个类中,这是为了设下基准检验程序,在其他类调用该函数时,则是在没有输入参数的情况下,以默认情况初始化client。
下面是重点的重写run()方法:
protected void run() throws Exception {
long time = 0;
long bestTime = -1;
long worstTime = -1;
System.out.println("Warming up ...");
for (int i = 0; i < warmupRounds; i++) {
long roundTime = round();
long rps = totalRequests * 1000L / roundTime;
System.out.printf("Round: %d; Round time: %dms; RPS : %d%n", i + 1, roundTime, rps);
}
System.out.println("Start benchmarking RPC");
for (int i = 0; i < benchmarkRounds; i++) {
long roundTime = round();
time += roundTime;
if (bestTime == -1 || roundTime < bestTime) {
bestTime = roundTime;
}
if (worstTime == -1 || roundTime > worstTime) {
worstTime = roundTime;
}
long rps = totalRequests * 1000L / roundTime;
System.out.printf("Round: %d; Round time: %dms; RPS : %d%n", i + 1, roundTime, rps);
}
double avgTime = (double) time / benchmarkRounds;
long requestsPerSecond = (long) (totalRequests / avgTime * 1000);
System.out.printf("Time: %dms; Average time: %sms; Best time: %dms; Worst time: %dms; Requests per second: %d%n",
time, avgTime, bestTime, worstTime, requestsPerSecond);
}
我们来重点理解一下run()方法在干什么:warmupRound和BenchmarkRound都在上面onStart()中定义完了,设置需要的热身和基准测试的轮数,每一轮都调用roundCall()方法完成一个请求并且得到其时间并记录。最后得到RPC的速度,这是一个基准测试类。
round()调用了roundCall()方法,并返回了对应的进程号(可以那么理解)。
private long round() throws Exception {
return eventloop.submit(this::roundCall).get();
}
roundCall()方法可以说是最关键的方法,首先他讲callback根据int进行赋值,而后根据待发送的请求个数或是请求上限发送请求,sendRequest()出去,从而使其能被服务器听到。
private Promise<Long> roundCall() {
SettablePromise<Long> promise = new SettablePromise<>();
long start = System.currentTimeMillis();
sent = 0;
completed = 0;
Callback<Integer> callback = new Callback<Integer>() {
@Override
public void accept(Integer result, @Nullable Exception e) {
completed++;
int active = sent - completed;
// Stop round
if (completed == totalRequests) {
promise.set(null);
return;
}
if (active <= activeRequestsMin) {
for (int i = 0; i < min(activeRequestsMax - active, totalRequests - sent); i++, sent++) {
rpcClient.sendRequest(sent, this);
}
}
}
};
for (int i = 0; i < min(activeRequestsMax, totalRequests); i++) {
rpcClient.sendRequest(sent, callback);
sent++;
}
return promise.map($ -> System.currentTimeMillis() - start);
}
在完成了源代码理解后,我们知道这实际上是在做RPC速度的基准测试,我们一起来看一下测试结果。
首先打开BenchmarkServer:
rpc.compression =
rpc.defaultPacketSize = 256Kb
rpc.server.port = 25565
19:24:09.246 [main] INFO io.activej.rpc.RpcBenchmarkServer - === RUNNING APPLICATION
注意到这里打印出了config所配置的参数
然后打开BenchmarkClient:
rpc.defaultPacketSize = 256Kb
rpc.frameFormat =
rpc.server.port = 25565
同样的,出现了使用config配置的结果,并且给出了测试的整体过程结果:
19:29:04.595 [main] INFO io.activej.rpc.RpcBenchmarkClient - === RUNNING APPLICATION
Warming up …
Round: 1; Round time: 1344ms; RPS : 7440476
Round: 2; Round time: 1040ms; RPS : 9615384
Round: 3; Round time: 795ms; RPS : 12578616
Start benchmarking RPC
Round: 1; Round time: 790ms; RPS : 12658227
Round: 2; Round time: 802ms; RPS : 12468827
Round: 3; Round time: 795ms; RPS : 12578616
Round: 4; Round time: 795ms; RPS : 12578616
Round: 5; Round time: 779ms; RPS : 12836970
Round: 6; Round time: 816ms; RPS : 12254901
Round: 7; Round time: 773ms; RPS : 12936610
Round: 8; Round time: 797ms; RPS : 12547051
Round: 9; Round time: 781ms; RPS : 12804097
Round: 10; Round time: 792ms; RPS : 12626262
Time: 7920ms; Average time: 792.0ms; Best time: 773ms; Worst time: 816ms; Requests per second: 12626262
通过一步步分析源代码,此类为基准测试类,用于测试RPC的具体运行的速度是如何的。我们能从测试案例中看到,RPC的平均一次收发为792ms,最佳时间为773ms,最差时间为816ms,能感受到使用RPC通信协议的快速,并且相对是稳定的,波动不超过5%。自此,基础的RPC代码分析基本就到这里结束了,后续的两章会结合着实例具体介绍一下RPC还能做些什么,RPC在使用了快速序列化器之后,能做的事和能提升的效率到底是什么样的。