当前位置: 首页 > 知识库问答 >
问题:

如何解决队列多消耗并发问题?

孔鹤龄
2023-03-14

我们的程序正在使用队列。多个消费者正在处理消息。

消费者执行以下操作:

  1. 从队列接收打开或关闭状态消息。
  2. 从存储库中获取最新状态。
  3. 比较存储库的状态和从消息接收到的状态。
  4. 如果开/关状态不同,则更新数据。(此时其他相关数据也更新。)

假设此过程由多个使用者处理,则预计会出现以下问题。

  1. 生产者发送消息1: on、2: off和3: on。
  2. 消费者A接收消息#1并将消息#1存储在存储中,因为没有最新数据。
  3. 消费者A收到消息#2。
  4. 此时,消费者B同时接收到消息#3。
  5. 消费者A和B同时从存储中读取最新数据(消息1)。
  6. 消费者B首先完成处理。不要更新存储库,因为开/关状态不变。(1: on,3: on)
  7. 然后消费者A完成处理。开/关状态发生了变化,因此它处理并保存了工作。(1: on,2: off)

在正常情况下,html" target="_blank">数据库中剩余的最新数据应处于打开状态
(这是因为消息是按on的顺序发送的-

有什么好办法解决这个问题吗?作为参考,我们使用的队列使用AWS Amazon MQ,存储使用AWS dynamoDB。使用Spring靴。

共有1个答案

湛光华
2023-03-14

这里的基本问题是,您需要按顺序使用这些“状态”消息,但您使用的是并发消费者,这会导致竞争条件和无序的消息处理。简而言之,您使用并发使用者的基本架构导致了这个问题。

您可能会在数据库中使用注释中建议的时间戳来制定某种解决方案,但这对客户机和存储在数据库中的额外数据来说是额外的工作,而这并不是严格必要的。

解决这个问题的最简单方法是串行地而不是并发地使用消息。有几种不同的方法可以做到这一点,例如:

  • 仅为具有“状态”消息的队列定义1个消费者。
  • 使用ActiveMQ的“独家消费者”功能来确保只有一个消费者接收消息。
  • 使用消息组将所有“状态”消息分组在一起,以确保它们被串行处理(即按顺序)。
 类似资料:
  • 本文向大家介绍JAVA如何解决并发问题,包括了JAVA如何解决并发问题的使用技巧和注意事项,需要的朋友参考一下 并发问题的根源在哪 首先,我们要知道并发要解决的是什么问题?并发要解决的是单进程情况下硬件资源无法充分利用的问题。而造成这一问题的主要原因是CPU-内存-磁盘三者之间速度差异实在太大。如果将CPU的速度比作火箭的速度,那么内存的速度就像火车,而最惨的磁盘,基本上就相当于人双腿走路。 这样

  • 这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?

  • 本文向大家介绍如何解决 Redis 的并发竞争 Key 问题?相关面试题,主要包含被问及如何解决 Redis 的并发竞争 Key 问题?时的应答技巧和注意事项,需要的朋友参考一下 所谓 Redis 的并发竞争 Key 的问题也就是多个系统同时对一个 key 进行操作,但是最后执行的顺序和我们期望的顺序不同,这样也就导致了结果的不同! 推荐一种方案:分布式锁(zookeeper 和 redis 都可

  • 我使用Spring JMS和ActiveMQ,其中有一个客户机将消息推送到队列,有多个使用者线程监听并从队列中删除消息。有些时候,相同的消息会被两个使用者从队列中出列。我不希望这种行为,并希望确保仅有的一条消息由一个消费者线程处理。你知道我哪里出了问题吗? ActiveMQ 5.9.1配置:

  • 因此,我正在阅读Maged M.Michael和Michael L.Scott的文章《简单、快速、实用的非阻塞和阻塞并发队列算法》,有一个小问题我不明白: 假设我们有两个并发线程,它们在队列初始化后立即被触发。其中一个线程调用< code>enqueue,另一个调用< code>dequeue。是什么阻止它们同时访问虚拟节点的< code>next字段?当< code>enqueue线程写入< c

  • 问题内容: 也许这是一个愚蠢的问题,但我似乎找不到一个明显的答案。 我需要一个仅包含唯一值的并发FIFO队列。尝试添加队列中已经存在的值只会忽略该值。如果不是为了线程安全,那将是微不足道的。在Java中是否存在数据结构,或者在Interweb上是否存在代码snipit表现出这种行为? 问题答案: 如果您想要比完全同步更好的并发性,那么我知道有一种方法可以使用ConcurrentHashMap作为支