当前位置: 首页 > 知识库问答 >
问题:

使用spring反应式webClient面临问题“WebClientRequestException:挂起的获取队列已达到其最大大小1000”

蒋栋
2023-03-14

我正在运行一个微服务API的加载,它涉及使用Spring反应式Webclient调用其他微服务API。我正在使用Postman runner选项卡来测试这一点。

首先,我以1500次迭代运行负载,然后为每个请求调用第二个微服务,一切正常。但当我以5000次迭代运行负载时,第二个微服务被调用了3500次,1500次调用由于问题而失败

WebClientRequestException:挂起的获取队列已达到其最大大小1000

使用org.springframework.web.reactive.function.client.WebClient和默认配置,下面是代码片段。

 private WebClient webClient;

    @PostConstruct
    public void init() {
        this.webClient = WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE,  MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

如何避免这种情况?

我将最新的spring boot starter父依赖项(版本2.5.3)与spring-webflux-5.3.9一起使用。震击器震击器。

日志:

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
        at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:375)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:296)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:885)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
        at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
        at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

**Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 1000; nested exception is reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000**
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Request to POST http://172.20.0.2:3130/v1/login/mobile [DefaultWebClient]
Stack trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect$ClientTransportSubscriber.onError(HttpClientConnect.java:304)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onError(DefaultPooledConnectionProvider.java:172)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:444)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
                at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
                at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
                at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
                at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
                at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
                at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
                at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
                at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:271)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
                at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:216)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:269)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
                at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861)
                at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
                at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
                at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
                at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
                at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
                at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
                at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
                at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
                at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
                at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
                at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
                at java.base/java.lang.Thread.run(Thread.java:829)
**Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 1000
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.pendingOffer(SimpleDequePool.java:543)**
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:266)
        at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:399)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onSubscribe(DefaultPooledConnectionProvider.java:212)
        at reactor.netty.internal.shaded.reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:674)
        at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$1(PooledConnectionProvider.java:137)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:268)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:77)
        at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:46)

共有1个答案

浦墨竹
2023-03-14

WebClient需要一个HTTP客户端库来执行请求,默认情况下,它使用Reactor Netty。

Reactor净参考文件报价

默认情况下,Reactor Netty客户端使用一个“固定”连接池,其中500个是活动通道的最大数量,1000个是允许保持在挂起状态的进一步通道获取尝试的最大数量(对于其余配置,请检查下面的系统属性或生成器配置)。这意味着,如果有人试图获取通道,只要创建的通道少于500个,并且由池管理,那么实现就会创建一个新通道。当达到池中通道的最大数量时,最多会延迟(挂起)1000次获取通道的新尝试,直到通道再次返回到池中,并且会拒绝进一步尝试,并出现错误。

您看到的是,您正在积极使用连接池中的所有500个连接,并且您已经用1000个挂起的请求填充了“挂起”队列。

你有两个选择来解决这个问题

垂直缩放

增加连接池大小和/或获取队列长度

ConnectionProvider connectionProvider = ConnectionProvider.builder("myConnectionPool")
        .maxConnections(<your_desired_max_connections>)
        .pendingAcquireMaxCount(<your_desired_pending_queue_size>)
        .build();
ReactorClientHttpConnector clientHttpConnector = new ReactorClientHttpConnector(HttpClient.create(connectionProvider));
WebClient.builder()
        .clientConnector(clientHttpConnector)
        .build();

水平缩放

创建应用程序的其他实例,并在实例之间平衡api调用的负载。

Spring参考文档

附加说明:

在计算连接池的大小时,值得考虑下游api调用的延迟。一个好的开始是

connection_pool_size=tps*downstream_api_latency

tps(每秒事务数)

 类似资料:
  • 有什么方法可以配置响应的最大标头大小吗? 我从netty框架中得到以下错误: 显然reactor为此添加了一个API,但我不知道在SpringWebFlux的WebClient中这是如何控制的。我正在使用以下版本 有什么想法吗?

  • 问题内容: 如何在0(1)时间复杂度的任何时间从队列中检索max和min元素?早些时候,我使用Collections.max和min查找元素,但这将是0(n)。 问题答案: 您只有2种方法来获得最小/最大操作的O(1): 如果结构已排序,并且您知道最大值/最小值位于何处 如果结构未排序且仅允许插入:每次插入项目并分别存储值时,您可以重新计算最小值/最大值 如果结构未排序并且允许插入和删除:我认为您

  • 问题内容: 这个问题很简单,但是Google或Pika开源代码没有帮助。有没有办法在Pika中查询当前队列大小(项目计数器)? 问题答案: 在AMQP协议中有两种获取队列大小的方法。您可以使用Queue.Declare或Basic.Get。 如果您使用的是使用Basic.Consume到达的消息,那么您将无法获得此信息,除非断开连接(超时)并重新声明队列,否则将收到一条消息但不确认。在较新版本的A

  • 问题内容: 我正在做反应,基本上我想用工具提示制作一个按钮,现在我正在制作工具提示。我正在更改css display属性,以使其在鼠标进入和离开时不可见。但是有一个错误,我不知道该怎么办… 这是我的代码: 在控制台中,我收到此错误: 我找不到问题所在。我知道这可能与调用一个函数有关,后者又调用另一个函数。但是我在代码中看不到这样的东西,而且不确定是否全部。感谢帮助 :) 问题答案: 每当您看到从函

  • 抱歉,如果标题解释得不好,我有这个页面,有两个按钮和一个文本块,一个用于增加文本大小,另一个用于减少文本大小。我想在文本达到最大或最小大小时禁用每个按钮。例如: 点击“放大”按钮,如果文本达到最大大小,则禁用该按钮。点击“较小”按钮,如果文本达到最小大小,则禁用该按钮。 我尝试在onhtml函数中使用下一个if语句: 其中66.6667pt是文本在保持固定前达到的大小。另一种情况下,最小尺寸为6.

  • 我有一个整数列表,像list1=(1,2,3)和list2=(0,1)。 我的列表包含清单1和清单2。它可以包含更多,但对于这个例子,我只取了两个列表。 注意,我在一个沉重的calcul中多次调用这个方法,所以awnser应该考虑到这一点。