当前位置: 首页 > 知识库问答 >
问题:

如何使应用程序的一个实例只接收每个AMQP主题消息(消费者群体行为)

戴博
2023-03-14

我正在使用Apache Camel的AMQP组件来收听来自ActiveMQ Artemis主题的消息

此应用程序在Kubernetes上运行,有两个副本。

我已经配置了一个持久订阅,每个pod有一个唯一的clientId和一个通用订阅名称:

<route autoStartup=true" id="myRoute">
    <from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false&amp;subscriptionDurable=true&amp;clientId={{container-id}}&amp;durableSubscriptionName=eventSubscription"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>

问题是两个pod都接收消息,而只有一个应该接收。我正在尝试实现类似于Kafka的消费者组的东西,其中只有一个组成员接收每条消息。

共有1个答案

凌俊名
2023-03-14

如果只希望一个订阅者接收消息,则订阅者必须共享同一订阅。因此,你需要:

  • \u amqp\u主题中设置subscriptionShared=trueuri

例如:

xml prettyprint-override"><route autoStartup=true" id="myRoute">
    <from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false&amp;subscriptionDurable=true&amp;clientId=myClientID&amp;durableSubscriptionName=eventSubscription&amp;subscriptionShared=true"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>

另一种选择是使用队列而不是主题,例如:

<route autoStartup=true" id="myRoute">
    <from id="_amqp_queue" uri="amqp:queue:xxx?connectionFactory=#amqpCF&amp;disableReplyTo=true&amp;transacted=false"/>
    <log loggingLevel="INFO" message="Received event: ${body}"/>
    ...
</route>

 类似资料:
  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • null null 使用简单消费者或低级消费者可以控制分区,但如果一个实例宕机,其他三个实例将不会处理来自第一个实例中使用的分区的消息

  • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?

  • 我正在尝试用redis streams实现一个java应用程序,其中每个consomer只使用一条消息。就像管道/队列一样,每个使用者只接收一条消息,对其进行处理,完成后,使用者接收流中尚未处理的下一条消息。有效的方法是,每条消息只被一个消费者(使用xreadgroup)使用。 我从redislabs开始学习本教程 守则: 我当前的问题是,一个消费者从队列中获取多条消息,在某些情况下,其他消费者正

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是