我有一个用例,其中消息需要广播到水平可扩展、无状态的应用程序集群中的所有节点,我正在考虑Kafka。由于集群的每个节点都需要接收主题中的所有消息,因此集群的每个节点都需要有自己的消费者组。
这里可以假设消息量不是很高,以至于每个节点都无法处理所有消息。
为了用Kafka实现这一点,当从主题消费时,我最终会使用消费者流程的instanceId(或某个唯一标识符)作为消费者组id。这将推高消费群体的数量。随着重新部署的完成,新的消费群体将开始出现。
自我回答我的问题:进一步研究得出的一个解决方案是使用kafka分配()
API而不是订阅()
API来消费。前者不需要消费者组。我只是配置每个节点来消费来自主题所有分区的消息。
感谢伊戈尔·苏亚雷斯,他在评论中提出了不需要消费者群体消费的想法。
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
我们希望获得Kafka消费群体指标(例如,节流和字节率)。 我们已经使用以下工具完成了此操作: Kafka消费者Java应用程序的JMX Mbean CLI实用程序: bin/kafka-consumer-groups.sh--描述--组group_name--bootstrap-serverlocalhost: port . 问题:这可以通过使用一些Java库以编程方式完成吗? 到目前为止,我们
我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不
在一个消费者群体中的所有消费者都失败后,kafka会将该消费者群体的补偿存储多长时间?是否有此配置变量?
我运行这个命令: kafka使用者组--引导服务器localhost:9092--组我的使用者组--重置偏移量--最早--执行--主题my-topic-1 它给出了错误: 命令的语法不正确。 根据此命令的帮助结果,我键入的内容似乎是正确的。 我在这里犯了什么错误?