当前位置: 首页 > 工具软件 > go-netty > 使用案例 >

Reactive响应式编程系列:解密reactor-netty如何实现响应式

竺鸿骞
2023-12-01

        我们都说 Netty 是一款基于异步事件驱动来设计和实现的高性能IO框架,它之所以高性能,重要的原因之一是其线程模型的设计,Netty 的线程模型是基于 Reactor 设计模式的,它主要包含两个线程池:一个是 Boss 线程池,另一个是 Worker 线程池。Boss 线程池主要负责接受客户端连接请求,并将连接请求注册到 Worker 线程池中的某个线程的 Selector 上。Boss 线程池通常只有一个线程(如果建链也成为瓶颈,那么Boss线程池也可以有多个)。Worker 线程池主要负责处理客户端连接请求,并进行网络 I/O 操作。Worker 线程池的大小通常是根据 CPU 核数和业务需求来调整的。

        Reactor 设计模式最早由 Doug Schmidt 和 Steve Vinoski 在 1995 年的一篇论文《Reactor: An Object Behavioral Pattern for Concurrent Handling of Multiple Event-based Requests》中提出,这是一篇关于并发编程模式的经典论文。这篇论文对基于 Reactor 设计模式的网络编程框架进行了探讨。Reactor 设计模式是一种用于处理事件驱动 I/O 操作的设计模式,它主要包含以下两个核心组件:

  1. Reactor:负责监听事件和分发事件,它将事件分发给对应的 Handler 处理器。
  2. Handler:负责处理具体的事件,例如读取数据、解析数据、处理业务逻辑、发送响应数据等等。

Reactor 设计模式的基本思想是:当有事件发生时,Reactor 将事件分发给对应的 Handler 处理器,由 Handler 处理器来具体处理事件。在处理器处理事件的过程中,如果继续需要进行 I/O 操作,它会将 I/O 操作交给 Reactor 处理,由 Reactor 处理器负责监听 I/O 事件并分发给对应的 Handler 处理器。它可以提供高效的事件处理和 I/O 操作,避免了使用传统的同步阻塞式 I/O 模型的性能瓶颈。

        Reactor 设计模式 和 Reactive 响应式编程有一定相似之处,两者的核心思想都是事件驱动,通过异步处理来解决高并发下阻塞操作带来的资源消耗和性能下降问题,而 reacotor-netty (GitHub - reactor/reactor-netty: TCP/HTTP/UDP/QUIC client/server with Reactor over Netty)的目标就是将 Netty 框架和响应式组件库 project-reactor (GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM)打通.

        为了简单起见,我们用HTTP请求和应答处理过程来窥探reactor-netty的实现细节,在 HTTP 协议中,请求和响应消息分为两个部分:消息头和消息体。消息头包含了请求或响应的元数据信息,如请求方法、响应状态码、内容类型、内容长度等。而消息体则包含了请求或响应的实际内容。而当一个 HTTP 请求或响应结束时,需要发送一个空的消息体(EmptyContent)表示请求或响应结束了。因为 HTTP 协议是基于流的,在传输过程中,HTTP 消息是分多次发送的,每次发送都是一部分数据。当发送完最后一部分数据后,需要告诉接收方,请求或响应已经结束了。这个信号就是一个空的消息体。

        注意,为了展示方便和理解更容易,文中展示的源代码都经过了精简,请以GitHub上最新代码为准!!!

        有了这些基础,我们看一个 reactor-netty 的官方例子,官方给了一个HTTP的client和server的例子,我们这里只分析client端的例子:

String reqStr = "Go to Zibo for barbecue";
System.out.println(Thread.currentThread().getName() + " 开始请求 " + reqStr);
HttpClient httpClient = HttpClient.create().port(8888);
httpClient.post()               // Specifies that POST method will be used
          .uri("/test/world")   // Specifies the path
          .send(ByteBufFlux.fromString(Flux.just(reqStr)))  // Sends the request body
          .responseContent()    // Receives the response body
          .aggregate()
          .asString()
          .subscribe(res -> 
                   System.out.println(Thread.currentThread().getName() + " 收到应答 " + res));

