当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ学习 RPC(7)——具体实现

柳梓
2023-12-01

2021SC@SDUSC

前言

在前面的六章中,我们具体看了RPC实现的关键类:RpcClient、RpcServer、RpcSteam、RpcMessage以及RpcClientConnection、RpcServerConnection。在了解了一切之后,因为对于源码的解读对于理解还有些抽象,所以这一次我们结合着实际例子,即我们之前在掌握大体架构时的RpcExample,将例子中的调用一点点深入进去,来看一看到底是如何实现两端互相连接的。

RpcExample回看

大致架构:RpcExample使用了main()方法,并重载了Launcher中的run()和getModule()方法:

	@Override
	protected Module getModule() {
		return ServiceGraphModule.create();
	}

	@Override
	protected void run() throws ExecutionException, InterruptedException {
		CompletableFuture<Object> future = eventloop.submit(() ->
				client.sendRequest("World", 1000)
		);
		System.out.printf("%nRPC result: %s %n%n", future.get());
	}

	public static void main(String[] args) throws Exception {
		RpcExample example = new RpcExample();
		example.launch(args);
	}

在这里,我们能看到关键是launch,让我们再一次进入launch去解析都干了些什么:

logger.info("=== INJECTING DEPENDENCIES");
			System.out.println(args);
			System.out.println(args.getClass());
			System.out.println(args.length);
			if(args==null) {
				System.out.println("null");
			}
			for(int i=0;i<args.length;i++) {
				System.out.println("in loop");
				System.out.println("args具体内容"+args[i]);
			}
			Injector injector = createInjector(args);
			
			
			injector.getInstance(this.getClass());
			System.out.println(this.getClass());
			if (logger0.isInfoEnabled()) {
				logger0.info("Effective Injector:\n\n{}", makeGraphVizGraph(injector.getBindingsTrie()));
			}

当我们现在再看时,能发现此时正是在设置依赖,保证launcher能调用到对应的类和方法,实现相应的操作。
其对应的输出图如下:

11:04:59.156 [main] INFO RpcExample.0 - Effective Injector:

