Question:

How does a Kafka batch consumer handle long-running record processing?

荣曾笑
2023-03-14

I am using spring-kafka 2.2.7.RELEASE to create a batch consumer, and I am trying to understand how consumer rebalancing works when my record processing time exceeds max.poll.interval.ms.
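For context, max.poll.interval.ms bounds the time allowed between successive poll() calls before the group coordinator evicts the member. The check can be modeled with a minimal sketch (a simulation for illustration only, not actual Kafka client code; the class and method names here are made up):

```java
// Models the max.poll.interval.ms check: if the gap between two poll()
// calls exceeds the configured interval, the coordinator evicts the
// consumer and triggers a rebalance. Illustrative model only.
public class PollIntervalCheck {

    static boolean wouldBeEvicted(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        long elapsedSinceLastPoll = nowMs - lastPollMs;
        return elapsedSinceLastPoll > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // 5 minutes (300000 ms)
        // Processing a batch for 400 seconds before polling again:
        System.out.println(wouldBeEvicted(0, 400_000, maxPollIntervalMs)); // true
        // Processing for 100 seconds stays within the interval:
        System.out.println(wouldBeEvicted(0, 100_000, maxPollIntervalMs)); // false
    }
}
```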

Here is my configuration.

public Map<String, Object> myBatchConsumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
    return configs;
}

Here is my factory setup.

@Bean
public <K, V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(myBatchConsumerConfigs()));
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setAckMode(AckMode.BATCH);
    factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
    factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
    factory.setStatefulRetry(true);
    factory.setBatchListener(true);
    factory.setBatchErrorHandler(myBatchConsumerSeekToCurrentErrorHandler);
    factory.getContainerProperties().setConsumerRebalanceListener(myBatchConsumerAwareRebalanceListener);
    // Note: this second setRecoveryCallback overrides the one set above.
    factory.setRecoveryCallback(context -> {
        logger.logInfo("In recovery call back for KES Batch Consumer", this.getClass());
        myBatchConsumerDeadLetterRecoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"), (Exception) context.getLastThrowable());
        return null;
    });
    return factory;
}

I added a custom rebalance listener as shown below.

@Component
public class MyBatchConsumerAwareRebalanceListener implements ConsumerAwareRebalanceListener {  

@Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedBeforeCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedBeforeCommit", getClass());
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsRevokedAfterCommit - topic "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsRevokedAfterCommit", getClass());
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<org.apache.kafka.common.TopicPartition> partitions) {
        partitions.forEach(
                topicPartition -> {
                    System.out.println(" onPartitionsAssigned - topic - "+ topicPartition.topic() +" partition - "+topicPartition.partition());
                }
        );
        //springLogger.logInfo(" onPartitionsAssigned", getClass());
    }

}


Here is my consumer, where I've added a delay/sleep of 400 seconds, which is greater than max.poll.interval.ms (300 seconds):



@KafkaListener(groupId = "TestBatchConsumers", topics = TEST_KES_BATCH_CONSUMER_TOPIC, containerFactory = "myBatchConsumerContainerFactory")
public void consumeRecords(List<ConsumerRecord<String, Organization>> consumerRecords) {

    long startTime = System.currentTimeMillis();
    System.out.println("Processing started at " + startTime);
    consumerRecords.forEach(consumerRecord -> {
        System.out.println(
                "Received consumerRecord on topic " + consumerRecord.topic() + ", partition " + consumerRecord.partition()
                        + ", at offset " + consumerRecord.offset() + ", with key " + consumerRecord.key());

        try {
            Thread.sleep(400000); // 400 seconds, longer than max.poll.interval.ms (300 seconds)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    System.out.println("Processing completed at " + System.currentTimeMillis());

    long processingTimeInSec = (System.currentTimeMillis() - startTime) / 1000;
    System.out.println(processingTimeInSec);
}

Now I expected the consumer group to rebalance, since the processing time exceeds max.poll.interval.ms, but I don't see any such behavior. Am I missing something here?

Please advise.

1 Answer

庄飞
2023-03-14

It behaves exactly as I expect:

@SpringBootApplication
public class So67520619Application {

    public static void main(String[] args) {
        SpringApplication.run(So67520619Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so67520619", "foo");
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67520619").partitions(1).replicas(1).build();
    }

}

@Component
class Listener implements ConsumerAwareRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so67520619", topics = "so67520619")
    public void listen(List<String> in) throws InterruptedException {
        LOG.info(in.toString());
        Thread.sleep(12000);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Revoked: " + partitions);
    }

}
spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
2021-05-13 11:13:16.339  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:16.358  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]
2021-05-13 11:13:26.416  INFO 20954 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-c9a440bf-9076-4575-813d-3efb0054f5f7 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 11:13:28.365  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 11:13:28.370 ERROR 20954 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
    ... 3 common frames omitted

2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 11:13:28.371  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 11:13:28.372  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 11:13:28.376  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.485  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 3: {consumer-so67520619-1-15ce3150-1aa3-4b43-a892-fbd54e8ed919=Assignment(partitions=[so67520619-0])}
2021-05-13 11:13:28.486  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 3
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 11:13:28.487  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 11:13:28.488  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 11:13:28.489  INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 11:13:28.490  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 11:13:28.494  INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo]

EDIT

It still works for me with 60/65 seconds...

2021-05-13 17:24:28.111  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:24:28.130  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]
2021-05-13 17:25:28.147  INFO 37063 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-269ac261-3838-4925-a9a7-fd0687db3522 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 17:25:33.135  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 17:25:33.141 ERROR 37063 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

...

2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 17:25:33.141  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Revoked: [so67520619-0]
2021-05-13 17:25:33.142  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 17:25:33.145  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.250  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 9: {consumer-so67520619-1-bd22a252-64f2-4be3-a6eb-8371b8f95ff2=Assignment(partitions=[so67520619-0])}
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 9
2021-05-13 17:25:33.254  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 17:25:33.255  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 17:25:33.256  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 17:25:33.258  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : Assigned: [so67520619-0]
2021-05-13 17:25:33.261  INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener                : [foo, foo, foo, foo, foo, foo]

Kafka complains after 60 seconds, and the commit fails 5 seconds later.
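The remedies named in that log line ("increasing max.poll.interval.ms or ... reducing the maximum size of batches returned in poll() with max.poll.records") can be expressed directly in the consumer config. A minimal sketch, with illustrative values you'd tune to your actual worst-case batch time (plain string keys are used here so the snippet runs without kafka-clients on the classpath; in real code you'd use the ConsumerConfig constants):

```java
// Illustrative consumer settings for long-running batch processing:
// either raise max.poll.interval.ms above the worst-case batch time,
// or shrink max.poll.records so each batch finishes sooner.
import java.util.HashMap;
import java.util.Map;

public class LongProcessingConfig {
    public static Map<String, Object> configs() {
        Map<String, Object> configs = new HashMap<>();
        // Allow up to 10 minutes between poll() calls (default is 5 minutes).
        configs.put("max.poll.interval.ms", "600000");
        // Cap each batch at 10 records so one poll's worth of work stays short.
        configs.put("max.poll.records", "10");
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(configs());
    }
}
```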
