假设我们有一个AWS FIFO SQS队列和两个消息生产者A和B。每条消息都发送了一个等于生产者名称的组ID。换句话说,生产者A将组ID“A”添加到每条消息中,生产者B将组ID“B”添加到每条消息中。我们还有3个消费者X、Y和Z正在使用可见性超时的消息。让我们假设队列中有5条消息——3条来自生产者A的消息和2条来自生产者B的消息。见下图
考虑到给定的条件,我们将有以下工作流程:
其中一个消费者,例如X,从队列中接收组id为B的消息1,这使得此消息和组B的所有其他消息不可见,直到消息1被处理并从队列中删除。
然后另一个消费者,例如Y,接收到组id为A的消息2,这使得消息2和组A的所有其他消息不可见,直到消息2被处理并从队列中删除。
现在,消费者Z无法消费任何消息,因为A组被已处理的消息2阻止,B组被已处理的消息1阻止。
是否有一种技术允许消费者Z在给定情况下使用队列中的下一条消息?
更新1:为什么我使用FIFO队列和组ID?
让我们假设生产者A和B代表两个用户,并且使用简单队列而不是FIFO。消息也没有附加组ID。
考虑这样一个场景:生产者a向队列发送100条消息,之后生产者B也只向队列发送一条消息。生产者B的这条消息必须等到A的所有消息都被处理后,这是不好的。我们需要在A和B的消息之间进行负载平衡,尽管A有100条消息,而B只有一条。
为此,让我们尝试添加组ID,并且由于只有FIFO队列支持它们,我们必须将简单队列替换为FIFO队列。现在上述问题得到了解决。当任何生产者的A消息处于飞行状态时,其中一个消费者将收到生产者B的消息,即使该消息位于队列的后面。我们现在在A和B之间进行负载平衡。
然而,当所有组都有正在传输的消息时,问题就出现了(在这种情况下,队列看起来是空的),但我们目前有更多可用的消费者无法工作,这也不太好。
更新2:建议的可能解决方案。
每个生产者有多个组ID
假设我们有10个消费者,只有一个生产者A。让我们将1到10的数字添加到每个消息组id,再加上一些表示一个批次的唯一id,如果有10条消息,那么我们将有组id“A1-batch1”、“A2-batch1”、“A3-batch1”等等,直到“A10-batch1”。如果生产者A有更多的消息,我们会增加批次号并为另一个10生成组ID,然后再为另一个10生成组ID。现在每个消费者都保证收到一条消息,这很好。但是,如果生产者B现在发送一条消息,在最坏的情况下,生产者A和生产者B之间的平衡比将是10比1,这不是很大。此外,消费者水平可扩展,因此生产者必须知道当前消费者的大致数量。
每个生产者单独的队列
生产者是当前使用该服务的用户。当用户连接到服务时,我们必须创建一个简单的队列,并将添加的队列通知消费者。消费者将必须连续拉动每个当前存在的队列,并且即使有消息正在传输,也应该能够接收新消息。在这种情况下,负载平衡很好。此解决方案增加了架构的复杂性,但应该可以工作。除非我错过了一些技术限制。
队列将按照您的描述运行,这是有意的。
只有两个不同的组ID。如果两个组ID都在运行中,则无法检索其他消息。
如果这给您带来了问题,那么您很可能不正确地使用了组ID。
组ID基本上表示“请按顺序处理这组消息”。因此,如果一条消息仍在处理中,SQS FIFO队列会阻止检索和处理来自同一组ID的另一条消息。您想要获取具有相同组ID的另一条消息的事实向我表明,您不希望该组消息按顺序处理,因此您应该使用不同的组ID。
通过使用链接到生产者的组ID,并且只有两个生产者,您将只有两个消费者处理队列。
我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该
我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。
由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。
问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola
我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个
我创建了一个带有三个分区的Kafka主题。使用Spring Kafka中的ProducerFactory,我可以创建一个producer实例。但是,我想创建三个生产者实例,因为我有三个分区。类似地,我想要三个consumer的实例。我该怎么做?请帮忙。