我有一个要求,我需要保存/缓冲在通道上接收到的消息,并根据消息数量或超时意味着1分钟内没有接收到消息而持久化在数据库中。有没有办法在Spring集成中实现这一点
IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(sourceQueue))
.transform(someTransform, "transform")
.handle(someService, "save")
.get();
有一个<代码>。aggregate()操作符,基于EI模式实现。
您可以使用JdbcMessageStore对其进行配置,以缓冲消息并将其存储到数据库中。
您可以通过释放策略(根据收到的每条消息)将它们保持在那里直到出现某种情况,或者由于组超时而释放它们。
如果您对之后将它们全部作为单个聚合消息不感兴趣,您可以考虑使用SimpleMessageGroupProcess
,它只生成集合
请参阅参考手册中有关聚合器的更多信息:https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator
我无法解决这个问题,现在已经坚持了很长时间。我是一个spring-integration-dsl的初学者,任何帮助都将非常感谢。
我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI
我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。
在下面的问题上,我可以得到一些帮助吗:调用transformer将输入对象转换为Map对象并调用处理程序,处理程序缺少之前添加的头值。为什么要将有效负载转换为映射对象,以丢失所有标头? 如果转换不是map,则标头中没有问题。 谢了湿婆
我想创建一个简单的IntegrationFlow与Spring集成,我有困难。 我想创建一个集成流,从Rabbit Mq中的多个队列中获取消息,并将消息发布到不同的Restendpoint。 我想知道我是否可以并行化这个。 我想检查两个场景的可行性: 首先,我想为每个RabbitMq队列创建一个线程,该队列在接收到消息后将侦听并执行流: 场景1 第二个场景:在这个场景中,我想为每个队列创建一个动态