digraph {
	rankdir=BT;
	"()->java.util.Set<io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>>" [label="Set<Initializer<ServiceGraphModuleSettings>>"];
	"()->io.activej.inject.InstanceInjector<RpcExample>" [label="InstanceInjector<RpcExample>"];
	"()->io.activej.eventloop.Eventloop" [label="Eventloop"];
	"()->interface io.activej.launcher.annotation.OnRun java.util.concurrent.CompletionStage<java.lang.Void>" [label="@OnRun CompletionStage<Void>"];
	"()->interface io.activej.launcher.annotation.OnStart java.util.concurrent.CompletionStage<java.lang.Void>" [label="@OnStart CompletionStage<Void>"];
	"()->java.util.Set<io.activej.launcher.LauncherService>" [label="Set<LauncherService>"];
	"()->java.util.Set<io.activej.inject.InstanceInjector<?>>" [label="Set<InstanceInjector<?>>"];
	"()->io.activej.inject.Injector" [label="Injector" style=bold];
	"()->@4411d970 io.activej.launcher.LauncherService" [label="@4411d970 LauncherService"];
	"()->RpcExample" [label="RpcExample"];
	"()->io.activej.rpc.client.RpcClient" [label="RpcClient"];
	"()->@3c5a99da io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>" [label="@3c5a99da Initializer<ServiceGraphModuleSettings>"];
	"()->io.activej.launcher.Launcher" [label="Launcher"];
	"()->interface io.activej.launcher.annotation.Args java.lang.String[]" [label="@Args String[]"];
	"()->io.activej.rpc.server.RpcServer" [label="RpcServer"];
	"()->interface io.activej.launcher.annotation.OnComplete java.util.concurrent.CompletionStage<java.lang.Void>" [label="@OnComplete CompletionStage<Void>"];
	"()->io.activej.service.ServiceGraph" [label="ServiceGraph"];

	{ rank=same; "()->io.activej.eventloop.Eventloop" "()->@3c5a99da io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>" "()->interface io.activej.launcher.annotation.Args java.lang.String[]" "()->interface io.activej.launcher.annotation.OnRun java.util.concurrent.CompletionStage<java.lang.Void>" "()->interface io.activej.launcher.annotation.OnStart java.util.concurrent.CompletionStage<java.lang.Void>" "()->io.activej.inject.Injector" "()->interface io.activej.launcher.annotation.OnComplete java.util.concurrent.CompletionStage<java.lang.Void>" "()->RpcExample" }

	"()->java.util.Set<io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>>" -> "()->@3c5a99da io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>" [];
	"()->io.activej.inject.InstanceInjector<RpcExample>" -> "()->io.activej.rpc.server.RpcServer" [];
	"()->io.activej.inject.InstanceInjector<RpcExample>" -> "()->io.activej.rpc.client.RpcClient" [];
	"()->io.activej.inject.InstanceInjector<RpcExample>" -> "()->io.activej.eventloop.Eventloop" [];
	"()->java.util.Set<io.activej.launcher.LauncherService>" -> "()->@4411d970 io.activej.launcher.LauncherService" [];
	"()->java.util.Set<io.activej.inject.InstanceInjector<?>>" -> "()->io.activej.inject.InstanceInjector<RpcExample>" [];
	"()->@4411d970 io.activej.launcher.LauncherService" -> "()->io.activej.service.ServiceGraph" [];
	"()->@4411d970 io.activej.launcher.LauncherService" -> "()->io.activej.inject.Injector" [];
	"()->@4411d970 io.activej.launcher.LauncherService" -> "()->java.util.Set<io.activej.common.initializer.Initializer<io.activej.service.ServiceGraphModuleSettings>>" [style=dashed,];
	"()->io.activej.rpc.client.RpcClient" -> "()->io.activej.eventloop.Eventloop" [];
	"()->io.activej.launcher.Launcher" -> "()->RpcExample" [];
	"()->io.activej.rpc.server.RpcServer" -> "()->io.activej.eventloop.Eventloop" [];
	"()->io.activej.service.ServiceGraph" -> "()->io.activej.inject.Injector" [];
}

然后是初始化injector部分

			onInit(injector);

			injector.createEagerInstances();
			logger0.info("Created eager singletons");

			Set<LauncherService> services = injector.getInstanceOr(new Key<Set<LauncherService>>() {}, emptySet());
			Set<LauncherService> startedServices = new HashSet<>();
			
			logger0.info("Post-injected instances: {}", postInjectInstances(injector));

在这个地方将rpcClient和rpcServer做好绑定,方便后续调用。并且在这之中,我们能从输出中看到这里已经在getInstanceOr()方法中调用到了RpcExample内的rpcClient和rpcServer方法
下面这一部分是rpcClient()方法的代码

	RpcClient rpcClient(Eventloop eventloop) {
		System.out.println("running init rpcclient");
		StackTraceElement stack[] = (new Throwable()).getStackTrace();
		for (int i = 0; i < stack.length; i++) {
			StackTraceElement ste = stack[i];
			System.out.println(ste.getClassName() + "." + ste.getMethodName() + "(...)");
			System.out.println(i+"--"+ste.getMethodName()+"--"+ste.getFileName() + "--" + ste.getLineNumber());
		}
		return RpcClient.create(eventloop)
				.withMessageTypes(String.class)
				.withStrategy(server(new InetSocketAddress(SERVICE_PORT)));
	}

对应该方法的输出为:

