当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者中错误处理的更好方法

陈野
2023-03-14

我有一个配置了spring kafka的Springboot应用程序,我想处理听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,将重试2次,然后将消息记录到错误文件中。我有两种方法可以遵循:-

第一种方法(使用带有DeadLetterPublishingRecoverer的SeekTocurInterrorHandler):-

@Autowired
KafkaTemplate<String,Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (r, e) -> {
                    if (e instanceof FooException) {
                        return new TopicPartition(r.topic() + ".DLT", r.partition());
                    }
                });
        ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));

        factory.setErrorHandler(errorHandler);
        return factory;
    }

但为此,我们需要添加主题(一个新的.DLT主题),然后我们可以将其记录到一个文件中。

@Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
        return new KafkaAdmin(configs);
    }
    
@KafkaListener( topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
    @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {

    logger.error(exceptionStackTrace);
}

方法2(使用自定义SeekToMONtErrorHandler):-

@Bean
    public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
        Map<String, Object> config = appProperties.getSource()
                .getProperties();
        ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
        
        factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
        factory.setRetryTemplate(retryTemplate());
        return factory;
    }

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(aSimpleReturnPolicy);
}

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

private static final int MAX_RETRY_ATTEMPTS = 2;

CustomSeekToCurrentErrorHandler() {
    super(MAX_RETRY_ATTEMPTS);
}

@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
    try {
        if (!records.isEmpty()) {
            log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
            
            super.handle(exception, records, consumer, container);
        }
    } catch (SerializationException e) {
        log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
    }
}

}

有人能提供他们的建议来实现这种功能的标准方法吗?在第一种方法中,我们确实看到了创建。DLT主题和额外的@KafkaListener的开销。在第二种方法中,我们可以直接记录消费者记录异常。

共有2个答案

谢唯
2023-03-14

期望记录我们在容器级别和侦听器级别可能获得的任何异常。

没有重试,以下是我做错误处理的方式:-

如果我们在容器级别遇到任何异常,我们应该能够使用错误描述记录消息负载,并查找该偏移量,跳过它,然后继续接收下一个偏移量。虽然只针对反序列化异常执行此操作,但其他异常也需要进行查找,并且需要跳过它们的偏移量。

@Component
public class KafkaContainerErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];

        // modify below logic according to your topic nomenclature
        String topics = s.substring(0, s.lastIndexOf('-'));
        int offset = Integer.parseInt(s.split("offset ")[1]);
        int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);

        logger.error("...")
        TopicPartition topicPartition = new TopicPartition(topics, partition);
        logger.info("Skipping {} - {} offset {}",  topics, partition, offset);
        consumer.seek(topicPartition, offset + 1);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {

    }
}


 factory.setErrorHandler(kafkaContainerErrorHandler);

如果我们在@KafkaListener级别得到任何异常,那么我将使用自定义错误处理程序配置我的侦听器,并使用如下所示的消息记录异常:-

@Bean("customErrorHandler")
    public KafkaListenerErrorHandler listenerErrorHandler() {
        return (m, e) -> {
            logger.error(...);
            return m;
        };
    }
晋骏喆
2023-03-14

在第一种方法中,无需使用死信发布恢复程序,您可以使用所需的任何消费者记录恢复程序;实际上,默认恢复程序只是记录失败的消息。

/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}

在故障记录跟踪器中。。。

if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        
        ...

        logger.error(thr, "Backoff "
            + (failedRecord == null
                ? "none"
                : failedRecord.getBackOffExecution())
            + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}

在侦听器适配器中添加重试后,备份(和重试限制)被添加到错误处理程序中,因此它是“更新的”(并且是首选的)。

此外,如果使用长BackOffs,使用内存中重试可能会导致重新平衡问题。

最后,只有SeekToMONtErrorHandler可以处理反序列化问题(通过ErrorHandlingDeserializer)。

编辑

错误处理反序列化程序seekToCurInterrorHandler一起使用。反序列化异常被认为是致命的,并且会立即调用恢复程序。

看留档。

下面是一个简单的Spring Boot应用程序演示:

public class So63236346Application {


    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}
package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing

结果

Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application   : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782  INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
 类似资料:
  • 我尝试在使用邮件时进行以下错误处理: 如果出现序列化错误:在DLT中发送消息 我拥有的(2.5.1Kafka客户端的Spring kafka 2.5.5版本)如下: 现在,如果我发送不可序列化的消息,我的消息将不重试地发送到DLT- 在我的中,我有一个,捕获并重新捕获。 我应该没有重试,但我得到了2个重试,每个20秒(而不是10秒?),并在2次重试后向DLT发送了一条消息。 如果我删除errorH

  • 我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?

  • 我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者