当前位置: 首页 > 知识库问答 >
问题:

如何保持Mule进程,直到JMS消耗完成处理

乐正远
2023-03-14

我的mule流中有JMS,其中生产者从缓存中读取记录,放入队列,消费者消费消息并进行进一步处理。以下是理解流程。

服务1(从文件读取数据)-

在上面的流中,从JMS组件,流成为异步,因此一旦生产者将所有记录放入队列中,响应就会返回给客户端,说进程已完成,但消费者可能仍然会使用消息。

我想保持来自生产者的流程以发回响应,直到消费者消费所有消息。

你知道如何实现吗?

共有3个答案

陶鹏
2023-03-14

您可以使用交换模式值作为请求-响应,以便流将等待来自 JMS 的响应。

蔡宏大
2023-03-14

如上所述,轮询完成状态可以正常工作,但仍存在某些事务在等待时间后未完成的风险,或在所有消息处理完毕后等待很长时间的风险。

根据本练习的最终目标,您也许可以利用Mule batch,它已经实现了将入站请求分割成单独的消息,在一个或多个消费者线程中处理消息,跟踪已处理和剩余的块,并在处理完所有数据后报告结果/执行最后的步骤。

如果您不能使用batch,并且需要将处理过的消息重组到一个列表或映射中,那么您可以让收集聚合器来完成通过关联ID跟踪消息和设置超时的工作。

实现它的简单DIY方法是为JMS发布组件构建某种调度程序逻辑。它将向JMS提交所有消息,然后等待每个使用者/工作线程(通过单独的JMS队列)使用具有相同关联ID的完成消息进行响应。然后,调度器将跟踪内存或永久存储中所有提交/处理的消息,并在批处理中的最后一条消息被确认后或通过预定义的超时时间进行响应。这与Mule batch已经做的非常接近。

干杯!马頔

巫英纵
2023-03-14

由于异步获取确切线程的副本并独立处理,因此创建者可能会将消息放入队列中的速度与使用者实际能够使用它之前一样快。
我能想到的一种方法是将消息放入队列的过程,方法是在它之前放置一个 sleep()
您可以使用 Groovy 组件并在其中使用 sleep() 来保持流动或减慢流程。
例如,如果将以下内容放入:

<scripting:component doc:name="Groovy">
  <scripting:script engine="Groovy"><![CDATA[
    sleep(10000);
    return message.payload;]]>
  </scripting:script>
</scripting:component>

在将消息放入队列之前,进程将稍微减慢,并将保持流10000毫秒,直到另一端的消费者实际消费它。

 类似资料:
  • 问题内容: 我分叉了多个流程,我想衡量完成整个任务需要多长时间,即所有分叉的流程都完成了。请告知如何使父进程等待所有子进程终止?我想确保在正确的时间停止计时器。 这是我使用的代码: 问题答案: 我将在“ else // parent”行之后的所有内容都移到for循环之外。在分叉循环之后,使用waitpid进行另一个for循环,然后停止计时并执行其余操作: 我假设如果子进程无法正常退出且状态为0,则

  • 我的流使用基于cron表达式的消息,我故意添加了一个groovy代码来抛出异常以测试JMS回滚。回滚不会将消耗的消息返回队列中。我在这里错过了什么吗? 这里是mule流,它应该在遇到异常后回滚mule消息。 这是此流引发的异常-

  • 问题内容: 我有一个用于打印iReport的按钮,但是显示报告需要花费一些时间,因此我创建了一个称为“加载框架”的类 ,当我按下按钮时,我试图调用该类,但是它不能正常工作。 我的按钮代码是: 问题答案: 就像所有与在Swing中执行长时间运行或阻塞任务有关的问题一样,从Swing中的并发开始,以更好地了解您要解决的问题。接下来看看最常见的解决方案Worker Threads和SwingWorker

  • 使用Spring Cloud Stream Kafka应用程序,我们如何确保流侦听器等待处理消息,直到一些依赖任务(例如引用数据填充)完成?下面的应用程序无法处理消息,因为消息传递得太早。我们如何保证Spring Boot App中的这种排序? < li >春-云-流:2.1.0.RELEASE < li >Spring启动:2.1.2 .释放 我发现截至 2018 年 5 月 15 日,Spri

  • 问题内容: 我有一个创建两个或多个子流程的主流程,我想让主流程等到所有子流程完成操作并退出吗? 问题答案: 一个对象有这个确切定义的方法:等待一个给定的子进程的完成(和,此外,对于重新调整它的退出状态)。 如果使用此方法,则可以防止进程僵尸闲逛太久。 (或者,您可以使用或来进行调用和等待。如果您不需要该进程的IO,那可能就足够了。但这可能不是一个选择,因为您的两个子进程似乎应该并行运行,而他们不会

  • 我有一个服务员线程想使用RabbitMQ direct exchange向Java中的客户线程发送一道寿司,但是我的客户没有收到这道菜。下面是我的服务员用来发布寿司菜肴对象的方法: 请注意,<code>dishKey</code>作为参数传递,并在之前的if-else语句中被确定为<code>的“tamagoDishKey”</code>或<code>“ebiDishKey”的 以下是我的客户用来