当前位置: 首页 > 知识库问答 >
问题:

在 kafkacat 消费者之间共享消息

曹智
2023-03-14

这与以下问题几乎相同:发送给具有相同消费者组名称的所有消费者的消息。公认的答案是使用Kafka 0.8.1或更高版本,我就是这么做的。

Kafka留档说:

如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。

但是我无法使用 Kafka 0.8.2.1 和 kafkacat 观察到这种行为。

我的设置:

  • Kafka Zookeeper 运行在 spotify/kafka 容器中(通过 boot2docker)
  • 一个制片人
  • 具有相同 group.id 的两个使用者

首先,在运行Kafka容器时,我创建了一个主题:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic beta

两位消费者订阅了同一主题:

kafkacat -C -b $(boot2docker ip):9092 -t beta -X group.id=mygroup

然后我使用kafkacat生成一条消息:

date | kafkacat -P -b $(boot2docker ip):9092 -t beta

我以为只有一个消费者收到信息,但实际上他们两个都收到了。我做错了什么?

编辑:当我尝试使用kafka-console-consumer.sh运行相同的消费者时,一切都很好:

echo "group.id=mygroupid" > /consumer.beta.properties

$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --zookeeper localhost:2181 \
  --topic beta \
  --consumer.config /consumer.beta.properties

所有工作都按预期工作:消息仅使用一次。我认为问题出在Kafka特身上。

共有2个答案

西门洛城
2023-03-14

事实证明,librdkafka(kafkacat正在使用的库)不支持此功能。引用Github问题:

简答,暂时不支持这个。此外,只有当您使用JVM消费者/生产者时,您在Kafka文档中看到的内容才是真实的。假设任何其他消费者/生产者实现都不完全遵循文档。

高建本
2023-03-14

kafkacat现在支持0.9高级KafkaConsumer:

< code>kafkacat -b

 类似资料:
  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 本文向大家介绍简述消费者与消费组之间的关系相关面试题,主要包含被问及简述消费者与消费组之间的关系时的应答技巧和注意事项,需要的朋友参考一下 消费者从属与消费组,消费偏移以消费组为单位。每个消费组可以独立消费主题的所有数据,同一消费组内消费者共同消费主题数据,每个分区只能被同一消费组内一个消费者消费。

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有2个Kafka消费者共享相同的消费者组ID,但订阅不同的主题。它们中的每一个都只能从相应的主题中阅读。 当第一个使用者运行时,会从其订阅的主题中为其分配分区。当第二个使用者也运行时,使用者组会重新平衡(导致分配给第一个使用者的分区被撤销)。到目前为止,一切顺利。这与Kafka消费群体Id和消费者再平衡问题中的讨论一致。 但是,我开始在消费者1中看到TOPIC_AUTHORIZATION_FAI

  • 我有一个Kafka主题,目前有3个分区。我希望我的消费者从同一个分区读取,但每条消息都应该以循环方式发送给不同的消费者。有可能实现吗?

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认