在轮询Kafka时,我已经使用该subscribe()
功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后seek()
,并poll()
从一个话题。
在轮询数据之前seek()
,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中?
我每个主题有一个分区,并且只有一个使用者可以读取所有主题。
Kafka如何存储每个主题的偏移量?
卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人。原因如下:
Zookeeper不是服务于高写入负载(例如偏移量更新)的好方法,因为Zookeeper会将每个写入路由到每个节点,因此无法分区或扩展写入。
我们一直都知道这一点,但是由于已经依赖zk,因此选择此实现作为一种“方便婚姻”。
Kafka将偏移量提交存储在主题中,当使用者提交偏移量时,kafka将提交偏移量消息发布到“ commit-
log”主题,并保留将组/主题/分区映射到最新偏移量的内存结构,以便快速检索。可以在此页面中找到有关偏移管理的更多设计信息。
现在,我要设置要从每个主题读取的偏移量,而无需在一个主题的每个seek()和poll()之后重新订阅。
kafka管理工具有一个新功能可重置偏移量。
kafka-consumer-group.sh --bootstrap-server 127.0.0.1:9092 --group
your-consumer-group **--reset-offsets** --to-offset 1 --all-topics --execute
还有,你可以使用更多的选择。
我有一个问题,假设有一个TOPIC T1,有两个消费者C1和C2属于两个不同的组,电流偏移量是0.我们知道Kafka维护消费者的偏移量。因此,如果 C1 使用消息并且 Offset 变为 1,那么如果 C2 使用消息,它将从 1 偏移量开始,还是从 0 偏移量开始使用消息,会发生什么情况?表示两个不同的消费群体将如何维持抵消? 谢啦
问题内容: 我对使用Kafka和Zookeeper时在哪里存储偏移量感到困惑。在某些情况下,偏移似乎存储在Zookeeper中,而在其他情况下,偏移存储在Kafka中。 是什么决定偏移量存储在Kafka还是Zookeeper中?优点和缺点是什么? 注意:当然,我也可以将偏移量单独存储在其他数据存储区中,但这并不是本文的内容。 有关我的设置的更多详细信息: 我运行以下版本:KAFKA_VERSION
我想打印Flink已开始读取的Kafka主题的每个分区的起始偏移量?
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
有一种情况,当消费者1阅读来自Kafka主题的消息时。当使用相同的groupId连接第二个用户2时,需要重新平衡分区。有没有可能以某种方式重置偏移,以便在重新平衡过程之后,两个消费者都从头开始阅读主题?