我正在运行一个Spring Boot应用程序,它使用WebClient处理非阻塞和阻塞HTTP请求。应用程序运行一段时间后,所有传出的HTTP请求似乎都被卡住了。
WebClient用于向多个主机发送请求,但作为示例,以下是它如何初始化并用于向Telegram发送请求:
WebClientConfig:
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient.create(ConnectionProvider.builder("connectionProvider").build())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
所有WebClient都使用相同的ReactorClientHttpConnector。
电报客户:
@Autowired
ReactorClientHttpConnector httpClient;
WebClient webClient;
RateLimiter rateLimiter;
@PostConstruct
public void init() {
webClient = WebClient.builder()
.clientConnector(httpClient)
.baseUrl(telegramUrl)
.build();
rateLimiter = RateLimiter.of("telegram-rate-limiter",
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMinutes(1))
.limitForPeriod(20)
.build());
}
public void sendMessage(@PathVariable("token") String token, @RequestParam("chat_id") long chatId, @RequestParam("text") String message) {
webClient.post().uri(String.format("/bot%s/sendMessage", token))
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
.with("text", message))
.retrieve()
.bodyToMono(Void.class)
.transformDeferred(RateLimiterOperator.of(rateLimiter))
.block();
}
速率限制器用于确保电报API中规定的每分钟请求数不超过20。
当应用程序启动时,所有请求均按预期正常解决。但过了一段时间,所有的请求似乎都被卡住了。发生这种情况所需的时间可能从几个小时到几天不等。它发生在对不同主机的所有请求中,当来自TelegramBot的消息停止时,很容易注意到。一旦请求被卡住,它们就会被无限期地卡住,我必须重新启动应用程序才能让它再次工作。
日志中似乎没有导致这种情况的异常。由于我为我的电报消息维护了一个队列,我可以看到当队列中的消息数量稳步增加时,以及当等待请求解决的其他进程中发生错误时,请求停止的时间点。
由于我设置的连接超时和响应超时没有生效,请求似乎甚至没有发出。
我之前也尝试过将空闲时间设置为0,但这并没有解决问题
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").maxConnections(1000).maxIdleTime(Duration.ofSeconds(0)).build())
HttpClient httpClient = HttpClient.newConnection()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
更新:
我启用了指标并在它卡住时使用千分尺查看它。有趣的是,它显示Telegram有一个连接,但也显示空闲、挂起或活动时没有连接。
reactor_netty_connection_provider_idle_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_pending_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_active_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_total_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 1.0
问题可能是这个缺失的连接吗?
更新2:
我认为这可能与另一个问题有关:关闭错误状态代码的Reactor网络连接
因此,我将我的HttpClient更新为:
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").metrics(true).build())
.doAfterResponseSuccess((r, c) -> c.dispose())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
但所有这些似乎都在加速问题的发生。与之前一样,活动、挂起和空闲连接加起来并不等于总连接数。总数始终大于其他3个指标加在一起的总和。
更新3:问题发生时,我进行了线程转储。总共有74个线程,所以我认为应用程序的线程不会用完。
电报线程的转储:
"TelegramBot" #20 daemon prio=5 os_prio=0 cpu=14.65ms elapsed=47154.24s tid=0x00007f6b28e73000 nid=0x1c waiting on condition [0x00007f6aed6fb000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
- parking to wait for <0x00000000fa865c80> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.13/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.13/AbstractQueuedSynchronizer.java:1345)
at java.util.concurrent.CountDownLatch.await(java.base@11.0.13/CountDownLatch.java:232)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.moon.arbitrage.cm.feign.TelegramClient.sendMessage(TelegramClient.java:59)
at com.moon.arbitrage.cm.service.TelegramService.lambda$sendArbMessage$0(TelegramService.java:53)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$1092/0x000000084070f840.run(Unknown Source)
at com.moon.arbitrage.cm.service.TelegramService.task(TelegramService.java:82)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$920/0x0000000840665040.run(Unknown Source)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
Reactor工人线程:
"reactor-http-epoll-1" #15 daemon prio=5 os_prio=0 cpu=810.44ms elapsed=47157.07s tid=0x00007f6b281c4000 nid=0x17 runnable [0x00007f6b0c46c000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:286)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-2" #16 daemon prio=5 os_prio=0 cpu=1312.16ms elapsed=47157.07s tid=0x00007f6b281c5000 nid=0x18 waiting on condition [0x00007f6b0c369000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.13/Native Method)
- parking to wait for <0x00000000fa865948> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park(java.base@11.0.13/LockSupport.java:194)
at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.13/CompletableFuture.java:1796)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.13/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.13/CompletableFuture.java:1823)
at java.util.concurrent.CompletableFuture.get(java.base@11.0.13/CompletableFuture.java:1998)
at com.moon.arbitrage.cm.service.OrderService.reconcileOrder(OrderService.java:103)
at com.moon.arbitrage.cm.service.BotService$BotTask.lambda$task$1(BotService.java:383)
at com.moon.arbitrage.cm.service.BotService$BotTask$$Lambda$1161/0x00000008400af440.accept(Unknown Source)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-3" #17 daemon prio=5 os_prio=0 cpu=171.84ms elapsed=47157.07s tid=0x00007f6b28beb000 nid=0x19 runnable [0x00007f6b0c26a000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-4" #18 daemon prio=5 os_prio=0 cpu=188.10ms elapsed=47157.07s tid=0x00007f6b28b7d800 nid=0x1a runnable [0x00007f6b0c169000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@11.0.13/Thread.java:829)
Locked ownable synchronizers:
- None
看起来其中一个被另一个任务(甚至不是来自电报服务)阻塞了,但这不应该是一个问题,因为其他三个工作线程是可运行的,对吗?
这些问题通常与spring-boot上的线程耗尽有关,应该在使用jstack发生块时分析生成线程转储:
jstack <java pid> > ThredDump.txt
生成的文件ThredDump的内容。txt通常提供有关阻塞线程的信息。
我建议看看这个方向。可能它没有按预期工作,这取决于应用程序在一段时间内执行的请求数。从Ratelimiter的Javadoc:“需要注意的是,请求的许可数量永远不会影响请求本身的限制……但它会影响下一个请求的限制。例如,如果一个昂贵的任务到达空闲速率限制器,它将立即被授予,但下一个请求将经历额外的限制,从而支付昂贵任务的成本。”这一讨论也可能有帮助:github或github
我可以想象RateLimitor中有一些节流累积或其他影响,我会尝试使用它,确保这件事真的按你想要的方式工作。或者,考虑使用Spring@Scheduled从队列中读取数据。您可能希望使用嵌入式JMS来增加它的趣味性,以获得更多好处(消息持久性等)。
最终发现并解决了问题。问题是我有一个在reactor线程上被阻塞的阻塞任务。多亏了线程转储,我才注意到这一点。阻塞任务正在等待事件,因此可能需要很长时间才能解决。因此,最终,当所有四个反应器线程都被阻塞时,所有请求自然会被阻塞,没有线程来处理它们。
简而言之:不要阻塞你的reactor线程。
我正在使用最新版本的google-cloud-pubsub,并且正在经历一个据称已经修复的bug。 我正在使用这个版本和其中的代码示例:https://pypi.org/project/google-cloud-pubsub/ 问题:因此,在我运行呼叫订阅者的订阅者工作者大约4-5小时后,它停止接收消息。 对如何修复它有什么建议吗?
本文向大家介绍防止重复发送 Ajax 请求,包括了防止重复发送 Ajax 请求的使用技巧和注意事项,需要的朋友参考一下 要考虑并理解 success, complete, error, timeout 这些事件的区别,并注册正确的事件,一旦失误,功能将不再可用; 不可避免地比普通流程要要多注册一个 complete 事件; 恢复状态的代码很容易和不相干的代码混合在一起; 推荐用主动查询状态的方式(
因此,我用于让bot问候新用户的代码停止工作,我不知道为什么或如何使用这是im用于欢迎活动本身的代码```module.exports=(client)=>{const channelId='757493821251649608'//welcome channel const targetChannelId='757521186929246219'//rules and info }``` 这就是
问题内容: 我正在使用jquery发出ajax请求 如何手动停止我的特定Ajax请求? 问题答案: 该方法返回一个可以用于执行此操作的对象: 然后,当您想停止请求时:
我有一个小的Spring Boot API在docker中运行。下面显示的是我用来升级容器的命令。 然后,我有一个停靠的JMeter,我使用下面的命令来启动它 但是所有的测试都失败了,请求没有被发送到API。这就是JMeter的CLI的外观 请求的测试配置: 协议:htttp 服务器:localhost 端口:8080 方法:GET 路径:/api/factorial 完整的bash文件如下所示:
我在创建计时器点击00:00时发送通知的方法时遇到一些问题。 我的意思是,我希望当计时器结束时,如果应用程序关闭,它会发送一个通知事件。 我已经找到了一种方法来显示通知,并使计时器即使在应用程序关闭时也能工作。 但当我把功能发送通知在计时器的末尾它只工作时,应用程序打开。 这是我的主要活动。JAVA AppJava类 AndroidManifest。xml 谢谢你的帮助! 祝你有愉快的一天 克里斯