当RabbitMq消息到达队列时,我目前正在使用IntegrationFlow来触发作业执行。IntegrationFlow的AmqpInFronChannelAdapter和作业的第一步的ItemReader都配置为从同一队列中读取消息。
我遇到的问题是IntegrationFlow的AmqpInboundChannelAdapter读取RabbitMQ消息,然后ItemReader再也找不到该消息。可能是因为IntegrationFlow在作业启动之前确认了该消息。
是否有办法防止IntegrationFlow消费/确认消息,将其留在队列中,以便ItemReader可以按预期工作?我尝试配置AmqpInboundChannelAdapter来重新获取消息,但这只会导致适配器无限循环地重新读取它自己的消息。
这个问题在某种程度上描述了我的问题,除了我没有进行任何处理之外,我只是试图将IntegrationFlow用作JobLaunching触发器。因此,解决方案似乎是一种反模式。
Spring Batch集成-传递数据b/w集成和批处理
任何帮助都将不胜感激
如果批处理作业只需要来自该消息的信息,我建议绑定第二个队列,使用相同的路由密钥;一个队列用于触发器,一个队列用于项目读取器。
如果第一条消息是触发器并且项目阅读器随后读取多条消息,您可以将消息内容添加到JobParameters
。您还需要将适配器的预取设置为1,以便在处理此消息时不会发送任何其他消息。
我有一个spring批处理应用程序,它从文件中读取数据,进行一些处理,最后编写一个定制的输出。这一切都是一步到位的。在下一步中,我将使用一个tasklet来归档输入文件(移动到另一个文件夹)。这个应用程序运行良好。但是,现在我需要在远程服务器上对sftp输出文件进行进一步处理。我找到了一种使用spring integration实现sftp的方法,在这里我创建了一个输入通道,该通道将反馈给outb
我能有一份只有奴隶而没有主人的工作,听rabbitmq队列吗?我想在spring boot应用程序中使用spring批处理和spring集成来侦听队列并以面向块的方式处理消息。 我想使用Michael Minella(https://www.youtube.com/watch?v=30Tdp1mfR0g)在Spring批处理的远程组块示例中解释的配置,但没有主配置。 下面是我的工作配置。 下面是我
我正在尝试将BeanIO与spring Batch集成。使用BeanIO,我正在读取一个固定长度的流文件。我已经测试并验证了使用独立类读取平面文件的代码,它可以无缝地工作,但是当我试图将它与Spring Batch集成时,BeanIOFlatFileItemReader的doRead()方法没有被调用,而且我编写的RedemptionEventCustomProcessor是如何直接被调用的。 我
Spring Integration Java DSL Reference和Spring Batch Java配置文档说明了如何将Java配置用于Spring Integration和Spring Batch。 但它们没有说明如何为Spring批处理集成配置它。如何使用DSL配置JobLaunchingGateway? 干杯,曼诺
我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队
我们一直在使用SI Kafka进行一个新项目,并取得了很大成功。在最近的一次切换之前,我们使用KafkaTopicOffsetManager来管理我们的消费者主题偏移量。为了避免每个消费者/主题对都有额外的主题,并使用Burrow或lag监控,我们决定使用最新的KafkaNativeOffsetManager,它使用Kafka提供的本机偏移管理。但在切换之后,我们注意到目标主题的消息消耗持续滞后。