当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者错误处理:混淆错误处理/RetryTemplate

燕永昌
2023-03-14

我尝试在使用邮件时进行以下错误处理:

  • 如果出现序列化错误:在DLT中发送消息

我拥有的(2.5.1Kafka客户端的Spring kafka 2.5.5版本)如下:

@Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
        ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(RECORD);
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
           Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
           LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
                context.getRetryCount(), record);
           return record;
        });
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
                        (cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
                        new FixedBackOff(10000L, 2L)));
        factory.setConcurrency(kafkaListenerConcurrency);
        factory.setStatefulRetry(true);

        return factory;
    }

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(retryPolicy());
    return retryTemplate;
}

private BackOffPolicy backOffPolicy() {
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(10000);
    return fixedBackOffPolicy;
}

private SimpleRetryPolicy retryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

    exceptionMap.put(IllegalArgumentException.class, false);
    exceptionMap.put(TimeoutException.class, false);
    exceptionMap.put(ListenerExecutionFailedException.class, true);
    // Custom exception
    exceptionMap.put(MyTechnicalException.class, false);
    exceptionMap.put(MyFunctionalException.class, false);

    return new SimpleRetryPolicy(3, exceptionMap, true);
}

现在,如果我发送不可序列化的消息,我的消息将不重试地发送到DLT-

在我的@KafkaHandler中,我有一个抛出新的MyTechnicalException,捕获并重新捕获。

我应该没有重试,但我得到了2个重试,每个20秒(而不是10秒?),并在2次重试后向DLT发送了一条消息。

如果我删除errorHandler,不足为奇的是,我尝试了3次,并且显示了我的日志错误消息。。。但我需要发送给DLQ。。。

如果我删除RetryTemplate和RecoveryCallback,这并不奇怪,但所有异常都会重试。。。

问题:

  1. 有办法处理我的用例吗?怎么做

共有1个答案

侯和惬
2023-03-14

由于错误处理程序已经发展到涵盖RetryTemplate提供的所有功能(回退、异常分类等),监听器级别的重试已被弃用。

https://github.com/spring-projects/spring-kafka/issues/1886

 类似资料:
  • 我有一个配置了spring kafka的Springboot应用程序,我想处理听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,将重试2次,然后将消息记录到错误文件中。我有两种方法可以遵循:- 第一种方法(使用带有DeadLetterPublishingRecoverer的SeekTocurInterrorHandler):- 但为此,我们需要添加主题(一个新的.

  • 我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?

  • 通过对错误类型实现 Display 和 From,我们能够利用上绝大部分标准库错误处理工具。然而,我们遗漏了一个功能:轻松 Box 我们错误类型的能力。 标准库会自动通过 Form 将任意实现了 Error trait 的类型转换成 trait 对象 Box<Error> 的类型(原文:The std library automatically converts any type that imp

  • 错误处理(error handling)是处理可能发生失败情况的过程。例如读取一个文件失败,然后继续使用这个失效的输入显然是有问题的。错误处理允许我们以一种显式的方式来发现并处理这类错误,避免了其余代码发生潜在的问题。 有关错误处理的更多内容,可参考官方文档的错误处理的章节。

  • 处理一个 RESTful API 请求时, 如果有一个用户请求错误或服务器发生意外时, 你可以简单地抛出一个异常来通知用户出错了。 如果你能找出错误的原因 (例如,所请求的资源不存在),你应该 考虑抛出一个适当的HTTP状态代码的异常 (例如, yii\web\NotFoundHttpException意味着一个404 HTTP状态代码)。 Yii 将通过HTTP状态码和文本发送相应的响应。 它还

  • Yii 内置了一个error handler错误处理器,它使错误处理更方便, Yii错误处理器做以下工作来提升错误处理效果: 所有非致命PHP错误(如,警告,提示)会转换成可获取异常; 异常和致命的PHP错误会被显示, 在调试模式会显示详细的函数调用栈和源代码行数。 支持使用专用的 控制器操作 来显示错误; 支持不同的错误响应格式; error handler 错误处理器默认启用, 可通过在应用的