当前位置: 首页 > 知识库问答 >
问题:

Kafka -当设置AUTO_OFFSET_RESET_CONFIG为“最新”时,为什么fresh groupId不返回主题中的所有消息

淳于泓
2023-03-14

我尝试在scala中实现一个非常简单的Kafka(0.9.0.1)消费者(代码如下)。

据我理解,Kafka(或者更好地说是Zookeeper)为每个groupId存储给定主题的最后一条消费消息的偏移量。因此,在以下情况下:

  1. 具有 groupId1 的使用者,昨天只消耗了主题中的 5 条消息。现在最后使用的消息的偏移量为 4(考虑偏移量为 0 的第一条消息)
  2. 在夜间,2条新消息到达主题
  3. 今天我重新启动消费者,使用相同的groupId1,将有两个选项:

选项1:如果我将以下属性设置为“最新”,则消费者将读取夜间到达的最后2条新消息:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

选项2:如果我将以下属性设置为“最早”,消费者将阅读主题中的所有7条消息:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

问题:由于某种原因,如果我将使用者的 groupId 更改为 groupId2,那就是给定主题的新 groupId,因此它以前从未使用过任何消息,其最新偏移量应为 0。我期待通过设置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

使用者将在第一次执行期间读取存储在主题中的所有消息(相当于具有最早的消息)。然后对于后续执行,它将只消耗新的执行。然而,事实并非如此。

如果我设置一个新的 groupIdAUTO_OFFSET_RESET_CONFIG保持为最新版本,则使用者无法读取任何消息。然后我需要做的是将第一次运行设置为最早AUTO_OFFSET_RESET_CONFIG,一旦 groupID 的偏移量已经不同于 0,我就可以移动到最新版本

这就是我的消费者应该如何工作吗?有没有比在第一次运行使用者后切换AUTO_OFFSET_RESET_CONFIG更好的解决方案?

下面是我作为一个简单消费者使用的代码:

class KafkaTestings {

  val brokers = "listOfBrokers"
  val groupId = "anyGroupId"
  val topic = "anyTopic"

  val props = createConsumerConfig(brokers, groupId)

  def createConsumerConfig(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "12321")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def run() = {
    consumer.subscribe(Collections.singletonList(this.topic))

    Executors.newSingleThreadExecutor.execute(    new Runnable {
      override def run(): Unit = {

        while (true) {
          val records = consumer.poll(1000)

          for (record <- records) {
            println("Record: "+record.value)
          }

        }

      }
    })
  }
}

object ScalaConsumer extends App {
  val testConsumer = new KafkaTestings()
  testConsumer.run()
} 

这被用作编写这个简单消费者的参考

共有2个答案

邹海荣
2023-03-14

在我的测试中,我通常希望从最早的偏移量读取,但正如所指出的,一旦您读取了具有给定groupId的消息,那么您的偏移量将保持在该指针上。我这样做:

< code > properties . put(consumer config。GROUP_ID_CONFIG,uuid . randomuuid());

娄德运
2023-03-14

这是按照记录工作的。

如果您启动一个新的消费者组(即 Kafka 中没有存储现有偏移量的消费者组),您必须选择消费者是从最早可能的消息(主题中仍然可用的最旧消息)还是从最新消息(仅从现在开始生成的消息)开始。

有没有比第一次运行消费者后切换AUTO_OFFSET_RESET_CONFIG更好的解决方案?

您可以将其保持在最早,因为第二次运行消费程序时,它已经存储了偏移量,只需在那里拾取即可。重置策略仅在创建新用户组时使用。

今天我重新启动消费者,使用相同的groupId1,将有两个选项:

不是真的。由于消费者组在前一天运行,因此它将找到其已提交的补偿,并从停止的位置恢复。因此,无论您将重置策略设置为什么,它都将收到这两条新消息。

尽管意识到,Kafka不会永远存储这些偏移量,我相信默认值只有一周。因此,如果你关闭消费者的时间超过了这一时间,那么补偿可能会过时,你可能会意外地重置为EARLIEST(这对于大型主题来说可能会很昂贵)。鉴于此,无论如何,将其更改为最新版本可能是明智的。

 类似资料:
  • 我正在学习Kafka,如果有人能帮助我理解一件事。“制作人”向Kafka主题发送消息。它会在那里停留一段时间(默认为7天,对吗?)。 但是“消费者”收到这样的信息,永远保持它在那里没有多大意义。我预计当消费者收到它们时,这些信息会消失。否则,当我再次连接到Kafka时,我将再次下载相同的消息。所以我必须管理重复的避免。 它背后的逻辑是什么? 问候

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 问题内容: 我只是将应用程序主题设置为Synthetica Alu Oxide,但由于某些原因,JFrame不会重新绘制,而另一个Synthetica主题将重新绘制JFrame。 这就是我的样子。 http://i.imgur.com/SOBDTs4.png 这就是它的样子。 http://www.jyloo.com/images/screenshots/syntheticaAluOxide/de

  • 我有一个kafka主题,有25个分区,集群已经运行了5个月。 根据我对给定主题的每个分区的理解,偏移量从0,1,2开始...(无界) 我看到log-end-offset值很高(现在- 我创建了一个新的消费群体,偏移设置为最早;因此,我预期该消费者组的客户端将从偏移量0开始的偏移量。 我用来创建一个偏移量为最早的新消费者组的命令: 我看到正在创建消费者组。我预计当前偏移量为0;然而,当我描述消费者组

  • 我看过与此相关的类似问题,但并没有找到正确的答案。我只想从 Kafka 主题中删除消息,而不是更改保留超时。我已经安装了kafka_2.11-0.8.2.1,并使用蝙蝠文件在Windows上运行它。我想知道我是否可以删除主题中发布的所有消息,而不删除整个主题。

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到