从这个client端例子来看,初步看起来和Reactive有关的有两个地方,第一个是send方法入参貌似是一个Flux,另一个是订阅方法subscribe,其中我们用Lambda表达式直接把HTTP的应答结果打印出来,需要注意的是,我们特意也打印了线程的名称,这是为了突出展示Reactive的异步事件驱动特性,即发起请求的线程和接收应答的线程不是同一个线程。这里我们先剖析下client端的实现原理。client端有两个阶段会有IO阻塞,一个是请求发送,一个等待请求应答,请求发送很明显在上述的 send 方法中来做,请求的应答目前看起来像是在responseContent 方法中来实现。

        我们先来详细看看 send 方法,按照 Reactive 的要求,调用 send 方法不会导致请求立马发送,请求的发送时机一定是 subscribe 方法调用时!!!而为了让请求发送的异步性做的更彻底,我们需要把请求的构造和包装也做成异步,所以你会看到调用了 ByteBufFlux.fromString 方法,该方法返回值是一个 Flux 类型(即 ByteBufFlux)来作为一个请求数据缓冲区的发布者对象。我们看下 send 方法的内部实现:

final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {

	@Override
	public HttpClientFinalizer send(Publisher<? extends ByteBuf> requestBody) {
		Objects.requireNonNull(requestBody, "requestBody");
        // 使用Lambda构造匿名内部类来封装请求发送
		return send((req, out) -> out.send(requestBody));
	}

	@Override
	public HttpClientFinalizer send(
			BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> sender) {
		Objects.requireNonNull(sender, "requestBody");
		HttpClient dup = duplicate();
        // 将请求发送方法保存起来
		dup.configuration().body = sender;
		return (HttpClientFinalizer) dup;
	}
}

public final class HttpClientConfig extends ClientTransportConfig<HttpClientConfig> {
	BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;
}

可以看出来最终我们用 dup.configuration().body 保存了请求发送的Lambda对象(即匿名内部类,为了阐述方便,后面直接说是Lambda对象),而dup是一个 HttpClient 类型的对象,即我们给出的 client 官方例子中的流程发起类。send 方法就这么简单?直接复制一个 HttpClient 对象并将其 HttpClientConfig 的 body 属性赋值请求发送的匿名内部类?别忘记 Reactive 的核心之一就是异步事件驱动,在这个case中建立TCP连接,发送请求、等待应答这些会阻塞的操作,都由Netty框架来驱动,reactor-netty 负责将其和 reactor (即project-reactor)组件打通。那么问题来了,send 方法入参的Lamdba对象只进行了赋值,那 “out.send(requestBody)” 什么时候被调用呢?如果是当前线程来执行,那么就失去异步的特性了,所以肯定还是 Netty 来事件驱动,这里就不得不提 ChannelOperationsHandler 这个类,由于它继承自 Netty 的 ChannelInboundHandlerAdapter,所以其实现的 channelActive 方法最终会触发 HttpClientConnect.HttpIOHandlerObserver.onStateChange 方法的调用,这意味着一有连接建立成功,就会触发HTTP请求的发送,该方法最终会触发HttpClientHandler.requestWithBody 方法的调用,而这个 HttpClientHandler 对象中就包含了这个Lambda表达式对象,所以 reactor-netty 就通过 ChannelOperationsHandler 和 Netty 的事件循环打通了。

        接着我们看看 send 方法后面调用的 responseContent 方法,我们看看其实现:

final class HttpClientFinalizer extends HttpClientConnect implements HttpClient.RequestSender {
	final HttpClientConfig config;
	static final Function<ChannelOperations<?, ?>, Publisher<ByteBuf>> contentReceiver = ChannelOperations::receive;
	
    @Override
	public ByteBufFlux responseContent() {
		ByteBufAllocator alloc = (ByteBufAllocator) config.options()
		                                                           .get(ChannelOption.ALLOCATOR);
		if (alloc == null) {
			alloc = ByteBufAllocator.DEFAULT;
		}

		@SuppressWarnings("unchecked")
		Mono<ChannelOperations<?, ?>> connector = (Mono<ChannelOperations<?, ?>>) connect();
		return ByteBufFlux.fromInbound(connector.flatMapMany(contentReceiver), alloc);
	}
}

