嗨,我正在将kafka升级到.9,并将kafka consumer升级到与.9一起发布的新java consumer。在升级时,我使用的是现有的主题,步骤只是停止.8 kafka并开始指向相同log.dirs的.9 kafka。在消费者端,我使用的是相同的组名和主题名,但是新的消费者从主题中的起始位置再次使用消息。我已经把他们交了。我正在添加auto.offset.reset=最早。
任何想法为什么会发生,或者我们需要首先使用.8消费者消费所有消息,并让滞后变为0。谢谢Sunny
0.8个消费者将抵消存储在Zookeeper中,0.9个消费者将其存储在特定的卡夫卡主题中。因此,0.9消费者通常不会看到由0.8消费者存储的任何偏移,因此将从start开始消费。有一个FAQ可能有助于迁移偏移。引用:
1)升级代理并设置dual.Commit.enabled=false和offsets.storage=Zookeeper(仅将offsets提交到Zookeeper)。
2)设置dual.Commit.enabled=true和offsets.storage=Kafka并重新启动(提交对Zookeeper和Kafka的偏移量)。
3)设置dual.Commit.enabled=false和offsets.storage=Kafka并重新启动(仅将offsets提交到Kafka)。
(没有尝试过,只是引用。)
null null 我们正在为我们的kafka broker(kafka Version2.1.1)使用Red Hat AMQ Streams 我们在环境中唯一更改的是Spring Boot版本(以及自动拉入/更新的依赖项),以重新创建此问题 以下是更改前的: 现在,如果我们只是用如下所示的新的Spring Boot版本更新,我们的kafka度量标准就会消失: 提前感谢您的帮助!如果你需要任何其他
我正在尝试使用高级消费者批量读取Kafka主题中的消息。在这批读取期间,我的线程必须在某个时候停止。 或者,一旦主题中的所有消息都用完了。或获取消息即将被读取时的最大偏移量,并停止直到达到最大偏移量。 我尝试在高级消费者处使用代码,但 KafkaStream 上的迭代器方法似乎是一个阻塞调用,并等待另一条消息传入。 所以3个问题, > 我怎么知道没有更多消息要从该主题中读取? 如果我对上述问题有答
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者
我使用的是高级消费者,如所述:https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例 我注意到,我的消费者不会永远运行,而是在一段时间后结束。在zookeeper一侧,我看到以下内容: INFO已处理的会话终止的setsionid: 0x144a4854325004d(org.apache.zookeeper.server.准备请
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认