当前位置: 首页 > 知识库问答 >
问题:

在SQS队列中使用多个消费者

云胤
2023-03-14

我知道可以使用多个线程使用SQS队列。我想保证每封邮件都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时时间(例如连接速度慢),则其他线程可以使用相同的消息。

保证消息只处理一次的最佳方法是什么?

共有3个答案

颜实
2023-03-14

AWS SQS API在您使用API等读取消息时不会自动“消费”消息。开发人员需要自己打电话删除消息。

SQS确实有一个称为“重新驱动策略”的功能,作为“死信队列设置”的一部分。您只需将读取请求设置为1。如果消费进程崩溃,对同一消息的后续读取将把消息放入死信队列。

SQS队列可见性超时可设置为12小时。除非您有特殊需要,否则您需要实现进程将消息处理程序存储在数据库中以允许其进行检查。

公西嘉玉
2023-03-14

收到消息时,将消息或消息引用存储在对消息ID具有唯一约束的数据库中。如果该ID存在于表中,则表示您已经收到它,并且数据库将不允许您再次插入它——因为存在唯一约束。

乌骏
2023-03-14

保证消息只处理一次的最佳方法是什么?

你要求的是保证——你得不到保证。您可以将消息被多次处理的概率降低到非常小的程度,但您不会得到保证。

我将解释原因,以及减少重复的策略。

  • 当您将消息放入SQS中时,SQS可能会多次收到该消息
    • 例如:在发送消息时,一个轻微的网络故障导致了一个瞬时错误,该错误被自动重试-从消息发送者的角度来看,它失败了一次,成功发送了一次,但SQS同时收到了这两条消息
    • 与第一个示例类似——有很多计算机在幕后处理消息,SQS需要确保没有任何信息丢失——消息存储在多个服务器上,这会导致重复吗

    在大多数情况下,通过利用SQS消息可见性超时,来自这些来源的重复机会已经非常小——就像百分之一的一小部分。

    如果处理重复项真的没有那么糟糕(努力使消息消费幂等!),我认为这已经足够好了——进一步减少重复的机会是复杂的,而且潜在的成本高昂。。。

    好了,我们从兔子洞下去。。。在较高级别上,您希望为消息分配唯一的ID,并在开始处理之前检查正在进行或已完成的ID的原子缓存:

    • 确保您的消息在插入时提供唯一标识符
      • 没有这个,你将无法区分重复的。
      • 如果您的消息接收者需要开箱发送消息以进行进一步处理,那么它可能是另一个重复源(原因与上述类似)
      • InProgress条目应根据处理失败时恢复的速度设置超时
      • 根据您希望重复数据消除窗口的时间,完成的条目应该有一个超时
      • 最简单的可能是番石榴缓存,但只适用于单个处理应用程序。如果您有大量消息或分布式消费,请考虑为该作业建立一个数据库(使用后台进程来扫描过期的条目)
      • 不过,你可能负担不起无限的存储空间。
      • 请记住,如果没有所有这些,复制的机会已经相当低。根据邮件重复数据消除对您来说值多少时间和金钱,您可以跳过或修改任何步骤
      • 您的应用程序可以在处理消息后立即崩溃/挂起/执行很长的GC,但在MessageId为“已完成”之前(也许您正在为此存储使用数据库并且与它的连接已关闭)
      • 在这种情况下,“处理”最终会过期,另一个线程可以处理此消息(要么是在SQS可见性超时也过期之后,要么是因为SQS中有重复项)。

 类似资料:
  • 我用java编写我所有的微服务。我想在Amazon SQS中使用多个消费者,但每个消费者在负载均衡器后面的AWS上有多个实例。 我使用SNS作为输入流 我在SNS之后使用SQS标准队列。 我在stackoverflow上发现了同样的问题(使用多个消费者的Amazon SQS) 此示例为 https://aws.amazon.com/fr/blogs/aws/queues-and-notificat

  • 我有一个基于服务的应用程序,它使用Amazon SQS,具有多个队列和多个消费者。我这样做是为了实现一个基于事件的架构,并解耦所有服务,其中不同的服务对其他系统状态的变化做出反应。例如: 注册服务: 当新用户注册时,发出事件“registration new” 在用户更新时发出事件'user-更新'。 从队列“registration new”(注册新)中读取,并为搜索中的用户编制索引 从“注册-

  • 我有以下场景:有3个rabbitmq队列,生产者根据消息的优先级将消息推送到这些队列。(myqueue_high,myqueue_medium,myqueue_low)我希望有一个可以按顺序或优先级从这些队列中提取的单一使用者,即只要消息在那里,它就一直从高队列中提取。它是从介质中拉出来的。如果medium也是空的,它从Low拉出。 我如何实现这一点?我需要编写自定义组件吗?

  • 我有一个主应用程序将消息发送到SQS队列,希望4个消费者应用程序使用相同的消息,并按自己的意愿进行处理 我不确定用于此目的的队列体系结构。 我看到标准SQS、SQS FIFO、(SQS SNSTopic)的选项 对于我想要的功能,似乎(SQS SNS主题)或Kenesis将是一条可行的道路。 但是我也有一个关于标准SQS的问题 我想我是混淆之间的所有选项和压倒了所有的信息可用的队列但仍然感到困惑哪

  • 我有一个由第三方发布的JMS队列。我想在不同的机器上设置多个使用者,只有一台特定机器的使用者确认该队列上的消息。简而言之,如果特定机器的使用者没有接收到消息,那么该消息不应从队列中删除。这是可以实现的吗?

  • 我正在查看关于使用Quarkus从SQS消费的指南。 问题是我想在无休止的循环中执行它,例如每10秒获取一次新消息,并使用Hibernate Reactive从消息中插入一些数据到数据库中。 我创建了一个Quarkus调度程序,但由于它不支持返回Uni,我不得不阻止Hibernate Responsive的响应,因此出现了这个错误 使用Quarkus和reactive实现我所需的最佳方法是什么?