class HttpClientConnect extends HttpClient {
	@Override
	protected Mono<? extends Connection> connect() {
		HttpClientConfig config = configuration();
		return new MonoHttpConnect(config);
	}
}

public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
		implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void>, ChannelOperationsId {
	final FluxReceive inbound;

    // 异步接收数据并返回一个缓冲区的Flux
	@Override
	public ByteBufFlux receive() {
        // fromInbound 用来将一个数据的发布者和一块缓冲区来封装成一个ByteBufFlux
		return ByteBufFlux.fromInbound(inbound, connection.channel()
		                                                          .alloc());
	}
}

可以看到,这里使用了 Netty 中的 ByteBufAllocator,它是用于分配 ByteBuf 对象的工具,而ByteBuf 是 Netty 中的字节缓冲区,用于存储和操作二进制数据。接着我们发现调用了 connect 方法来获取一个通道操作的 Mono,那么很显然这个 connect 方法并没有立马去创建连接,直接返回的是一个 MonoHttpConnect 对象,所以连接的建立过程肯定是在 MonoHttpConnect 中的subscribe方法来实现,感兴趣可以去看看。responseContent 方法最复杂的部分在 return 语句里,该方法最终返回的是 ByteBufFlux 类型对象,ByteBufFlux 其实就是把 Netty 中的 ByteBuf 进行了异步封装,封装成了 Flux<ByteBuf>。也就是说 responseContent 方法把建链和接收数据的缓冲区都做了异步封装并返回。之前我们说到在连接建立成功后就会发送HTTP请求,那么我们如何知道应答数据已经准备好了?还是得看 ChannelOperationsHandler,我们看其 channelRead 方法:

final class ChannelOperationsHandler extends ChannelInboundHandlerAdapter {
	@Override
	final public void channelRead(ChannelHandlerContext ctx, Object msg) {
		if (msg == null || msg == Unpooled.EMPTY_BUFFER || msg instanceof EmptyByteBuf) {
			return;
		}
		try {
			ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
			if (ops != null) {
                // 如果是 HTTP 则 ops 是 HttpClientOperations 对象
				ops.onInboundNext(ctx, msg);
			}
		} catch (Throwable err) {
			safeRelease(msg);
			ctx.close();
			exceptionCaught(ctx, err);
		}
	}
}

该方法会从Channel中取出通道操作对象(如果是HTTP则对应的是 HttpClientOperations,它是 ChannelOperations 的子类之一)并调用其 onInboundNext 方法,该方法将 Netty 输入的 Object msg 传递到 reactor-netty 的 FluxReceive 对象的 onInboundNext 方法,从这里开始进入了响应式阶段

final class FluxReceive extends Flux<Object> implements Subscription, Disposable {
    CoreSubscriber<? super Object> receiver;
	Queue<Object>                  receiverQueue;
	final void onInboundNext(Object msg) {
		if (receiverFastpath && receiver != null) {
			try {
				receiver.onNext(msg);
			} finally {
				ReferenceCountUtil.release(msg);
			}
		} else {
			Queue<Object> q = receiverQueue;
			if (q == null) {
				// please note, in that case we are using non-thread safe, simple
				// ArrayDeque since all modifications on this queue happens withing
				// Netty Event Loop
				q = new ArrayDeque<>();
				receiverQueue = q;
			}
			
			q.offer(msg);
			drainReceiver();
		}
	}
}

我们需要知道的是,在 Netty 中 HTTP 请求和响应都是由一系列的 DefaultHttpRequest 和 DefaultHttpResponse 对象组成的,其中 DefaultHttpContent 对象就是 HTTP 请求和响应的内容部分。当接收到 HTTP 请求时,Netty 会将请求头和请求体分开处理,其中请求体就是由一系列的 DefaultHttpContent 组成的。同样地,当发送 HTTP 响应时,Netty 会将响应头和响应体分开处理,其中响应体也是由一系列的 DefaultHttpContent 组成的。当 channelRead 接收到的 msg 为 DefaultHttpResponse 类型时,表明收到了应答的HEADER部分,接着当 channelRead 接收到的 msg 为 DefaultHttpContent 类型时,表明收到了应答体内容,最后当 channelRead 接收到的 msg 为 EmptyLastHttpResponse 类型时,表明该HTTP应答完整结束。有了这一基础知识后,reactor-netty 为了管理连接的生命周期,定义了 ConnectionObserver 这一接口,该接口是一个函数接口,需要被实现的是其 onStateChange 方法:

