当前位置: 首页 > 知识库问答 >
问题:

作为批处理的AckMode如何与max.poll.interval.ms工作和作为“false”的enable.auto.commit一起工作?

宫子晋
2023-03-14

我使用的是spring-kafka“2.1.7.Release”,我试图理解max.poll.interval.ms是如何将AckMode作为批处理而将enable.auto.commit作为“false”工作的。这里是我的消费者设置。

    public Map<String, Object> setConsumerConfigs() {

           Map<String, Object> configs = = new HashMap<>();

           configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

           configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "400000");

           configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
           configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

           configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
           configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

           configs.setPartitionAssignmentStrategyConfig(Collections.singletonList(RoundRobinAssignor.class));

           // Set this to true so that you will have consumer record value coming as your pre-defined contract instead of a generic record
           sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
       }

这是我的出厂设置

        @Bean
         public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConsumerConfigs));
           factory.getContainerProperties().setMissingTopicsFatal(false);

           factory.getContainerProperties().setAckMode(AckMode.BATCH);

           factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
           factory.setRetryTemplate(retryTemplate());
           factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
           factory.setStatefulRetry(true);
           return factory;
         }

         public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setListeners(new RetryListener[]{myCustomKafkaRetryListener});
            retryTemplate.setRetryPolicy(myCustomKafkaConsumerRetryPolicy);

            FixedBackOffPolicy backOff = new FixedBackOffPolicy();
            backOff.setBackOffPeriod(1000);
            retryTemplate.setBackOffPolicy(backOff);


            return retryTemplate;
          }

这是我的消费者,我增加了2分钟的延迟

@KafkaListener(topics = TestConsumerConstants.CONSUMER_LONGRUNNING_RECORDS_PROCESSSING_TEST_TOPIC
      , clientIdPrefix = "CONSUMER_LONGRUNNING_RECORDS_PROCESSSING"
      , groupId = "kafka-lib-comp-test-consumers")
  public void consumeLongRunningRecord(ConsumerRecord message) throws InterruptedException {
    System.out.println(String.format("\n \n Received message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));

    TimeUnit.MINUTES.sleep(2);

    System.out.println(String.format("\n \n Processing done for the message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));
  }

现在,我发布了5条消息,并观察到它处理了所有记录,没有任何问题。但是,如果我将AckMode设置为记录它在处理第4条消息后提交偏移量时抛出以下错误,然后两次处理同一条消息(这是意料之中的)。

根据spring-kafka文档,AckMode=BATCH将在poll()返回的所有记录都已处理完毕时提交偏移量。

现在,我的问题是,AckMode如何在传递max.poll.interval.ms之后改变行为而不引起重新平衡?请帮我理解一下。

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

共有1个答案

仇和蔼
2023-03-14

我认为这条线将有助于澄清一些事情如果不是完全的,

Kafka 0.10.0.0和更高版本的session.timeout.ms和max.poll.interval.ms之间的差异

 类似资料:
  • 我知道ConcurrentKafkaListenerContainerFactory具有属性“setBatchListener(true)”,但就我对Kafka文档的理解而言,这种类型的工厂配置仅与@KafKalistener注释一起使用。 任何帮助都将不胜感激。

  • 问题内容: 我们有一些使用方路径(ftp,file,smb)从远程系统读取文件。简化了直接路径测试,但与批处理使用者的行为类似: 转换后,一次轮询的所有结果将在单独的路径中按批汇总: 如果每个消费者分开运行,则一切正常。但是,如果多个使用者并行运行,聚合将拆分民意测验。例如,如果文件消费者轮询500条消息,并且第二条路线开始从ftp读取6个文件,则期望得到2个聚合1,其中包含来自文件的500条消息

  • 我对Lombok和JSTL如何处理getters和setters有点困惑。我有以下@data类: 并且我可以访问.jsp中的私有布尔值,如下所示: 同时我也可以将私有布尔值声明为 并以相同的方式在JSTL中访问它。但是,以下代码将引发PropertyNotFoundException: 谁能澄清一下我在同时使用JSTL和Lombok时应该使用什么命名约定吗?提前道谢!

  • 我不明白为什么第一个例子有效,但第二个不有效。我相信这与调用json将响应解析为javascript对象有关?那么它返回一个promise,必须放在一个然后函数中?我得到这个是因为在第三个例子中抛出的错误。#json到底是做什么的?

  • 我有怪异的工作行为,不明白为什么会这样。我有以下spring批处理配置: 因此,我有一个step:throttle-limit=5和commit-interval=“100”,我假设writer将接收100个项目的块来编写,所有块都将被逐个处理。 但我有以下流程: 如果作业在流中有4个项目,那么我会看到writer收到1个项目并调用4次,而不是单次调用4个项目。此外,所有这4个调用都是并发执行的,

  • 我设置了一个HTTPS代理,这样HTTP客户端就可以安全地向代理发送普通HTTP请求。例如,客户机可以向代理发送加密的HTTP GET请求,代理将删除加密并将普通HTTP GET请求发送到终端站点。 我了解到这不是一个常见的设置,只有GoogleChrome具有内置功能来支持这样的场景。(信息在这里-http://wiki.squid-cache.org/Features/HTTPS#Encryp