当前位置: 首页 > 知识库问答 >
问题:

Kafka:消费者补偿如何与动态创建的组ID一起工作?

陈季
2023-03-14

在Kafka中,每个消费群体都由一个独特的群体来代表。id属性。每个消费者组管理自己的偏移量(存储在_消费者_偏移量主题中)。如果我总是用一个动态生成的组启动我的消费者服务,那么这个偏移量会发生什么。身份证价值?

这个新的消费群体是否总是从主题的开头开始阅读,因为它没有偏移量,还是将“自动”。抵消重置生效?

共有1个答案

王建华
2023-03-14

如果生成新的组。id每次应用程序启动时,消费者都会依赖auto。抵消重置以找到其起始位置。这是因为这是一个新组,所以不会存储任何偏移量。

使用auto。抵消重置,您可以指示消费者从日志的最早开始,或以最新结束。

请注意,在启动时,您还可以控制应用程序逻辑中的位置,并根据需要显式搜索任意位置。

一个相对常见的模式是从一个准时得到的位置开始,例如寻找到1小时前或一天的开始。这可以通过使用offsetsForTimes()seek()来实现。

 类似资料:
  • 我正在使用Kafka 0.8 最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。 我正在使用高级消费者。 这是我发现的一些错误日志: 我们有一堆这样的错误日志: 每次出现此问题时,我都会看到警告日志: 然后真正的问题发生了: 现在的问题是:有人已经经历过这种行为吗

  • 我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我

  • 我能够使用ApacheKafka提交偏移量类,并能够使用ConsumerConnector进行提交。我查看了apache camel kafka组件,该组件的使用者选项与“auto.commit.enable”属性相同。现在,Camel Java DSL中是否有任何属性或方法,在使用消息后,我们可以手动提交偏移量(通过URL中提供的方法或消费者选项),或者我们必须再次使用Kafka消费者API提交

  • 当我向主题“Test19”发送任何消息时,配置的ServiceActivator“ProcessMessage”方法将两条消息显示为配置的两个客户,但这里的问题是,在添加到消费者上下文之前,我需要为每个客户加载入站配置文件…否则,我只能在控制台中得到一条消息…是正确的方式还是我需要在这里改变什么? 谢了。

  • 我使用MQTT消费者作为我的flink作业的数据源。我想知道如何将数据偏移保存到检查点中,以确保flink集群在故障后重新启动时不会丢失数据。我看过很多介绍apache flink如何管理kafka消费者补偿的文章。有人知道apache flink是否有自己的功能来管理MQTT使用者吗?谢谢

  • 我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。