我正在尝试配置一个轮询器,该轮询器每X秒查询一个bean,以将列表放入一个通道。该通道有一个下游流,该流将列表拆分并输出到发布/子通道(进一步的异步流)。我如何确保在任何给定的时间,只有在流执行过程中,轮询器必须等待/阻止流完成,直到为下一次轮询做好准备(固定速率/延迟)?
<int:channel id="configListChannel" />
<task:executor id="pollExecutor" pool-size="1" queue-capacity="1" rejection-policy="ABORT" />
<int:inbound-channel-adapter expression="configMap().values()" auto-startup="true" channel="configListChannel">
<int:poller fixed-delay="30" time-unit="SECONDS" task-executor="pollExecutor"/>
</int:inbound-channel-adapter>
<task:executor id="configExecutor" pool-size="5"/>
<int:channel id="configChannel" >
<int:dispatcher task-executor="configExecutor"/>
</int:channel>
<int:chain input-channel="configListChannel" output-channel="configChannel" id="configChain">
<int:splitter/>
<int:filter expression="payload.enablePolling"/>
</int:chain>
...在confiChannel
上进一步异步流以发送出站消息
有没有使用异步切换的阻塞轮询器示例,以及使用barrier向轮询器线程发送完成流信号的示例?而且一次只能进行一次投票。
我建议您实现一个ReceiveMessageAdvice
(从5.3或AbstractMessageSourceAdvice
开始)。它是afterReceive()
应该按原样返回消息,但是beforeceive()
应该检查一些状态,如果此时无法轮询,则返回false
。
您可能不需要为该任务设置障碍,但只需简单的原子布尔
bean即可检查该中的状态,然后在下游完成任务时将其返回到
true
。
我目前正致力于提高集成流的性能,试图实现消息处理的并行化。我已经实现了所有使用Java DSL。 当前的集成流使用固定的轮询器从队列通道获取消息,然后依次通过多个处理程序对消息进行串行处理,直到消息到达最终的处理程序,该处理程序将考虑上一个处理程序的每个输出进行一些最终计算。它们都连接在同一个集成流中。基本上,这些处理程序将调用打包到外部系统。这里我需要保留的重要一点是,在完成前一个消息的所有下游
本文向大家介绍请你说一下阻塞,非阻塞,同步,异步相关面试题,主要包含被问及请你说一下阻塞,非阻塞,同步,异步时的应答技巧和注意事项,需要的朋友参考一下 参考回答: 阻塞和非阻塞:调用者在事件没有发生的时候,一直在等待事件发生,不能去处理别的任务这是阻塞。调用者在事件没有发生的时候,可以去处理别的任务这是非阻塞。 同步和异步:调用者必须循环自去查看事件有没有发生,这种情况是同步。调用者不用自己去查看
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo
我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St
我认为下面的流量链将通过事件循环放置/执行(像JS)。因此,运行下面的代码将首先打印阻塞循环&然后将执行通量链。 但是,整个通量总是先执行,然后才移动到循环。[我确实有一些语句正在阻塞。但是有两个阶段] 当我们使用reactor时,通过使用一些调度程序来实现异步/非阻塞行为的唯一方法? 如果我不使用任何调度器,并让代码使用当前线程执行,那么即使对于IO密集型应用程序,使用WebFlux而不是Spr