当前位置: 首页 > 工具软件 > Fast RPC > 使用案例 >

ActiveJ学习 RPC(3)——RpcServer

池砚文
2023-12-01

2021SC@SDUSC

前言

上一章我们具体分析了一些关于RpcClient的方法以及具体使用方式等。在完成了对client客户端的创建之后,在正式与服务器进行连接,我们首先要有一个服务器server,今天我们要看的主要部分也是Rpc的服务器部分RpcServer。

RpcServer

同样的,与RpcClient相同,RpcServer在rpc中起到一个什么样的的作用呢?

An RPC server that works asynchronously. This server uses fast serializers and custom optimized communication protocol, improving applicationperformance.
In order to set up a server it’s mandatory to create it using {@link #create(Eventloop)}, indicate a types of messages, and specify an appropriate {@link RpcRequestHandler request handlers} for that types.
一个异步工作的RPC服务器。该服务器使用快速序列化器和自定义优化的通信协议,提高应用程序性能。
为了建立一个服务器,必须使用、create(Eventloop)创建它,指明消息的类型,并为该类型指定适当的 RpcRequestHandler请求处理程序。

启动一个server服务器有以下两种方法:
1.手动方式:设置服务器并且调用RpcServer中的listen()方法
2.为RpcServer创建一个模块,并将其传递给一个Launcher以及ServiceGraphModule。这个我们可以看到在第一次看RPC源码时,RpcExample中就是用了这样的一种方式。

其创建的大体流程为:
1.为请求类和响应类创建一个请求处理程序。
2.创建一个RpcServer
3.启动服务器RpcServer

下面来看一下RpcServer的具体代码
首先,与RpcCilent实现接口不同,RpcServer是直接继承了一个抽象的抽象服务器类AbstractServer,而AbstractServer本质上对于EventloopServer的实现。且AbstractServer定义为一个非阻塞的服务器,工作在eventloop的顶端。其在eventloop线程中工作,所有的事件都在该线程上触发。

public final class RpcServer extends AbstractServer<RpcServer>

每次创建服务器,必须有一个eventloop才可进行创建,这个为其构造方法:

	private RpcServer(Eventloop eventloop) {
		super(eventloop);
	}

create()方法创建并返回一个带有默认socketsetting和默认处理程序handler的server。

	public static RpcServer create(Eventloop eventloop) {
		return new RpcServer(eventloop)
				.withServerSocketSettings(DEFAULT_SERVER_SOCKET_SETTINGS)
				.withSocketSettings(DEFAULT_SOCKET_SETTINGS)
				.withHandler(RpcControlMessage.class, request -> {
					if (request == RpcControlMessage.PING) {
						return Promise.of(RpcControlMessage.PONG);
					}
					return Promise.ofException(new MalformedDataException("Unknown message: " + request));
				});
	}

或是通过类加载器classloader进行创建

	public RpcServer withClassLoader(ClassLoader classLoader) {
		this.classLoader = classLoader;
		this.serializerBuilder = SerializerBuilder.create(DefiningClassLoader.create(classLoader));
		return this;
	}

下面两个方法均为创建能够处理指定消息类型的服务器。

	public RpcServer withMessageTypes(Class<?>... messageTypes) {
		return withMessageTypes(asList(messageTypes));
	}
	public RpcServer withMessageTypes(@NotNull List<Class<?>> messageTypes) {
		checkArgument(new HashSet<>(messageTypes).size() == messageTypes.size(), "Message types must be unique");
		this.messageTypes = messageTypes;
		return this;
	}

创建一个指定序列化器生成器的server。

	public RpcServer withSerializerBuilder(SerializerBuilder serializerBuilder) {
		this.serializerBuilder = serializerBuilder;
		return this;
	}

创建指定流协议的server。

	public RpcServer withStreamProtocol(MemSize defaultPacketSize) {
		this.initialBufferSize = defaultPacketSize;
		return this;
	}

	public RpcServer withStreamProtocol(MemSize defaultPacketSize, FrameFormat frameFormat) {
		this.initialBufferSize = defaultPacketSize;
		this.frameFormat = frameFormat;
		return this;
	}

在完成创建对应的server后,需要为指定的请求-响应对添加对应的处理程序,并生成一个能够处理具体类型请求的服务器实例。
变量说明:
requestClass:表示请求结构的类
handler:一个包含请求逻辑处理以及创建响应的类。
I:表示请求类
O:表示响应类

public <I, O> RpcServer withHandler(Class<I> requestClass, RpcRequestHandler<I, O> handler) {
		checkArgument(!handlers.containsKey(requestClass), "Handler for {} has already been added", requestClass);
		handlers.put(requestClass, handler);
		return this;
	}

以上就是RpcServer的一些构造方法,我也自己试着写了一个简单的RpcServer的构造来帮助自己理解对应的方法以及如何使用:

RpcServer.create(eventloop)
				.withMessageTypes(String.class)
				.withHandler(String.class,
						request -> Promise.of("Hello " + request))
				.withListenPort(SERVICE_PORT);

其中withListnPort为AbstractServer中定义的方法,用于赋值需要监听的具体地址。

通过以上构造方法可以创建一个具体的server了,下面我们需要对其进行具体的操作。
对在AbstractSever中定义的抽象方法serve进行实现,此步也是在server的具体应用中的关键方法。

	protected void serve(AsyncTcpSocket socket, InetAddress remoteAddress) {
		RpcStream stream = new RpcStream(socket, serializer, initialBufferSize,
				autoFlushInterval, frameFormat, true); // , statsSerializer, statsDeserializer, statsCompressor, statsDecompressor);
		RpcServerConnection connection = new RpcServerConnection(this, remoteAddress, handlers, stream);
		stream.setListener(connection);
		add(connection);

		// jmx
		ensureConnectStats(remoteAddress).recordEvent();
		totalConnects.recordEvent();
	}

我们可以看到serve具体都干了什么:
1.创建一个rpcstream流,作为client和server之间的传输流(需要制定套接字、序列化器等等),而后创建一个新的rpcserverconnection用于沟通,并在stream中将该connection设为listener用于后续的具体交流。
onlisten()用于判断是否为指定的额信息类型。

	protected void onListen() {
		checkState(messageTypes != null, "Message types must be specified");
		serializer = serializerBuilder.withSubclasses(RpcMessage.MESSAGE_TYPES, messageTypes).build(RpcMessage.class);
	}

onclose()用于关闭server,可强制关闭连接,有以下两种情况:
1.如果此时连接为空,直接输出“RpcServer is closing. Active connections count: 0”,并将待处理事物置空;
2.如果连接不为空,则需要输出具体的连接数量,并且依次将连接关闭。

	protected void onClose(SettablePromise<Void> cb) {
		if (connections.isEmpty()) {
			logger.info("RpcServer is closing. Active connections count: 0.");
			cb.set(null);
		} else {
			logger.info("RpcServer is closing. Active connections count: {}", connections.size());
			for (RpcServerConnection connection : new ArrayList<>(connections)) {
				connection.shutdown();
			}
			closeCallback = cb;
		}
	}

add()方法将一个连接加入到连接池中。

	void add(RpcServerConnection connection) {
		if (logger.isInfoEnabled())
			logger.info("Client connected on {}", connection);

		if (monitoring) {
			connection.startMonitoring();
		}

		connections.add(connection);
	}

remove方法将一个连接从连接池connections中删除。

	boolean remove(RpcServerConnection connection) {
		if (!connections.remove(connection)) {
			return false;
		}
		if (logger.isInfoEnabled())
			logger.info("Client disconnected on {}", connection);

		if (closeCallback != null) {
			logger.info("RpcServer is closing. One more connection was closed. " +
					"Active connections count: {}", connections.size());

			if (connections.isEmpty()) {
				closeCallback.set(null);
			}
		}
		return true;
	}

以上即为RpcServer中的方法。下面的有关RpcServer生命周期jmx相关的方法不过多介绍。

小结

本次主要分析了RpcServer并了解ServreHandler的具体作用。并且找到了server与client连接中间的重要部分——connection以及steam作为中间连接的关键类。下一次的博客中我们也将以client和server 之间的通信作为重点具体分析。

 类似资料: