当前位置: 首页 > 面试题库 >

存储在Zookeeper或Kafka中的偏移量?

东方淇
2023-03-14
问题内容

我对使用Kafka和Zookeeper时在哪里存储偏移量感到困惑。在某些情况下,偏移似乎存储在Zookeeper中,而在其他情况下,偏移存储在Kafka中。

是什么决定偏移量存储在Kafka还是Zookeeper中?优点和缺点是什么?

注意:当然,我也可以将偏移量单独存储在其他数据存储区中,但这并不是本文的内容。

有关我的设置的更多详细信息:

  • 我运行以下版本:KAFKA_VERSION =“ 0.10.1.0”,SCALA_VERSION =“ 2.11”
  • 我使用NodeJS应用程序中的kafka-node连接到Kafka / Zookeeper。

问题答案:

Kafka的较早版本(0.9之前的版本)仅将偏移量存储在ZK中,而默认情况下,较新版本的Kafka则将偏移量存储在内部Kafka主题中__consumer_offsets(尽管较新的版本可能仍适用于ZK)。

向经纪人提供补偿的好处是,消费者不依赖ZK,因此客户只需要与经纪人对话,这简化了总体架构。另外,对于拥有大量用户的大型部署,ZK可能成为瓶颈,而Kafka可以轻松处理此负载(提交偏移量与编写主题是同一回事,并且Kafka在此处很好地扩展-
实际上,默认情况下__consumer_offsets会创建50个分区IIRC)。

我对NodeJS或kafka-node不熟悉-它取决于客户端实现如何提交偏移量。

长话短说:如果您使用经纪人0.10.1.0,则可以向topic提交补偿__consumer_offsets。但是,它是否实现此协议取决于您的客户端。

更详细地说,这取决于您的代理和客户端版本(以及您使用的是哪个消费者API),因为较旧的客户端可以与较新的代理通信。首先,您需要具有代理和客户端版本0.9或更高版本,才能将偏移量写入Kafka主题。但是,如果较旧的客户端连接到0.9代理,它将仍然向ZK提交补偿。

对于Java使用者:

这取决于用户使用什么:0.9之前有两个“老用户”,即“高级用户”和“低级用户”。两者都直接向ZK提交偏移量。从那时起0.9,这两个消费者合并为一个单一的消费者,称为“新消费者”(它基本上将两个老消费者的低级API和高级API统一了-
这意味着0.9存在三种类型的消费者)。新消费者对经纪人的承诺抵消(即内部的Kafka主题)

为了简化升级,还可以使用旧的使用者(自0.9)开始“双重提交”补偿。如果通过启用此功能dual.commit.enabled,则偏移量将提交给ZK和__consumer_offsets主题。这样,您就可以将偏移量从ZK转移到__consumer_offsets主题,同时从旧的使用者API切换到新的使用者API



 类似资料:
  • 我对使用Kafka和动物园管理员时偏移量的存储位置有点困惑。在某些情况下,偏移量似乎存储在动物园管理员中,而在其他情况下,它们存储在Kafka中。 什么决定了偏移量是存储在Kafka中还是存储在Zookeeper中?有哪些利弊? 注意:当然,我也可以将偏移量存储在不同的数据存储中,但这不是这篇文章的内容。 有关我的设置的更多详细信息: 我运行这些版本:KAFKA_VERSION=“0.10.1.0

  • 问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我在Kafka·吉拉也描述了这个问题:https://issues.apache.org/jira/browse/KAFKA-13014 我们有多个实例和线程的Kafka流。 这个Kafka流消耗了很多话题。 其中一个主题分区一天内无法访问,主题保留时间为4小时。 解决问题后,Kafka流正试图从不再存在的偏移量中消费: Kafka消费群体描述: 我们可以看到KS正在等待的当前偏移量是 Kafka

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。