当前位置: 首页 > 文档资料 > gRPC 学习笔记 >

Channel Builder设计与代码实现 - 类NettyChannelBuilder

优质
小牛编辑
153浏览
2023-12-01

在构建使用netty transport的channel的过程中,用来帮助简化d的builder。

类定义

  1. @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
  2. public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {}

注: 都1.0.0-pre2了居然还是 @ExperimentalApi……

方法实现

最重要的buildTransportFactory()

  1. @Override
  2. protected ClientTransportFactory buildTransportFactory() {
  3. return new NettyTransportFactory(channelType, negotiationType, protocolNegotiator, sslContext, eventLoopGroup, flowControlWindow, maxMessageSize, maxHeaderListSize);
  4. }

new了一个NettyTransportFactory实例,然后给了一堆参数。

下面看看各个参数的含义和传递/使用方式。

channelType

使用的channel type,默认使用NioSocketChannel。貌似一般也都是用这个了。

  1. private Class<? extends Channel> channelType = NioSocketChannel.class;
  2. public final NettyChannelBuilder channelType(Class<? extends Channel> channelType) {
  3. this.channelType = Preconditions.checkNotNull(channelType);
  4. return this;
  5. }

negotiationType

协商类型,具体看 io.grpc.netty.NegotiationType 这个枚举的说明。

NegotiationType 用于定义启动HTTP/2的协商方式:

  • TLS: 使用 TLS ALPN/NPN 协商,假定是 SSL 连接
  • PLAINTEXT_UPGRADE: 使用 HTTP 升级协议为 plaintext(非SSL)从 HTTP/1.1 升级到 HTTP/2
  • PLAINTEXT: 假设连接是 plaintext(非SSL) 而且远程端点直接支持HTTP2.而不需要升级

从实现代码上看,默认是TLS,然后可以通过方法设置:

  1. private NegotiationType negotiationType = NegotiationType.TLS;
  2. public final NettyChannelBuilder negotiationType(NegotiationType type) {
  3. negotiationType = type;
  4. return this;
  5. }

此外还有一个usePlaintext()方法,指明使用 Plaintext, 然后通过参数 skipNegotiation 来指定是否需要跳过协商过程。这个方法等同于调用negotiationType(PLAINTEXT)或negotiationType(PLAINTEXT_UPGRADE),取决于参数取值:

  1. @Override
  2. public final NettyChannelBuilder usePlaintext(boolean skipNegotiation) {
  3. if (skipNegotiation) {
  4. negotiationType(NegotiationType.PLAINTEXT);
  5. } else {
  6. negotiationType(NegotiationType.PLAINTEXT_UPGRADE);
  7. }
  8. return this;
  9. }

从代码上看,只是一个简单的if判断,这个方法可以认为是 negotiationType()方法的一个可读性稍好的版本,存在的价值只是为了方便和易读。

protocolNegotiator

设置使用的protocolNegotiator,如果不是null,则覆盖negotiationType(NegotiationType) 或者 usePlaintext(boolean):

  1. private ProtocolNegotiator protocolNegotiator;
  2. @Internal
  3. public final NettyChannelBuilder protocolNegotiator(
  4. @Nullable ProtocolNegotiator protocolNegotiator) {
  5. this.protocolNegotiator = protocolNegotiator;
  6. return this;
  7. }

所谓覆盖,是这样实现的,在newClientTransport()方法中,先判断 protocolNegotiator:

  1. public ManagedClientTransport newClientTransport(
  2. SocketAddress serverAddress, String authority) {
  3. ......
  4. ProtocolNegotiator negotiator = protocolNegotiator != null ? protocolNegotiator :
  5. createProtocolNegotiator(authority, negotiationType, sslContext);
  6. return newClientTransport(serverAddress, authority, negotiator);
  7. }

只有 protocolNegotiator 为null时,才会通过使用 authority / negotiationType / sslContext 参数来创建ProtocolNegotiator。如果不为null,则直接使用,无视上述几个参数。

sslContext

可以设置 SL/TLS 上下文来使用,替代系统默认。必须已经用 GrpcSslContexts 配置过,但是选项可以被覆盖。

这个参数只是简单赋值,在上面的newClientTransport()方法中使用,注意同样的,如果protocolNegotiator被设置(不为null),则这个sslContext将被忽略。

eventLoopGroup

可以提供一个 EventGroupLoop 给netty传输使用。

这是一个可选参数。在channel构建时如果用户没有提供 EventGroupLoop ,builder将使用默认,而这个默认是静态的。注意 channel 不会为给定的 EventGroupLoop 负责,在需要时调用者有责任关闭它。

  1. @Nullable
  2. private EventLoopGroup eventLoopGroup;
  3. public final NettyChannelBuilder eventLoopGroup(@Nullable EventLoopGroup eventLoopGroup) {
  4. this.eventLoopGroup = eventLoopGroup;
  5. return this;
  6. }

所谓使用静态的默认,代码实现是这样的:

  1. usingSharedGroup = group == null;
  2. if (usingSharedGroup) {
  3. // The group was unspecified, using the shared group.
  4. this.group = SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
  5. } else {
  6. this.group = group;
  7. }

如果没有设置,则取 SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), 而DEFAULT_WORKER_EVENT_LOOP_GROUP 是这样定义的:

  1. class Utils {
  2. public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP =
  3. new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG");
  4. public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP =
  5. new DefaultEventLoopGroupResource(0, "grpc-default-worker-ELG");

构造函数中 worker的 numEventLoops 参数设置为0, 而另一个 boss 的 numEventLoops 参数设置为1。这会影响两个EventLoopGroup(可以理解为netty的线程池)的线程数,具体代码如下:

  1. @Override
  2. public EventLoopGroup create() {
  3. ......
  4. int parallelism = numEventLoops == 0
  5. ? Runtime.getRuntime().availableProcessors() * 2 : numEventLoops;
  6. return new NioEventLoopGroup(parallelism, threadFactory);
  7. }

可以看到默认的 DEFAULT_WORKER_EVENT_LOOP_GROUP 的线程数为当前系统cpu数量的两倍。

注: 这个 SharedResourceHolder 的实现挺有意思,可以拿来在需要时参考一下,甚至直接搬过来用。

flowControlWindow / maxMessageSize / maxHeaderListSize

这三个参数只是简单赋值和传递,然后给出了缺省的默认值:

  1. private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
  2. private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
  3. private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;