当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者自动提交是如何工作的?

赖翰
2023-03-14

我正在读这篇:

自动提交提交偏移量最简单的方法是允许消费者为您执行。如果您配置启用。汽车commit=true,则每五秒钟消费者将提交客户端从poll()收到的最大偏移量。五秒钟的间隔是默认值,由设置“自动”控制。犯罪间隔ms.与消费者中的其他所有内容一样,自动提交由轮询循环驱动。无论何时进行轮询,使用者都会检查是否到了提交的时间,如果是,它将提交上次轮询中返回的偏移量。

也许问题是我的英语不好,但我不完全理解这个描述。

假设我使用自动提交,默认间隔为5秒,轮询每7秒进行一次。在这种情况下,是每5秒还是每7秒进行一次提交?

如果投票每3秒进行一次,你能澄清一下行为吗?提交是每5秒还是每6秒发生一次<我读过这篇:

自动提交:您可以设置自动。提交至true并设置自动。犯罪间隔ms属性,其值以毫秒为单位。启用此功能后,Kafka使用者将提交为响应其poll()调用而收到的最后一条消息的偏移量。poll()调用在后台以set auto发出。犯罪间隔太太

这与答案相矛盾。

你能详细解释一下这东西吗?

假设我有这样的图表:

0秒-轮询
4秒-轮询
8秒-轮询

什么时候提交抵消,什么时候提交?

共有3个答案

拓拔耀
2023-03-14

这里有一个简单的代码来测试它是如何工作的。

医生-

public class KafkaTest {
    
    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        final KafkaProducer<Object, Object> kafkaProducer = new KafkaProducer<>(getProps());
        for (int i = 0; i < 1000; i++) {
            kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
        }
        final Consumer<Long, String> consumer = new KafkaConsumer<>(getProps());
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
        TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
            consumerRecords.forEach(record -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
            });
            final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
            final long consumerCurrentOffset = consumer.position(actualTopicPartition);
            System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
        }
    }

    private static Map<String, Object> getProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //  Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    }
}
  • 每2秒轮询一次
  • 每5秒自动提交一次

像下面这样输出

Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when poll finish because of elapsed time(6 sec) > commit interval(5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit 
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit 
于意智
2023-03-14

它会在轮询完成后尝试尽快自动提交。您可以查看消费者协调器的源代码,其中包含在类级别定义的一组本地字段,以了解是否启用了自动提交、间隔是多少以及下一个执行自动提交的截止日期是什么。

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

轮询中执行调用进行存储的地方之一https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279

也就是说,例如每7秒执行一次轮询,并将自动提交设置为5:

0-投票,将截止日期设置为第5秒

7-投票提交截止日期,将截止日期更新为7 5=12

14-投票提交截止日期,将截止日期更新为12 5=17

但是,如果轮询设置为每3秒一次,并且自动提交设置为5:

0-投票,将截止日期设置为第5秒

3-投票,不提交

6-投票提交截止日期,将截止日期更新为6 5=11

钮善
2023-03-14

每次轮询都会调用自动提交检查,它会检查经过的时间是否大于配置的时间。如果是这样,则提交偏移量。

如果提交间隔为5秒,而轮询在7秒内发生,则提交将仅在7秒后发生。

 类似资料:
  • 我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?

  • 谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • Kafka新手。 Kafka版本:2.3.1 我正在尝试使用Spring cloud使用来自两个主题的Kafka消息。除了kafka活页夹和下面的一些简单配置之外,我没有做太多配置。每当(组协调器lbbb111a.uat.pncint.net:9092(id:2147483641机架:null)不可用或无效时,将尝试重新发现)发生时,已经处理的一堆消息会再次被处理。不确定发生了什么。

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • 本文向大家介绍python kafka 多线程消费者&手动提交实例,包括了python kafka 多线程消费者&手动提交实例的使用技巧和注意事项,需要的朋友参考一下 官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html 以上这篇python kafka 多线程消费者&手动提交实例就是小编分享给大家