当前位置: 首页 > 知识库问答 >
问题:

如果消费者(库贝pod)重新启动,则Spring云流手动偏移提交行为

狄易安
2023-03-14

嗨,我们一直在使用旧的spring版本和kafka 1.1,并有以下依赖项

+--- org.springframework.boot:spring-boot-starter-web: -> 1.5.4.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-config: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-starter-bus-kafka: -> 1.3.1.RELEASE (*)
+--- org.springframework.cloud:spring-cloud-stream-binder-kafka: -> 1.2.1.RELEASE (*)
+--- org.apache.kafka:kafka-clients:0.11.0.2 (*)
+--- org.apache.kafka:kafka_2.11:0.11.0.2 (*)

我在应用程序中有以下配置。yaml文件

spring:
  kafka:
    properties.security.protocol: SSL
    ssl:
      key-password: ${ABC_KEY_PASSWORD}
      keystore-location: file:${ABC_KEYSTORE}
      keystore-password: ${ABC_KEYSTORE_PASSWORD}
      truststore-location: file:${ABC_TRUSTSTORE}
      truststore-password: ${ABC_TRUSTSTORE_PASSWORD}
    consumer:
      enable-auto-commit: false
  cloud:
    stream:
      bindings:
        feed_fcpostprocessor_event:
          destination: app-fc-notification
          binder: abckafka
          group: app-fc-group
          consumer:
            partitioned: true
            concurrency: 2
      kafka:
        binder:
          brokers: ${ABC_BROKER_HOST:abc-kafka.aws.local:9092}
          autoCreateTopics: true
          autoAddPartitions: true
          replicationFactor: 3
          partitionCount: 2
        bindings:
          feed_fcpostprocessor_event:
            consumer:
              autoCommitOffset: false
      binders:
        abckafka:
          type: kafka

我在stackoverflow中找到了链接。答案有

首先,我不确定您对SpringApplication.ext(applicationContext,())的期望是什么-

这是否意味着Kafka不知道消费者已关闭,在这种情况下没有来自消费者的通信。我认为会发生重新平衡,此外,由于kafka级别的enable-自动提交和绑定级别的自动提交偏移设置为假。偏移量将不会提交,如果应用程序再次启动,它将从它错过的最后一个偏移量开始,或者如果重新平衡发生,那么其他消费者将从其他消费者失败的分区偏移量中读取。

我相信我的问题不是重复的,必须提出中肯的问题。如果有问题,请告诉我答案。否则,请澄清行为。谢谢

共有1个答案

裴经义
2023-03-14

首先,即使Boot 1.5不再受支持;您应该使用Springkafka 1.3.11而不是1.1;由于KIP-62,它有一个更简单的线程模型。在1.3之前,为了避免重新平衡,事情非常复杂。

但是,更好的是,您应该升级到所有项目的受支持版本,但至少,将sping-kafka升级到1.3。

你的期望是正确的;具有未提交偏移量的记录将被重新提交。

 类似资料:
  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1

  • 我对SpringBoot中的Kafka批处理侦听器有问题。 这是@KafkaListener 对于我的问题,这个解决方案不起作用,因为提交批处理。对于我的解决方案,我需要提交单个消息的偏移量。 我尝试使用

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • null 当侦听器处理记录后返回时提交偏移量。 如果侦听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的code/config/command组合对其进行测试时,情况并非如此。偏移量仍然会得到更新,并且继续处理下一条消息。 我的配置: 验证偏移量的命令: 我使用的是kafka2.12-0.10.2.0和org.springframework.kafka:spring-kafka:1.1