当前位置: 首页 > 知识库问答 >
问题:

消费者重启后,将重播topic中的Kafka消息

融修平
2023-03-14

我在kafka中面临一个奇怪的问题,即在消费者应用程序重新启动后,所有来自主题的kafka消息都在重播。有人能帮我我在这里做错了什么吗?

这是我的配置:

spring.kafka.consumer.auto-偏移-重置=最早

spring.kafka.enable.auto。提交=false

我的生产者配置:

        producerconfigs.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerconfigs.put(ACKS_CONFIG, "all");
        producerconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id");
        producerconfigs.put(RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        producerconfigs.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        producerconfigs.put(TRANSACTIONAL_ID_CONFIG, "V1-"+ UUID.randomUUID().toString());

消费者配置:

        consumerconfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerconfigs.put(SESSION_TIMEOUT_MS_CONFIG, "10000");
        consumerconfigs.put("isolation.level", "read_committed");

消费者代码:

@KafkaListener(topics = "TOPIC-1", groupId = "TOPIC-GRP", containerFactory = "kaListenerContainerFactory",concurrency = "10", autoStartup = "true")
public String processMesage(@Payload String message,@Header(value = KafkaHeaders.CORRELATION_ID, required = false) String correlationId,@Header(value = KafkaHeaders.OFFSET, required = false) String offset) throws JsonProcessingException {//business logic goes here }

集装箱代码

     @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
                kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
        ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryString());
        factory.setBatchListener(true);
        return factory;
    }

消费者配置

            Map<String, Object> getConsumerProperties() {
         Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG, 
       environment.getProperty("spring.kafka.consumer.group-id"));
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
        environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
       environment.getProperty("spring.kafka.enable.auto.commit"));
        config.put(KEY_DESERIALIZER_CLASS_CONFIG, 
      environment.getProperty("spring.kafka.consumer.key-deserializer"));
        config.put(VALUE_DESERIALIZER_CLASS_CONFIG,  
      environment.getProperty("spring.kafka.consumer.value-deserializer"));
        config.put("isolation.level", "read_committed");
        return config;
    }

应用程序.属性

spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.enable.auto.commit= false

共有1个答案

巫马翰翮
2023-03-14

我不确定是否有人回答了这个问题,但似乎是config导致了这个问题。

Auto commit false意味着kafka主题永远不会有消费群体遍历的偏移量信息,Auto offset reset Early意味着始终从开始读取。

spring.kafka.enable.auto.commit= false
spring.kafka.consumer.auto-offset-reset=earliest
 类似资料:
  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 问题是Spring Kafka侦听器只配置了主题名。 我似乎可以让Kafka产生100个消费者来处理来自“队列”(日志)的消息。怎么能做到呢?

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 然而,当在我的环境中测试此示例时,我得到了一个异常。