@FunctionalInterface
public interface ConnectionObserver {
	/**
	 * React on connection state change (e.g. http request or response)
	 *
	 * @param connection the connection reference
	 * @param newState the new State
	 */
	void onStateChange(Connection connection, State newState);
}

它接收一个连接对象和一个新的状态,来承载连接状态变更,那么连接有哪些状态呢?

 ConnectionObserver,其内部的State接口定义了连接的各种状态:

	interface State {

		/**
		 * Propagated when a connection has been established and is available
         * 连接建立
		 */
		State CONNECTED = ReactorNetty.CONNECTED;

		/**
		 * Propagated when a connection is bound to a channelOperation and ready for
		 * user interaction
         * 已配置
		 */
		State CONFIGURED = ReactorNetty.CONFIGURED;

		/**
		 * Propagated when a connection has been reused / acquired
		 * (keep-alive or pooling)
         * 获得
		 */
		State ACQUIRED = ReactorNetty.ACQUIRED;

		/**
		 * Propagated when a connection has been released but not fully closed
		 * (keep-alive or pooling)
         * 已释放
		 */
		State RELEASED = ReactorNetty.RELEASED;

		/**
		 * Propagated when a connection is being fully closed
         * 断开连接
		 */
		State DISCONNECTING = ReactorNetty.DISCONNECTING;
	}

我们再回到 channelRead 方法中 "ops.onInboundNext(ctx, msg);" 部分,前面说过 ops 其实是HttpClientOperations 类型的对象,而 HttpClientOperations.onInboundNext 方法中将通过 msg 消息类型来判断连接的生命周期中所处的状态,整体过程比较复杂,这里不直接给出源码。

        接着是 aggregate 方法,从方法名来看就是做应答数据聚合使用的,它主要是从 ByteBufAllocator 取并发射数据,由于需要做成异步的,所以内部使用了Mono.defer 来实现,需要注意的时,由于该方法需要聚合数据,所以用的是 doOnNext 方法,即在数据流真正调用onNext 发射之前调用,该 doOnNext 方法将在 channelRead 方法中收到 EmptyLastHttpResponse 类型的 msg 来触发,这里暂时不给出源码。

        后面是 asString 方法根据默认的字符集来把缓冲区的数据变换为字符串。

        最后的 subscribe 方法我们都比较熟悉了,传入的 Lambda 表达式我们把应答结果打印出来。

        以上就是整个消息发送和接收应答过程,我们可以看到,一旦 subscribe 方法调用,整个建立连接、发送请求、接收应答、聚合应答数据 都是由另一个线程(reactor-http-nio-*)来完成,相当于主线程再触发完 subscribe 方法后就可以做其他事情去了,而不用等待应答,但需要注意的是,subscribe 中的Lambda表达式(即处理HTTP应答的业务逻辑)也由 reactor-http-nio-* 线程执行,请勿处理高开销操作。

        那么,站在HTTP client端的角度来看,我们使用 reactor-netty 的优势在哪里?最大的优势就是既做到了和响应式API的打通,也做到了HTTP调用的异步化(包括建链、发送请求,接收应答、组装应答数据等),我们看1个线程使用 reactor-netty 作为 HTTP client 端for循环10次调用下游的结果:

