当前位置: 首页 > 知识库问答 >
问题:

寻找正确的mule组件以按fifo顺序解复用消息

花稳
2023-03-14

我一直在搜索用于解决以下问题的正确消息传递模式:

我有一个队列,它包含域中每个userid的消息,每个消息都是一个userChanged事件。业务要求是必须按照FIFO顺序处理特定用户ID的所有消息,如果在处理特定用户ID的消息期间发生错误,则在将该错误标记为已成功处理之前,不应再进行处理。必须处理所有消息,并且需要将此解决方案部署在集群ESB环境中。

我想将这些事件解复用到每个用户ID的FIFO队列中,这样我就可以并行处理来自不同用户ID的消息,并按顺序处理每个用户的消息。

让一个amqp入站endpoint读取队列中的所有消息,然后“整理”路由器将具有相同用户ID的消息路由到该用户ID的可重用业务逻辑流的同一实例,如果给定用户ID的实例不存在,则创建一个实例。这个场景的悬而未决的问题是:A。这样的路由器是否存在或者是否需要开发?b.业务逻辑流将需要在单个线程中执行,因此需要以持久的方式为流的每个实例维护消息的积压。c.执行其中一个实例时的错误应该停止该实例处理更多的消息,直到它被解决为止。

我曾想过一些变通方法,比如将用户ID“分桶”到各种预定义队列(userChangedEvent-0to1000,userChangedEvent-1000to2000等),这样我们就可以预定义我们需要的所有流和相关的amqp侦听器,并消除对动态流的需要,但我认为这是一个不太好的解决方案。

我感觉到必须有一个消息传递模式来解决这个问题,但我撕掉了我的EIP副本却没有用!如有任何建议,将不胜感激。

更新:从概念上讲,这是我想要的,一个带通道的demuxer(我称之为buckets),但我认为动态通道创建(每个用户ID创建1个通道)会更好:http://www.coralblocks.com/index.php/2014/06/Demultiplexing-with-coralqueue-for-paralle-processing/

共有1个答案

孔鸿哲
2023-03-14

您不可能同时拥有多个并发消费者并遵守fifo排序。但是,您可以使用称为“独占消费者”的特性,拥有多个非并发消费者,以提高可用性。您可以使用连接器的属性exclusiveConsumers来激活它。

关于错误时停止处理,我可以建议两种不同的方法:

  • 如果此业务需求可能发生更改,则可以使用复杂事件处理引擎作为事件的信号量。
  • 否则,您可以使用这里描述的断路器模式,这可能是最简单的方法。
 类似资料:
  • 我正在解析一系列文本文件中的一些模式,因为我想将它们解压缩到其他文件中。

  • 我将Apache Camel与Oracle高级队列和JMS结合使用时遇到了问题。 它是关于一个分发消息的应用程序。消息在Camel的帮助下被接收并在Oracle高级队列中排队。然后它们被Camel使用并转发到目标系统。对于消息传递失败的情况,在高级队列中定义了重试计数,以便重复消息传递。 如果Camel现在将消息退出队列并将其发送到不可用的目标系统,则会引发HttpOperationFailedE

  • 我有一个代码库,其中需要在一些唯一函数之前和之后按顺序调用一些常见函数 如: 问题是,任何时候创建一个新的独特的函数,或任何时候添加更多的commonX函数,那么代码中需要此模式的每个位置都必须匹配。 一种解决方案是使用宏 这是可行的。问题是:有没有一种非宏C的方法可以在零开销的情况下做到这一点,并且阻止错误,并且不要过于冗长地使用。 注意:如果不明显,“不要过于冗长”的要求意味着我调用的函数可能

  • 我每个websocket接收几十条消息,这些消息可能只差几毫秒就能到达。我需要用操作来处理这些数据,这些操作有时会花费一些时间(例如,在DB中的插入)。为了处理接收到的新消息,必须完成对前一个消息的处理。 我的第一个想法是用Node.js Bull(用Redis)准备一个队列,但恐怕太长了,无法运行。这些消息的处理必须保持快速。 我尝试使用JS迭代器/生成器(直到现在我还从未使用过),我测试了如下

  • 我计划使用AWS FIFO SQS在我的数据存储中记录每个项目的当前状态。我将使用每个项目的唯一标识符作为messageGroupId,以确保对每个项目的消息进行严格排序。 SQS是否确保如果属于特定组的消息在DLQ中,则在DLQ消息被删除或重新驱动回主队列之前,该组的消息对消费者不可见? 例如,可以按顺序接收以下三条消息: 我的轮询器成功地消耗了M1,但无法处理M2。它一直尝试,直到maxRec