我正在使用Apache Camel的AMQP组件来收听来自ActiveMQ Artemis主题的消息。
此应用程序在Kubernetes上运行,有两个副本。
我已经配置了一个持久订阅,每个pod有一个唯一的clientId和一个通用订阅名称:
<route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId={{container-id}}&durableSubscriptionName=eventSubscription"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
问题是两个pod都接收消息,而只有一个应该接收。我正在尝试实现类似于Kafka的消费者组的东西,其中只有一个组成员接收每条消息。
如果只希望一个订阅者接收消息,则订阅者必须共享同一订阅。因此,你需要:
\u amqp\u主题中设置subscriptionShared=true
,uri
例如:
xml prettyprint-override"><route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId=myClientID&durableSubscriptionName=eventSubscription&subscriptionShared=true"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
另一种选择是使用队列而不是主题,例如:
<route autoStartup=true" id="myRoute">
<from id="_amqp_queue" uri="amqp:queue:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了
我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。
null null 使用简单消费者或低级消费者可以控制分区,但如果一个实例宕机,其他三个实例将不会处理来自第一个实例中使用的分区的消息
我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?
我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正
我有一个主应用程序将消息发送到SQS队列,希望4个消费者应用程序使用相同的消息,并按自己的意愿进行处理 我不确定用于此目的的队列体系结构。 我看到标准SQS、SQS FIFO、(SQS SNSTopic)的选项 对于我想要的功能,似乎(SQS SNS主题)或Kenesis将是一条可行的道路。 但是我也有一个关于标准SQS的问题 我想我是混淆之间的所有选项和压倒了所有的信息可用的队列但仍然感到困惑哪