pool-1-thread-1 开始请求 abc1
pool-1-thread-1 开始请求 abc2
pool-1-thread-1 开始请求 abc3
pool-1-thread-1 开始请求 abc4
pool-1-thread-1 开始请求 abc5
pool-1-thread-1 开始请求 abc6
pool-1-thread-1 开始请求 abc7
pool-1-thread-1 开始请求 abc8
pool-1-thread-1 开始请求 abc9
pool-1-thread-1 开始请求 abc10
reactor-http-nio-5 收到应答 abc4
reactor-http-nio-3 收到应答 abc2
reactor-http-nio-2 收到应答 abc1
reactor-http-nio-11 收到应答 abc10
reactor-http-nio-9 收到应答 abc8
reactor-http-nio-8 收到应答 abc7
reactor-http-nio-10 收到应答 abc9
reactor-http-nio-7 收到应答 abc6
reactor-http-nio-6 收到应答 abc5
reactor-http-nio-4 收到应答 abc3

和之前分析的一样,发送请求的线程(pool-1-thread-1)和处理应答的线程(reactor-http-nio-*)不是同一个线程,意味着发送请求的线程(一般是业务线程或主线程)在发送请求之后可以做其他的事情(比如发送另一个请求)而不用等待下游应答!!!

        那么用 reactor-netty 是不是就完美解决了 HTTP 调用中的阻塞等待应答导致Web容器工作线程(一般也是处理业务逻辑的线程)被耗尽的问题了?确实,使用 reactor-netty 能规避Web容器工作线程被下游某个“慢”服务阻塞耗尽的问题,但我们也要考虑HTTP连接,目前在HTTP 1.1协议下,单个HTTP连接同时只能服务一个请求,该请求没有应答之前无法发送其他请求,虽然Web容器工作线程是解放了,那HTTP连接呢?前面提到,为了充分异步话,reactor-netty 中创建和获取连接也是异步话的,真正创建连接的操作在 DefaultPooledConnectionProvider.PooledConnectionAllocator 中由 connectChannel 方法完成:

static final class PooledConnectionAllocator {
	Publisher<PooledConnection> connectChannel() {
		return Mono.create(sink -> {
            // 这里可以理解成连接池
			PooledConnectionInitializer initializer = new PooledConnectionInitializer(sink);
			EventLoop callerEventLoop = sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP) ?
					sink.contextView().get(CONTEXT_CALLER_EVENTLOOP) : null;
			if (callerEventLoop != null) {
				TransportConnector.connect(config, remoteAddress, resolver, initializer, callerEventLoop, sink.contextView())
						.subscribe(initializer);
			}
			else {
				TransportConnector.connect(config, remoteAddress, resolver, initializer, sink.contextView()).subscribe(initializer);
			}
		});
	}
}

connectChannel 中实际是调用 TransportConnector.connect 创建连接,然后 PooledConnectionInitializer 充当了订阅者,将创建的连接 Channel 放在 DefaultPooledConnectionProvider.PooledConnection 中管理起来,而连接池的各项属性由 DefaultPooledConnectionProvider.PoolFactory 来管理:

protected static final class PoolFactory<T extends Connection> {
	final Duration evictionInterval;
	final String leasingStrategy;
	final int maxConnections;
	final long maxIdleTime;
	final long maxLifeTime;
	final boolean metricsEnabled;
	final int pendingAcquireMaxCount;
	final long pendingAcquireTimeout;
	final Supplier<? extends MeterRegistrar> registrar;
	final Clock clock;
	final Duration disposeTimeout;
	final BiFunction<Runnable, Duration, Disposable> pendingAcquireTimer;
	final AllocationStrategy<?> allocationStrategy;
	final BiPredicate<Connection, ConnectionMetadata> evictionPredicate;
}

那么一旦连接数超过 maxConnections 会怎么样?没错,会阻塞,借着上述的例子来说,就是 pool-1-thread-1 线程不断的尝试发起HTTP调用,但 reactor-http-nio-* 却无法快速的发送HTTP请求和处理应答,因为使用中的HTTP连接达到最大数量导致获取HTTP连接被阻塞了。所以针对一个HTTP连接只能同时处理一个请求的场景,使用reactor-netty 虽然让业务线程(非阻塞)不成为瓶颈,但瓶颈却会在HTTP连接数量这里暴露出来。

        就像有人说“复杂度不会消失,只会转移”一样,使用 reactor-netty 时一定要多关注上下游的资源,虽然我们通过 reactor-netty 解决了部分资源在某些场景下出现瓶颈的问题,并不代表整个系统的瓶颈就消失了。

 类似资料: