当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ学习 RPC(8)——Benchmark

麹高远
2023-12-01

2021SC@SDUSC

前言

在上一章我们重新阅读了RpcExample类的例子描述,用最实际的方法了解了具体都干些什么,是怎么完成的。但在途中对于其中关键启动onStart()方法的使用上,我发现了一些一直被我漏掉没看到的RpcBenchmark类,作为RPC的基准,rpcclient和server的初始化离不开这几个基准类,下面我们就一起来看一下benchmark都做了些什么,他是干什么的。

RpcBenchmarkServer

首先来看一个返回RpcServer 的方法:

	public RpcServer rpcServer(@Named("server") Eventloop eventloop, Config config) {
		return RpcServer.create(eventloop)
				.withStreamProtocol(
						config.get(ofMemSize(), "rpc.defaultPacketSize", MemSize.kilobytes(256)),
						config.get(ofFrameFormat(), "rpc.compression", null))
				.withListenPort(config.get(ofInteger(), "rpc.server.port"))
				.withMessageTypes(Integer.class)
				.withHandler(Integer.class, req -> Promise.of(req * 2));

	}

在这里我们可以看到,通过传入的参数config配置了流协议,并且通过config指定监听端口,指定消息类型为int类型并创建相关的处理程序handler。
config()配置设置了端口号以供使用:

	Config config() {
		return Config.create()
				.with("rpc.server.port", "" + SERVICE_PORT)
				.overrideWith(Config.ofSystemProperties("config"));
	}

重写一个getModule()方法,同样而后config接通执行相应的配置:

	protected Module getModule() {
		return combine(
				ServiceGraphModule.create(),
				ConfigModule.create()
						.withEffectiveConfigLogger());
	}

然后是和其他创建服务器的过程相似的run()方法和main()方法:

	protected void run() throws Exception {
		awaitShutdown();
	}

	public static void main(String[] args) throws Exception {
		RpcBenchmarkServer benchmark = new RpcBenchmarkServer();
		benchmark.launch(args);
	}

服务器执行awaitShutdown()方法,等待客户端发送信息并阻塞。

RpcBenchmarkClient

首先为返回一个rpcClient的创建方法:

	public RpcClient rpcClient(@Named("client") Eventloop eventloop, Config config) {
		return RpcClient.create(eventloop)
				.withStreamProtocol(
						config.get(ofMemSize(), "rpc.defaultPacketSize", MemSize.kilobytes(256)),
						config.get(ofFrameFormat(), "rpc.frameFormat", null))
				.withMessageTypes(Integer.class)
				.withStrategy(server(new InetSocketAddress(config.get(ofInteger(), "rpc.server.port"))));
	}

使用传入的config参数,为rpcClient初始化相应的内存大小以及帧格式。指定messtype为int类型的对象。
Config()配置方法:

	Config config() {
		return Config.create()
				.with("rpc.server.port", "" + SERVICE_PORT)
				.overrideWith(Config.ofSystemProperties("config"));
	}

重写launcher中的getModule()方法:

	protected Module getModule() {
		return combine(
				ServiceGraphModule.create(),
				ConfigModule.create()
						.withEffectiveConfigLogger());
	}

下面是我们分析基准类的源头所在:

	private int warmupRounds;
	private int benchmarkRounds;
	private int totalRequests;
	private int activeRequestsMin;
	private int activeRequestsMax;

	@Override
	protected void onStart() {
		warmupRounds = config.get(ofInteger(), "benchmark.warmupRounds", WARMUP_ROUNDS);
		benchmarkRounds = config.get(ofInteger(), "benchmark.benchmarkRounds", BENCHMARK_ROUNDS);
		totalRequests = config.get(ofInteger(), "benchmark.totalRequests", TOTAL_REQUESTS);
		activeRequestsMin = config.get(ofInteger(), "benchmark.activeRequestsMin", ACTIVE_REQUESTS_MIN);
		activeRequestsMax = config.get(ofInteger(), "benchmark.activeRequestsMax", ACTIVE_REQUESTS_MAX);
	}

