2021SC@SDUSC
上一章我们具体分析了一些关于RpcClient的方法以及具体使用方式等。在完成了对client客户端的创建之后,在正式与服务器进行连接,我们首先要有一个服务器server,今天我们要看的主要部分也是Rpc的服务器部分RpcServer。
同样的,与RpcClient相同,RpcServer在rpc中起到一个什么样的的作用呢?
An RPC server that works asynchronously. This server uses fast serializers and custom optimized communication protocol, improving applicationperformance.
In order to set up a server it’s mandatory to create it using {@link #create(Eventloop)}, indicate a types of messages, and specify an appropriate {@link RpcRequestHandler request handlers} for that types.
一个异步工作的RPC服务器。该服务器使用快速序列化器和自定义优化的通信协议,提高应用程序性能。
为了建立一个服务器,必须使用、create(Eventloop)创建它,指明消息的类型,并为该类型指定适当的 RpcRequestHandler请求处理程序。
启动一个server服务器有以下两种方法:
1.手动方式:设置服务器并且调用RpcServer中的listen()方法
2.为RpcServer创建一个模块,并将其传递给一个Launcher以及ServiceGraphModule。这个我们可以看到在第一次看RPC源码时,RpcExample中就是用了这样的一种方式。
其创建的大体流程为:
1.为请求类和响应类创建一个请求处理程序。
2.创建一个RpcServer
3.启动服务器RpcServer
下面来看一下RpcServer的具体代码
首先,与RpcCilent实现接口不同,RpcServer是直接继承了一个抽象的抽象服务器类AbstractServer,而AbstractServer本质上对于EventloopServer的实现。且AbstractServer定义为一个非阻塞的服务器,工作在eventloop的顶端。其在eventloop线程中工作,所有的事件都在该线程上触发。
public final class RpcServer extends AbstractServer<RpcServer>
每次创建服务器,必须有一个eventloop才可进行创建,这个为其构造方法:
private RpcServer(Eventloop eventloop) {
super(eventloop);
}
create()方法创建并返回一个带有默认socketsetting和默认处理程序handler的server。
public static RpcServer create(Eventloop eventloop) {
return new RpcServer(eventloop)
.withServerSocketSettings(DEFAULT_SERVER_SOCKET_SETTINGS)
.withSocketSettings(DEFAULT_SOCKET_SETTINGS)
.withHandler(RpcControlMessage.class, request -> {
if (request == RpcControlMessage.PING) {
return Promise.of(RpcControlMessage.PONG);
}
return Promise.ofException(new MalformedDataException("Unknown message: " + request));
});
}
或是通过类加载器classloader进行创建
public RpcServer withClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
this.serializerBuilder = SerializerBuilder.create(DefiningClassLoader.create(classLoader));
return this;
}
下面两个方法均为创建能够处理指定消息类型的服务器。
public RpcServer withMessageTypes(Class<?>... messageTypes) {
return withMessageTypes(asList(messageTypes));
}
public RpcServer withMessageTypes(@NotNull List<Class<?>> messageTypes) {
checkArgument(new HashSet<>(messageTypes).size() == messageTypes.size(), "Message types must be unique");
this.messageTypes = messageTypes;
return this;
}
创建一个指定序列化器生成器的server。
public RpcServer withSerializerBuilder(SerializerBuilder serializerBuilder) {
this.serializerBuilder = serializerBuilder;
return this;
}
创建指定流协议的server。
public RpcServer withStreamProtocol(MemSize defaultPacketSize) {
this.initialBufferSize = defaultPacketSize;
return this;
}
public RpcServer withStreamProtocol(MemSize defaultPacketSize, FrameFormat frameFormat) {
this.initialBufferSize = defaultPacketSize;
this.frameFormat = frameFormat;
return this;
}
在完成创建对应的server后,需要为指定的请求-响应对添加对应的处理程序,并生成一个能够处理具体类型请求的服务器实例。
变量说明:
requestClass:表示请求结构的类
handler:一个包含请求逻辑处理以及创建响应的类。
I:表示请求类
O:表示响应类
public <I, O> RpcServer withHandler(Class<I> requestClass, RpcRequestHandler<I, O> handler) {
checkArgument(!handlers.containsKey(requestClass), "Handler for {} has already been added", requestClass);
handlers.put(requestClass, handler);
return this;
}
以上就是RpcServer的一些构造方法,我也自己试着写了一个简单的RpcServer的构造来帮助自己理解对应的方法以及如何使用:
RpcServer.create(eventloop)
.withMessageTypes(String.class)
.withHandler(String.class,
request -> Promise.of("Hello " + request))
.withListenPort(SERVICE_PORT);
其中withListnPort为AbstractServer中定义的方法,用于赋值需要监听的具体地址。
通过以上构造方法可以创建一个具体的server了,下面我们需要对其进行具体的操作。
对在AbstractSever中定义的抽象方法serve进行实现,此步也是在server的具体应用中的关键方法。
protected void serve(AsyncTcpSocket socket, InetAddress remoteAddress) {
RpcStream stream = new RpcStream(socket, serializer, initialBufferSize,
autoFlushInterval, frameFormat, true); // , statsSerializer, statsDeserializer, statsCompressor, statsDecompressor);
RpcServerConnection connection = new RpcServerConnection(this, remoteAddress, handlers, stream);
stream.setListener(connection);
add(connection);
// jmx
ensureConnectStats(remoteAddress).recordEvent();
totalConnects.recordEvent();
}
我们可以看到serve具体都干了什么:
1.创建一个rpcstream流,作为client和server之间的传输流(需要制定套接字、序列化器等等),而后创建一个新的rpcserverconnection用于沟通,并在stream中将该connection设为listener用于后续的具体交流。
onlisten()用于判断是否为指定的额信息类型。
protected void onListen() {
checkState(messageTypes != null, "Message types must be specified");
serializer = serializerBuilder.withSubclasses(RpcMessage.MESSAGE_TYPES, messageTypes).build(RpcMessage.class);
}
onclose()用于关闭server,可强制关闭连接,有以下两种情况:
1.如果此时连接为空,直接输出“RpcServer is closing. Active connections count: 0”,并将待处理事物置空;
2.如果连接不为空,则需要输出具体的连接数量,并且依次将连接关闭。
protected void onClose(SettablePromise<Void> cb) {
if (connections.isEmpty()) {
logger.info("RpcServer is closing. Active connections count: 0.");
cb.set(null);
} else {
logger.info("RpcServer is closing. Active connections count: {}", connections.size());
for (RpcServerConnection connection : new ArrayList<>(connections)) {
connection.shutdown();
}
closeCallback = cb;
}
}
add()方法将一个连接加入到连接池中。
void add(RpcServerConnection connection) {
if (logger.isInfoEnabled())
logger.info("Client connected on {}", connection);
if (monitoring) {
connection.startMonitoring();
}
connections.add(connection);
}
remove方法将一个连接从连接池connections中删除。
boolean remove(RpcServerConnection connection) {
if (!connections.remove(connection)) {
return false;
}
if (logger.isInfoEnabled())
logger.info("Client disconnected on {}", connection);
if (closeCallback != null) {
logger.info("RpcServer is closing. One more connection was closed. " +
"Active connections count: {}", connections.size());
if (connections.isEmpty()) {
closeCallback.set(null);
}
}
return true;
}
以上即为RpcServer中的方法。下面的有关RpcServer生命周期jmx相关的方法不过多介绍。
本次主要分析了RpcServer并了解ServreHandler的具体作用。并且找到了server与client连接中间的重要部分——connection以及steam作为中间连接的关键类。下一次的博客中我们也将以client和server 之间的通信作为重点具体分析。