当前位置: 首页 > 知识库问答 >
问题:

SpringKafka-偏移量未设置为on错误

莫河
2023-03-14

在我的侦听器中,在使用消息后,如果发生任何异常,我将抛出一个异常。如果它成功了,那么我承认。但是,即使抛出异常,偏移量也不会后退。i、 e重试没有按预期进行。错误事件不会再次出现。

此外,我发现我没有消费所有预期的消息。我做错什么了吗?

@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(this.consumerFactory());
    factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;
}


public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
   kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

监听器类

@KafkaListener(topics = "${gpc.consumer.topic}")
    public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
            Acknowledgment acknowledgment) {
        try {
            String body = consumerRecord.value().get();
            log.logInfo("Received message for " + GPCSourceName + ": " + body);
            log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
                    + "offset="+consumerRecord.offset());
                            if(body.contains("berlin+braze@gmail.com")) {
                                throw new RuntimeException("custom exception");
                            }
                            log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
            acknowledgment.acknowledge();
        }catch(Exception e) {
            log.logError("Exception occurred in listenWithHeadersGPC: ", e);
            throw e;
        }
    }

我正在做确认。承认() 在我的侦听器中,如果没有异常

编辑:SpringKafka版本:2.2.8


共有1个答案

颜举
2023-03-14

工厂。setErrorHandler(新的SeekTocurErrorHandler(3))

在这种配置下,你会看到3次重试,两次之间的延迟为零;最后记录下来。默认情况下,“已恢复”记录的偏移量不会提交。

发布这样的问题时;指出您使用的版本。

从2.3版开始,您可以使用确认。nack(睡眠时间);您还可以在seekTocurInterrorHandler上调用setCommitRecovered(true),恢复记录的偏移量将被提交(使用手动\u IMMEDIATE)。

只有当异常被抛出到容器时,这一切才会发生;如果在侦听器中捕捉到异常,则不会看到重试。

如果您根本看不到重试,并且您的侦听器正在抛出异常,那么您的侦听器或配置有问题。显示所有配置@KafkaListener代码。

 类似资料:
  • 我能够使用ErrorDesrializationHandler成功处理反序列化错误,但当我重新启动我的消费者时,它再次开始重新处理由于反序列化而导致的所有失败消息。 由于反序列化异常无法到达Kafka Listener,如何确认并提交偏移量? 谢谢。 我正在使用的自定义错误处理程序: }

  • 我有如下代码 每当我通过将其设置为实体来保存到我的数据库时,它会添加5:30作为偏移量

  • 我想开始最新的抵消,而不是为旧的价值所困扰。是否有可能重置该组的偏移量?

  • 我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例

  • 问题内容: 使用javascript,我知道我的用户时区为UTC +3。 现在,我想用此知识创建DateTime对象: 我得到的回应是: 我究竟做错了什么?我该如何解决? 问题答案: 这个怎么样…

  • 我有一个LocalDateTime的实例。 我需要映射它XMLGregorianCalendar(这里使用JAXB ),在最终的XML中,我希望时间在XML文档中看起来像下面这样:2020-03-04T19:45:00.000 1:00 (1小时是UTC的偏移量)。 我尝试过使用DateTimeFormatter将LocalDateTime转换为字符串,然后将其映射到XMLGregorianCal