假设我在一个2Kafka分区中有一个主题(test),并且一个2consumer-group(X,Y),每个consumer-group中只有一个consumer-group是消费主题。
现在,我想知道同一分区中其他使用者组的偏移量。下面的伪代码将解释需要
*** Let's assume this is running in the context of consumer group X
TOPIC = “test”
// consumer for group x
Consumer<K, V> consumerX = new KafkaConsumer<>(consumerProperties);
consumerX.subscribe(TOPIC, new ReportOnRebalance(……..));
// Get the current assigned partition, could be null but keep searching
// until partition got assigned to the consumerX
Set<TopicPartition> topicPartition = consumerX.assignment();
// Get the last committed offset
offsetAndMetadataX = consumerX.committed(topicPartition)
// consumer for group y
Consumer<K, V> consumerY = new KafkaConsumer<>(consumerProperties);
// manually assign because I am interested in the offset for the
// partition consumerX is going to serve for
consumerY.assign(topicPartition)
// Get the last committed offset
offsetAndMetadataY = consumerY.committed(topicPartition)
// Do require logic with offsetAndMetadataC and offsetAndMetadataY
newOffset = foo(offsetAndMetadataX, offsetAndMetadataY)
// want to reset the offset for this consumerY and in this
// partition
consumerY.seek(topicPartition, bar(newOffset))
// Change offset for consumerX and starting polling for messages
consumerX.seek(topicPartition, newOffset)
while(...) {
consumerX.poll(..)
....
}
*** Now the same code will run in the context of consumer group Y, but the role will be reversed
consumerY.subscribe()
consumerX.assign()
...
consumerY.seek(topicPartition, bar(newOffset))
...
// Change offset for consumerY and starting polling for messages
consumerY.seek(topicPartition, newOffset)
while(...) {
consumerY.poll(..)
....
}
我没有尝试过您在这里描述的内容,但从官方文档来看,这似乎应该像您希望的那样工作:
https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#ManualAssignment
此处突出显示的关键部分:
手动分区分配不使用组协调,因此使用者故障不会导致分配的分区重新平衡。每个使用者独立行动,即使它与另一个使用者共享一个groupId。为了避免偏移量提交冲突,您通常应该确保groupId对于每个使用者实例都是唯一的。
基本上,如果您开始手动为使用者分配分区,那么所有的动态重新平衡都会自动关闭。所以,你应该小心,但Kafka似乎确实考虑到了你描述的场景。
我们在Kubernetes中基于<code>gcr.io/google_containers/Kubernetes-Kafka:1.0-10.2.1</code>docker映像运行一个Kafka集群,使用<code>gcr.io/google_containers/Kubernetes-zookeeper:1.0-3.4.10</code>,使用三个Kafka和zookeer实例。 我们有几个不
在一个消费者群体中的所有消费者都失败后,kafka会将该消费者群体的补偿存储多长时间?是否有此配置变量?
在Kafka中创建主题后,您可以创建任意数量的消费者组,只需尝试使用这些组来阅读主题。 我想创建一个额外的消费者组来监控真实消费者组的消息内容——一个用来偷看他们消息的组。因此,GUI会让您单击任何消费者组的“偷看”,“偷看”组的偏移量将更新为被监控组的偏移量,然后它会向您显示该偏移量中的消息。 不过我很困惑,因为你不能在第一次就明确地创建一个消费者群体。您似乎必须阅读一条消息才能获得在动物园管理
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
我是Kafka的新手,正在学习Kafka内部知识。请根据需要随时更正我的理解。。 这是我的实时场景..感谢所有的回复: 我有一个接收数据文件的实时FTP服务器…比如索赔文件。 我将把这些数据发布到一个主题中.让我们把这个主题称为claims_topic(2个分区). 我需要订阅这个claims_topic,阅读消息并将它们写入Oracle和Postgres表。让我们将oracle表称为Otable
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka