当前位置: 首页 > 知识库问答 >
问题:

如何使ActiveMQ检测来自消息发布者(幂等生产者)的重复消息

关宏毅
2023-03-14

ActiveMQ是否支持幂等生产者?我知道Camel有一个幂等消费者模式来检测和处理重复消息,但我想知道是否可以从源头(生产者)防止这种情况。

这里有一点背景。我有水平扩展的应用程序访问同一个数据库。有一个特定的表维护特定进程的状态。这些水平应用程序应该能够读取状态并调用另一个进程,但是只有一个应用程序能够调用它。一旦满足所需条件,该应用程序会定期轮询数据库并将消息发布到消息代理。但我希望其中一个负载平衡应用程序能够发布消息。

我认为一个粗略的方法是。。。

在机器1上:

  1. 读取数据库以检查是否满足必要条件。
  2. 在向代理发布消息之前,使用标识进程并提交的唯一密钥将记录写入另一个状态表。如果此操作因违反唯一密钥约束而失败,则表示另一台机器上的进程成功发布了消息。
  3. 将消息发布到经纪人
  4. 如果消息发布失败,由于某种原因,根据唯一键/主键对状态表执行删除操作。

在机器2、3、4等上运行的相同应用程序可以执行相同的操作。

下面是我用这种方法很快注意到的一个陷阱。

假设机器1能够完成步骤2,但执行步骤3失败,并继续执行步骤4。同时,当机器2在步骤2失败时,将继续运行,不再尝试再次读取状态并发布消息。

为了解决这个问题,我需要在第3步中设置retry,直到消息成功发布到broker。

另一个选择是使用https://camel.apache.org/components/latest/eips/idempotentConsumer-eip.html图案但这本质上是消费者方面的一个过滤器。虽然这可以达到我的目的,但在消息发布方面是否有类似的现成方法。

我想知道,这种方法是否正确或有更好的替代方法,或者任何可用于跨本地或远程JVM执行锁定机制的现有库。

共有1个答案

那弘
2023-03-14

目前尚不清楚您使用的是哪个版本的ActiveMQ(即ActiveMQ 5.x或ActiveMQ Artemis),因此我将尝试解决这两个版本的问题。

ActiveMQ 5。x没有任何内置的检测客户端发送的重复数据的支持。但是,您可以使用代理插件实现此功能。我在这里看到的唯一挑战是配置、管理和监控重复ID的缓存。

ActiveMQ Artemis确实内置了对检测从客户端发送的重复的支持。您可以在留档中阅读有关重复检测的更多信息。由于代理本身支持此行为,因此它提供了干净的配置、管理和监控。

在这两种情况下,您都需要在每条消息上设置一个特殊的标题,并使用“一个唯一的密钥来标识流程”,就像您为潜在的数据库解决方案所做的那样。此外,使用代理作为复制检测器总体上要简单得多。

如果您目前使用的是ActiveMQ 5. x,但想要迁移到ActiveMQ Artemis以使用重复检测功能,您不一定需要更新您的客户端,因为ActiveMQ Artemis完全支持5. x客户端使用的OpenWire协议。您应该能够将它们指向ActiveMQ Artemis的新实例并让一切正常工作。

 类似资料:
  • 这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。

  • 我有一个JMS生产者和一个消费者,代理是ActiveMQ,参考下面的代码: 寄件人代码 接收码 问题是 ActiveMQ 队列无法接收来自发送方的消息(请参阅屏幕截图): 当我从 Web 控制台发送消息时,该消息在队列中收到,但来自创建者的消息不会进入队列。 另一个有趣的行为是(如队列接收器代码中所示,接收器在收到第一条消息后退出),同样,当我启动接收器时,它会收到相同的消息,并继续执行,直到我关

  • 我正在使用Spring Boot中的。Java 8 我的主要目的是,消费者不应重复使用信息。 1)调用表获取100行并将其发送到kafka 2) 假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在RETRY机制计时内无法恢复) 因此,当我重新启动Spring启动应用程序时,我如何确保不再发送这70条消息。 一种选择是我可以在数据库表消息 中使用标志。 还有其他有效的方法吗?

  • 我们使用activemq作为Java独立应用程序的消息队列。我的问题是,基于activemq web控制台,队列有一定数量的消息排队和出列。但是,根据我在代码中添加的sysout语句,应用程序消耗的消息数似乎少于activemq web控制台上显示的消息数。例如,在activemq控制台上,没有。排队和出列的消息约为1800条。但是,在控制台上显示的出列消息数(我每接收一条消息就增加一个计数器)只

  • Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外: