我想创建一个简单的IntegrationFlow与Spring集成,我有困难。
我想创建一个集成流,从Rabbit Mq中的多个队列中获取消息,并将消息发布到不同的Restendpoint。
我想知道我是否可以并行化这个。
我想检查两个场景的可行性:
场景1
场景2
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
对于第一个场景,只需为每个配置一个入站适配器,并将输出通道设置为下游流的公共通道。
对于第二种场景,只需在侦听器容器上设置concurrentcumers和maxconcurrentcumers,它就会根据需要放大/缩小线程。
请参阅Spring AMQP文档。
在我的主流程中,我有一个线程池ExecutorService,我用我调用的“已知”数量的可调用项来填充它。 此外,还有另一个ExecutorService对象称为“全球池”(我认为ExecutorService是线程安全的,我可以从不同的线程向其添加任务)。 现在,上面的每个Callable都会产生新的任务,并将它们(提交)到这个共享的“全球池”。 问题是,我无法知道(无法阻止)所有任务何时完成,
我正在使用带有以下配置的spring集成kafka 1.1.0。我不太了解流的配置。当我增加这个值时,Spring会自动生成更多线程来处理消息吗?e、 g.当我有流=2时,相关的转换器和服务激活器是否都在2个线程中运行?我想错过一些线程执行器配置,但不知道如何配置。任何提示都将不胜感激。谢谢 已尝试使用消息驱动通道适配器,但无法使其工作,以下配置未拾取任何消息。还尝试了组织。springframe
我想了解Spring集成中如何处理消息:串行或并行。特别是我有一个带有轮询器和HTTP出站网关的入站通道适配器。我猜拆分器、变压器、标头丰富器等不会产生自己的线程。 我可能错过了,但是这些细节在留档的某个地方指定了吗? 还可以通过编程方式获取系统中的所有频道吗?
我有一个多线程应用程序,它使用RabbitTemplate(带有CachingConnectionFactory)向代理发送消息。我需要确保信息是按顺序传递的。我自己在《确保多线程环境中的消息顺序的Spring AMQP》中看到了这个答案 谢谢你!
目前,我正在开发一个Spring集成应用程序,该应用程序具有以下场景。 有一个转换器可以将传入的消息转换为特定的对象类型 转换完成后,我们需要将其写入日志文件和数据库表,然后最终发送到JMS出站适配器。 null
null 如何在transform()步骤中添加Jaxb2Marshaller?