running init rpcclient
RpcExample.rpcClient(...)
0--rpcClient--RpcExample.java--53
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(...)
1--invoke0--NativeMethodAccessorImpl.java---2
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(...)
2--invoke--NativeMethodAccessorImpl.java--62
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(...)
3--invoke--DelegatingMethodAccessorImpl.java--43
java.lang.reflect.Method.invoke(...)
4--invoke--Method.java--567
io.activej.inject.util.ReflectionUtils.lambda$15(...)
5--lambda$15--ReflectionUtils.java--339
io.activej.inject.binding.Binding$2$1.doCreateInstance(...)
6--doCreateInstance--Binding.java--134
io.activej.inject.impl.AbstractRootCompiledBinding.getInstance(...)
7--getInstance--AbstractRootCompiledBinding.java--40
io.activej.inject.util.ReflectionUtils$5.initInstance(...)
8--initInstance--ReflectionUtils.java--273
io.activej.inject.impl.BindingInitializer$11.initInstance(...)
9--initInstance--BindingInitializer.java--99
io.activej.inject.module.DefaultModule$InstanceInjectorImpl.injectInto(...)
10--injectInto--DefaultModule.java--201
io.activej.launcher.Launcher.postInjectInstances(...)
11--postInjectInstances--Launcher.java--233
io.activej.launcher.Launcher.launch(...)
12--launch--Launcher.java--164
RpcExample.main(...)
13--main--RpcExample.java--86

此时rpcClient初始化完成,接下来要继续通过之前初始化过的eventloop初始化rpcServer:

	RpcServer rpcServer(Eventloop eventloop) {
		System.out.println("running init rpcserver");
		return RpcServer.create(eventloop)
				.withMessageTypes(String.class)
				.withHandler(String.class,
						request -> Promise.of("Hello " + request))
				.withListenPort(SERVICE_PORT);
	}

此时已经设置好了Handler,即对应处理String class的请求,并且向客户端准备发送“HELLO”请求,同时可以看到server和client之间通过同一个port连接,进而实现通信。
下面就是我们之前有看过的三步:
STARTING APPLICATION
RUNNING APPLICATION
STOPPING APPLICATION

			logger.info("=== STARTING APPLICATION");
			try {
				instantOfStart = Instant.now();
				logger0.info("Starting Root Services: {}", services);
				startServices(services, startedServices);
				onStart();
				onStartFuture.complete(null);
			} catch (Exception e) {
				applicationError = e;
				logger.error("Start error", e);
				onStartFuture.completeExceptionally(e);
			}
			if (applicationError == null) {
				logger.info("=== RUNNING APPLICATION");
				try {
					instantOfRun = Instant.now();
					run();
					onRunFuture.complete(null);
				} catch (Exception e) {
					applicationError = e;
					logger.error("Error", e);
					onRunFuture.completeExceptionally(e);
				}
			} else {
				onRunFuture.completeExceptionally(applicationError);
			}
			logger.info("=== STOPPING APPLICATION");
			instantOfStop = Instant.now();
			if (!onStartFuture.isCompletedExceptionally()) {
				try {
					onStop();
				} catch (Exception e) {
					logger.error("Stop error", e);
				}
			}

			stopServices(startedServices);

当我们再次回看并逐层迭代查看调用的方法和具体实现的时候,我们能发现其实真正在起作用的是三个方法:onStart(),run(),onStop()方法。
onStart()使用的是rpc的benchmark作为默认值进行初始化,由于篇幅较长我们放在下一章进行讲解。
run()方法我们之前提过,他会运行RpcExample内重写的run()方法进行具体的调用:

	protected void run() throws ExecutionException, InterruptedException {
		CompletableFuture<Object> future = eventloop.submit(() ->
				client.sendRequest("World", 1000)
		);
		System.out.printf("%nRPC result: %s %n%n", future.get());
	}

通过调用eventloop的submit()方法,返回一个异步多线程对象CompletableFuture的对象,该对象的get()方法有两种情况:
1.当运行完毕时,可以直接get()
2.当为运行试,get()会阻塞直到运行完毕。
这样我们就能实现client和server之间的通信,并且在这里打印出具体的结果“Hello World”。完成本次例子的所有工作。

小结

以上就是本次对于RpcExample的回看和理解,关于onStart()方法所调用的部分会留到下一章进行对于相关的默认参数的分析和解析。
但这一次,当终于将前面的分析融入进这一次的具体实例的实现中时,还是感觉很奇妙,其中间通过了各种调用后终于完成传递的过程完整地解析了在我眼前,帮助我对其有了更深刻的理解和感悟。

 类似资料: