当前位置: 首页 > 知识库问答 >
问题:

SpringKafka消费者的异常

边桐
2023-03-14

在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义

1)我的疑问是,我是否错过了来自消费者的任何信息?

2) 我需要处理这个错误吗。由于seekToCurrentErrorHandler()未报告此错误

org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。

我的SpringKafka消费者代码片段

  public class KafkaConsumerConfig implements KafkaListenerConfigurer 
    @Bean
        public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
            SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((record, e) -> {
                System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
                        + " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
            }, new FixedBackOff(500L, 3L));
            return seekToCurrentErrorHandler;
        }
    
        @Bean 
        public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
      
      ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer;
      errorHandlingDeserializer = new ErrorHandlingDeserializer<>( new JsonDeserializer<>(ValidatedConsumerClass.class));
      
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
      props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      
      
      return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
      errorHandlingDeserializer);
      
      }
    
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(AckMode.RECORD);
            factory.setErrorHandler(seekToCurrentErrorHandler());
            return factory;
        }

消费者阅读信息

@Service
public class KafKaConsumerService extends  AbstractConsumerSeekAware {

@KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {


        
    databaseService.save(message);
        
        System.out.println( "-- Consumer End -- "   + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());


    }

共有1个答案

柯波
2023-03-14
  1. 不,你没有遗漏任何东西
  2. 不,你不需要处理它,STCEH已经处理了它,记录将在下次投票时重新提交

在这种情况下,异常是在记录处理之外(处理完成后)引起的。由于提交由于重新平衡而失败,因此STCEH不需要reseeek(反正也不需要,因为记录不再可用)。它只是重新抛出异常。

一切按预期进行。。。

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=5000
@SpringBootApplication
public class So69016372Application {

    public static void main(String[] args) {
        SpringApplication.run(So69016372Application.class, args);
    }

    @KafkaListener(id = "so69016372", topics = "so69016372")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + " @" + offset);
        Thread.sleep(6000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
    }

}

后果

2021-09-01 13:47:26.963  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:31.991  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:32.989  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:33.102  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:38.141  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:39.108  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:39.217  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0

它将无限期地重试。

 类似资料:
  • 我正在使用Spring Kafka consumer,它从主题中获取消息并将其保存到数据库中。如果满足故障条件,例如db不可用,kafka消费者库是否提供重试机制?如果是,是否有方法设置不同的重试间隔,如第1次重试应在5分钟后进行,第2次重试应在30分钟后进行,第3次重试应在1小时后进行等。

  • 我在站点1(3个代理)有两个集群设置cluster-1,在站点2(3个代理)有两个集群设置cluster-2。使用spring kafka(1.3.6)消费者(一台机器)并通过@KafkaListener注释收听消息。我们如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory,并同时监听来自这两个集群的数据。 我的侦听器应该同时使用来自这两个集群的消息。

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我是Spring-Kafka的新手,在使用Spring Kafka RetryTemplate处理kafka消息期间,尝试在失败或任何异常的情况下实现重试。 我使用了以下代码: //这是KafkaListenerContainerFactory: 重试模板 这是消费者工厂 当任何异常发生时,它会按照重试策略按预期重试。一旦max重试耗尽,它就会调用恢复回调方法。但很快,它会给出“java.lang

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?