当前位置: 首页 > 知识库问答 >
问题:

为什么在使用Java DSL时必须在入站webflux网关上使用。fluxtransform(F->f)?

轩辕修能
2023-03-14
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
    at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
    null

场景
我创建了一个使用四个网关的路由,用于在远程计算机上发出web请求的相当复杂的设置,但我

集成流程1:

入站webflux网关->出站jms网关

  @Bean
  public IntegrationFlow step1() {
    // request-reply pattern using the jms outbound gateway
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
        .requestDestination("inboundWebfluxQueue")
        .replyDestination("outboundWebfluxQueue")
        .correlationKey("JMSCorrelationID");

    // send a request to jms, wait for the reply and return message payload as response
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
        // won't work consistently without the next line
        .fluxTransform(f -> f)
        .handle(gateway).get();
  }
  @Bean
  public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
        .httpMethod(HttpMethod.GET)
        .expectedResponseType(String.class)
        // ignore headers
        .mappedResponseHeaders();

    return IntegrationFlows.from(jmsInboundGateway())
        // use webflux outbound gateway to send the request to the TEST_URL
        .handle(gateway).get();
  }

共有1个答案

董子平
2023-03-14

另一种方法是使用.channel(MessageChannels.flux())而不是.fluxtransform(f->f)。这样,我们实际上给WebFlux容器带来了一个反压力,使其在请求事件循环中等待可用的插槽。

因此,我们只是向JMS队列发送不尊重的反压力,而另一边的JMS消费者无法跟上。另外,我们在内部向同一个Netty服务器发送一个请求,再次为这些内部请求获取一个事件循环插槽。

如果您感兴趣,我编写了一个单元测试,看看是怎么回事:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {


    @Autowired
    private TestRestTemplate template;

    @Test
    void testSpringIntegrationWebFlux() {
        var executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(10);
        executor.afterPropertiesSet();

        var numberOfExecutions = new AtomicInteger();

        for (var i = 0; i < 100; i++) {
            executor.execute(() -> {
                var responseEntity = this.template.getForEntity("/example/webflux", String.class);
                if (responseEntity.getStatusCode().is2xxSuccessful()) {
                    numberOfExecutions.getAndIncrement();
                }
            });
        }

        executor.shutdown();

        assertThat(numberOfExecutions.get()).isEqualTo(100);
    }

}
 类似资料:
  • 问题内容: 结合使用带有ReactJS的ES5开发,可以将组件描述如下: 在此的示例引用对象本身,这是预期的自然行为。 题 我的问题是: 如何使用ES6创建组件? 知道在JavaScript中使用new运算符时会引用实例化对象本身,因此有人可以告诉我使用bind的真正目的是什么?这和React的内部机制有关吗? 问题答案: 只是核心javascript。这是绑定事件的工作方式。这不是一个React

  • 本文向大家介绍为什么必须使用Twitter进行联网,包括了为什么必须使用Twitter进行联网的使用技巧和注意事项,需要的朋友参考一下 如果我们能够在线开发有价值的业务网络,那真是太棒了? 是的,通过“ Twitter”可以实现–实时通信枢纽成为一个出色的业务网络平台。通过在这种快节奏的环境中改进和实施我们的社交网络技能,Twitter可以用作构建强大的业务网络的起点。 对于Twitter的网络,

  • 关于在最新版本的GCC和Clang中编译有几个问题:实验::filessystems链接器错误 但是现在< code>filesystem已经被c 17接受,所以不再需要< code>experimental或< code>-lstdc fs标志,对吗? 错了,我甚至不能 只给了我< code >实验版本,我怎么能包括正式接受的版本呢?

  • 问题内容: 即使等待1的linux手册页很好地说明了您需要让子进程不使其变成僵尸,但它根本无法说明原因。 我围绕一个Ever 循环计划了我的程序(这是我的第一个多线程程序,所以请原谅我的天真),该循环启动子进程,该子进程被ed淘汰,并确保自行终止。 我无法使用,因为这使并行计算变得不可能,因此我可能不得不添加一个存储子pid的进程表,并且不得不使用-不是立即执行,而是经过一段时间- 这是一个问题,

  • 问题内容: 我使用以下Dockerfile创建了一个Docker容器(已截断): 等等。 所有这些都可以,但是我的问题是软件包的安装方式/位置。 如果我仅使用rvm运行rvm,则会显示“无法找到rvm”,但是如果运行,它会起作用。(我在网上找到了“ -l -c”选项,但不知道它们的作用,也找不到令人满意的解释!) 这不是一个docker问题-这是一个bash / * nix问题-我认为存在一些关于

  • 问题内容: 每个人都告诉我“使用super.viewDidLoad()因为它就是这样”或“我一直那样做,所以要保留它”,“如果不叫super就是错误的”等。 我只发现了一些有关Objective-C用例的主题,这些主题并没有那么启发性,但是我正在Swift 3中进行开发,所以有什么专家可以给我一个很好的详细解释吗? 这是一种良好实践的案例还是有任何隐藏的影响? 问题答案: 通常,最好为您覆盖的所有