当前位置: 首页 > 知识库问答 >
问题:

多线程Spring集成DSL

艾宁
2023-03-14

我想创建一个简单的IntegrationFlow与Spring集成,我有困难。

我想创建一个集成流,从Rabbit Mq中的多个队列中获取消息,并将消息发布到不同的Restendpoint。

我想知道我是否可以并行化这个。

我想检查两个场景的可行性:

  • 首先,我想为每个RabbitMq队列创建一个线程,该队列在接收到消息后将侦听并执行流:

场景1

  • 第二个场景:在这个场景中,我想为每个队列创建一个动态的线程数,线程数根据消息的数量增减

场景2

 HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                    HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }

共有1个答案

麹培
2023-03-14

对于第一个场景,只需为每个配置一个入站适配器,并将输出通道设置为下游流的公共通道。

对于第二种场景,只需在侦听器容器上设置concurrentcumers和maxconcurrentcumers,它就会根据需要放大/缩小线程。

请参阅Spring AMQP文档。

 类似资料:
  • 在我的主流程中,我有一个线程池ExecutorService,我用我调用的“已知”数量的可调用项来填充它。 此外,还有另一个ExecutorService对象称为“全球池”(我认为ExecutorService是线程安全的,我可以从不同的线程向其添加任务)。 现在,上面的每个Callable都会产生新的任务,并将它们(提交)到这个共享的“全球池”。 问题是,我无法知道(无法阻止)所有任务何时完成,

  • 我正在使用带有以下配置的spring集成kafka 1.1.0。我不太了解流的配置。当我增加这个值时,Spring会自动生成更多线程来处理消息吗?e、 g.当我有流=2时,相关的转换器和服务激活器是否都在2个线程中运行?我想错过一些线程执行器配置,但不知道如何配置。任何提示都将不胜感激。谢谢 已尝试使用消息驱动通道适配器,但无法使其工作,以下配置未拾取任何消息。还尝试了组织。springframe

  • 我想了解Spring集成中如何处理消息:串行或并行。特别是我有一个带有轮询器和HTTP出站网关的入站通道适配器。我猜拆分器、变压器、标头丰富器等不会产生自己的线程。 我可能错过了,但是这些细节在留档的某个地方指定了吗? 还可以通过编程方式获取系统中的所有频道吗?

  • 我有一个多线程应用程序,它使用RabbitTemplate(带有CachingConnectionFactory)向代理发送消息。我需要确保信息是按顺序传递的。我自己在《确保多线程环境中的消息顺序的Spring AMQP》中看到了这个答案 谢谢你!

  • 目前,我正在开发一个Spring集成应用程序,该应用程序具有以下场景。 有一个转换器可以将传入的消息转换为特定的对象类型 转换完成后,我们需要将其写入日志文件和数据库表,然后最终发送到JMS出站适配器。 null

  • null 如何在transform()步骤中添加Jaxb2Marshaller?