当前位置: 首页 > 知识库问答 >
问题:

发送给具有相同消费者组名称的所有消费者的消息

陈斌蔚
2023-03-14

有以下消费者代码:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

kafka = KafkaClient("localhost", 9092)

consumer = SimpleConsumer(kafka, "my-group", "my-topic")
consumer.seek(0, 2)
for message in consumer:
  print message

kafka.close()

然后我用脚本生成消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。

在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道:

如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。

我发现这些消费者的群体就是“我的同一个群体”。

如何使新消息只被一个消费者阅读,而不是广播它?

共有3个答案

郎飞龙
2023-03-14

这不应该发生——确保两个消费者都在zookeeper znodes的同一个消费者组下注册。一个主题的每条消息都应该被一个消费群消费一次,所以这个消费群中的每一个人都应该有一个人收到这条消息,而不是你所经历的。你用的是什么版本的Kafka?

轩辕华辉
2023-03-14

从上面的例子很难看出你的Zookeeper配置是什么,或者是否有一个。您将需要一个Zookeeper集群来持久化消费者组信息,WRT每个组中的消费者在给定的偏移量下消费了什么。

一个可靠的例子在这里:官方Kafka留档-消费者群体示例

端木昱
2023-03-14

直到

https://github.com/mumrah/kafka-python/blob/c9d9d0aad2447bb8bad0e62c97365e5101001e4b/kafka/consumer.py#L108-L115

 类似资料:
  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

  • 我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 本文向大家介绍消费者和消费者组有什么关系?相关面试题,主要包含被问及消费者和消费者组有什么关系?时的应答技巧和注意事项,需要的朋友参考一下 每个消费者从属于消费组。具体关系如下:

  • 我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “

  • 我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?