当前位置: 首页 > 知识库问答 >
问题:

Reactor-Netty:使用keep-alive HTTP客户端

任昊阳
2023-03-14

我使用reactor-netty请求一组URL。大多数URL属于相同的主机。reactor-netty似乎会为每个URL创建一个全新的TCP连接,即使已经为上一个URL建立了到主机的连接。当数百个连接同时建立时,一些服务器会丢弃新的连接或开始缓慢响应。

代码示例:

    Flux.just(...)
    .groupBy(link -> {
        String host = "";
        try {
            host = new URL(link).getHost();
        } catch (MalformedURLException e) {
            LOGGER.warn("Cannot determine host {}", link, e);
        }
        return host;
    })
    .flatMap(group -> {
        HttpClient client = HttpClient.create()
                .keepAlive(true)
                .tcpConfiguration(tcp -> tcp.host(group.key()));
        return group.flatMap(link -> client.get()
            .uri(link)
            .response((resp, cont) -> resp.status().code() == 200 ? cont.aggregate().asString() : Mono.empty())
            .doOnSubscribe(s -> LOGGER.debug("Requesting {}", link))
            .timeout(Duration.ofMinutes(1))
            .doOnError(e -> LOGGER.warn("Cannot get response from {}", link, e))
            .onErrorResume(e -> Flux.empty())
            .collect(Collectors.joining())
            .filter(s -> StringUtils.isNotBlank(s)));
    })
    .blockLast();

在日志中,我看到同一个远程主机的本地端口不同,活动和非活动连接的总和远远高于不同主机的数量。这就是为什么我认为reactor-netty没有重用已经建立的连接。

DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:18,711] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xaed18e87, L:/192.168.1.183:56832 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 1 active connections and 239 inactive connections
...
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Releasing channel
DEBUG [2019-04-29 08:15:20,158] reactor-http-nio-10 r.n.r.PooledConnectionProvider: [id: 0xd6c6c5db, L:/192.168.1.183:56965 - R:capcp2.naad-adna.pelmorex.com/52.242.33.4:80] Channel cleaned, now 0 active connections and 240 inactive connections

是否可以使用keep-aliveHTTP客户端通过与主机的同一TCP连接请求同一主机上的多个URL?如果不是,我如何限制到同一主机的同时连接数或顺序执行对同一主机的请求(下一个请求仅在接收到对前一个请求的响应后才执行)?

我使用californium-sr6发布火车。

共有1个答案

公沈义
2023-03-14

是的,reactor netty支持保持活动状态、连接重用和连接池。

请注意,.flatmap是一个并行处理内部流的异步操作。因此,当您调用group.flatmap(...时,内部请求将并行执行。由于它们是并行执行的,将需要建立多个连接。

如果要顺序执行对同一主机的请求,请将示例更改为使用group.concatmap而不是.flatmap

如果您希望仍然并行执行这些请求,但将活动请求的数量限制在单个主机上,请将示例更改为使用.FlatMap的重载版本之一,该版本采用concurrency参数。

此外,由于您使用的是httpclient.create(),因此您的示例使用了默认的全局http连接池。如果希望对连接池进行更多控制,可以通过httpclient.create(ConnectionProvider)指定不同的ConnectionProvider

 类似资料:
  • 在构建Retor Netty应用程序时,我得到了两个相似的指标。但是它们之间到底有什么区别呢? vs. 我不知道他们在测量响应时间的方式/位置上有什么不同。哪种方法测量的时间更长并不一致。 Http客户端指标测量时间更长 ReactorNetty指标延长了时间

  • 我将创建一个身份验证服务器,它本身与一组不同的Oauth2.0服务器交互。Netty似乎是在这里实现网络部分的一个很好的候选者。但在开始之前,我需要澄清一些关于netty的细节,因为我是新手。例行程序如下: > < li> 服务器接受来自客户端的HTTPS连接。 然后,不关闭第一个连接,它通过HTTPS与远程OAuth2.0服务器建立另一个连接并获取数据 毕竟,服务器将结果发送回客户端,客户端应该

  • 如何在基于网络的 HTTP 客户端中重试 HTTP 请求? 请考虑以下处理程序,如果收到 HTTP 响应代码 503,它将尝试在 1 秒后重试 HTTP 请求: 在本例中,当我写入通道时,管道中的其他处理程序会看到HttpObjects,但实际上不会再次执行HttpRequest——只接收到一个HttpResponse。 我认为在这种情况下我只是滥用了 Channel,我需要创建一个新的通道(表示

  • 在Netty中创建客户端连接时,我有一个问题。 这里,为什么我们没有一个bind方法,将通道绑定到发起客户端连接的端口(在客户端)?我们唯一需要提供的就是给出服务器地址和端口如下: 这是在客户端还是服务器端创建了一个新的通道?此通道绑定在客户端的哪个端口? 我们在执行服务器端引导时进行绑定,如下所示 我很困惑,不明白客户端从哪个端口向服务器发送数据,使用的是什么通道?

  • 我正在编写一个进程,它必须连接(并保持活动)到几个(数百个)远程对等点,并管理对它们的消息传递/控制。 我制作了这个软件的两个版本:第一个版本使用经典的“每个连接线程”模型,第二个版本使用标准的javaNIO和选择器(以减少线程分配,但存在问题)。然后,环顾四周,我发现Netty在大多数情况下都能提高很多,于是我开始第三次使用它。我的目标是保持资源使用率非常低,保持快速。 一旦编写了具有自定义事件

  • 问题内容: 我需要使客户端能够建立许多连接。我使用Netty 4.0。不幸的是,所有现有示例都没有显示如何创建大量连接。 这是正确的决定吗?还是会更好? 问题答案: 是的,它几乎是正确的。您唯一需要更改的就是在每个连接上创建NioEventLoopGroup。 NioEventLoopGroup实例非常昂贵,因此应该共享它们。创建一个实例并共享它,方法是每次都将同一个实例传递给Bootstrap.