我们可以看到,其实这一步就是在为配置client内置的一些默认变量,将其设置为输入的int类型,指定轮次、请求等等。在这个类中,这是为了设下基准检验程序,在其他类调用该函数时,则是在没有输入参数的情况下,以默认情况初始化client。
下面是重点的重写run()方法:

	protected void run() throws Exception {
		long time = 0;
		long bestTime = -1;
		long worstTime = -1;

		System.out.println("Warming up ...");
		for (int i = 0; i < warmupRounds; i++) {
			long roundTime = round();
			long rps = totalRequests * 1000L / roundTime;
			System.out.printf("Round: %d; Round time: %dms; RPS : %d%n", i + 1, roundTime, rps);
		}

		System.out.println("Start benchmarking RPC");

		for (int i = 0; i < benchmarkRounds; i++) {
			long roundTime = round();

			time += roundTime;

			if (bestTime == -1 || roundTime < bestTime) {
				bestTime = roundTime;
			}

			if (worstTime == -1 || roundTime > worstTime) {
				worstTime = roundTime;
			}

			long rps = totalRequests * 1000L / roundTime;
			System.out.printf("Round: %d; Round time: %dms; RPS : %d%n", i + 1, roundTime, rps);
		}
		double avgTime = (double) time / benchmarkRounds;
		long requestsPerSecond = (long) (totalRequests / avgTime * 1000);
		System.out.printf("Time: %dms; Average time: %sms; Best time: %dms; Worst time: %dms; Requests per second: %d%n",
				time, avgTime, bestTime, worstTime, requestsPerSecond);
	}

我们来重点理解一下run()方法在干什么:warmupRound和BenchmarkRound都在上面onStart()中定义完了,设置需要的热身和基准测试的轮数,每一轮都调用roundCall()方法完成一个请求并且得到其时间并记录。最后得到RPC的速度,这是一个基准测试类。
round()调用了roundCall()方法,并返回了对应的进程号(可以那么理解)。

	private long round() throws Exception {
		return eventloop.submit(this::roundCall).get();
	}

roundCall()方法可以说是最关键的方法,首先他讲callback根据int进行赋值,而后根据待发送的请求个数或是请求上限发送请求,sendRequest()出去,从而使其能被服务器听到。

	private Promise<Long> roundCall() {
		SettablePromise<Long> promise = new SettablePromise<>();

		long start = System.currentTimeMillis();

		sent = 0;
		completed = 0;

		Callback<Integer> callback = new Callback<Integer>() {
			@Override
			public void accept(Integer result, @Nullable Exception e) {
				completed++;

				int active = sent - completed;

				// Stop round
				if (completed == totalRequests) {
					promise.set(null);
					return;
				}

				if (active <= activeRequestsMin) {
					for (int i = 0; i < min(activeRequestsMax - active, totalRequests - sent); i++, sent++) {
						rpcClient.sendRequest(sent, this);
					}
				}
			}
		};

		for (int i = 0; i < min(activeRequestsMax, totalRequests); i++) {
			rpcClient.sendRequest(sent, callback);
			sent++;
		}

		return promise.map($ -> System.currentTimeMillis() - start);
	}

Benchmark Testing

在完成了源代码理解后,我们知道这实际上是在做RPC速度的基准测试,我们一起来看一下测试结果。
首先打开BenchmarkServer:

rpc.compression =
rpc.defaultPacketSize = 256Kb
rpc.server.port = 25565
19:24:09.246 [main] INFO io.activej.rpc.RpcBenchmarkServer - === RUNNING APPLICATION

注意到这里打印出了config所配置的参数

然后打开BenchmarkClient:

rpc.defaultPacketSize = 256Kb
rpc.frameFormat =
rpc.server.port = 25565

同样的,出现了使用config配置的结果,并且给出了测试的整体过程结果:

19:29:04.595 [main] INFO io.activej.rpc.RpcBenchmarkClient - === RUNNING APPLICATION
Warming up …
Round: 1; Round time: 1344ms; RPS : 7440476
Round: 2; Round time: 1040ms; RPS : 9615384
Round: 3; Round time: 795ms; RPS : 12578616
Start benchmarking RPC
Round: 1; Round time: 790ms; RPS : 12658227
Round: 2; Round time: 802ms; RPS : 12468827
Round: 3; Round time: 795ms; RPS : 12578616
Round: 4; Round time: 795ms; RPS : 12578616
Round: 5; Round time: 779ms; RPS : 12836970
Round: 6; Round time: 816ms; RPS : 12254901
Round: 7; Round time: 773ms; RPS : 12936610
Round: 8; Round time: 797ms; RPS : 12547051
Round: 9; Round time: 781ms; RPS : 12804097
Round: 10; Round time: 792ms; RPS : 12626262
Time: 7920ms; Average time: 792.0ms; Best time: 773ms; Worst time: 816ms; Requests per second: 12626262

小结

通过一步步分析源代码,此类为基准测试类,用于测试RPC的具体运行的速度是如何的。我们能从测试案例中看到,RPC的平均一次收发为792ms,最佳时间为773ms,最差时间为816ms,能感受到使用RPC通信协议的快速,并且相对是稳定的,波动不超过5%。自此,基础的RPC代码分析基本就到这里结束了,后续的两章会结合着实例具体介绍一下RPC还能做些什么,RPC在使用了快速序列化器之后,能做的事和能提升的效率到底是什么样的。

 类似资料: