org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
场景
我创建了一个使用四个网关的路由,用于在远程计算机上发出web请求的相当复杂的设置,但我
集成流程1:
入站webflux网关->出站jms网关
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms, wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
另一种方法是使用.channel(MessageChannels.flux())
而不是.fluxtransform(f->f)
。这样,我们实际上给WebFlux容器带来了一个反压力,使其在请求事件循环中等待可用的插槽。
因此,我们只是向JMS队列发送不尊重的反压力,而另一边的JMS消费者无法跟上。另外,我们在内部向同一个Netty服务器发送一个请求,再次为这些内部请求获取一个事件循环插槽。
如果您感兴趣,我编写了一个单元测试,看看是怎么回事:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux", String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}
问题内容: 结合使用带有ReactJS的ES5开发,可以将组件描述如下: 在此的示例引用对象本身,这是预期的自然行为。 题 我的问题是: 如何使用ES6创建组件? 知道在JavaScript中使用new运算符时会引用实例化对象本身,因此有人可以告诉我使用bind的真正目的是什么?这和React的内部机制有关吗? 问题答案: 只是核心javascript。这是绑定事件的工作方式。这不是一个React
本文向大家介绍为什么必须使用Twitter进行联网,包括了为什么必须使用Twitter进行联网的使用技巧和注意事项,需要的朋友参考一下 如果我们能够在线开发有价值的业务网络,那真是太棒了? 是的,通过“ Twitter”可以实现–实时通信枢纽成为一个出色的业务网络平台。通过在这种快节奏的环境中改进和实施我们的社交网络技能,Twitter可以用作构建强大的业务网络的起点。 对于Twitter的网络,
关于在最新版本的GCC和Clang中编译有几个问题:实验::filessystems链接器错误 但是现在< code>filesystem已经被c 17接受,所以不再需要< code>experimental或< code>-lstdc fs标志,对吗? 错了,我甚至不能 只给了我< code >实验版本,我怎么能包括正式接受的版本呢?
问题内容: 即使等待1的linux手册页很好地说明了您需要让子进程不使其变成僵尸,但它根本无法说明原因。 我围绕一个Ever 循环计划了我的程序(这是我的第一个多线程程序,所以请原谅我的天真),该循环启动子进程,该子进程被ed淘汰,并确保自行终止。 我无法使用,因为这使并行计算变得不可能,因此我可能不得不添加一个存储子pid的进程表,并且不得不使用-不是立即执行,而是经过一段时间- 这是一个问题,
问题内容: 我使用以下Dockerfile创建了一个Docker容器(已截断): 等等。 所有这些都可以,但是我的问题是软件包的安装方式/位置。 如果我仅使用rvm运行rvm,则会显示“无法找到rvm”,但是如果运行,它会起作用。(我在网上找到了“ -l -c”选项,但不知道它们的作用,也找不到令人满意的解释!) 这不是一个docker问题-这是一个bash / * nix问题-我认为存在一些关于
问题内容: 每个人都告诉我“使用super.viewDidLoad()因为它就是这样”或“我一直那样做,所以要保留它”,“如果不叫super就是错误的”等。 我只发现了一些有关Objective-C用例的主题,这些主题并没有那么启发性,但是我正在Swift 3中进行开发,所以有什么专家可以给我一个很好的详细解释吗? 这是一种良好实践的案例还是有任何隐藏的影响? 问题答案: 通常,最好为您覆盖的所有