我尝试在使用邮件时进行以下错误处理:
我拥有的(2.5.1Kafka客户端的Spring kafka 2.5.5版本)如下:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(RECORD);
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(context -> {
Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
context.getRetryCount(), record);
return record;
});
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
(cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
new FixedBackOff(10000L, 2L)));
factory.setConcurrency(kafkaListenerConcurrency);
factory.setStatefulRetry(true);
return factory;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(backOffPolicy());
retryTemplate.setRetryPolicy(retryPolicy());
return retryTemplate;
}
private BackOffPolicy backOffPolicy() {
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(10000);
return fixedBackOffPolicy;
}
private SimpleRetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(IllegalArgumentException.class, false);
exceptionMap.put(TimeoutException.class, false);
exceptionMap.put(ListenerExecutionFailedException.class, true);
// Custom exception
exceptionMap.put(MyTechnicalException.class, false);
exceptionMap.put(MyFunctionalException.class, false);
return new SimpleRetryPolicy(3, exceptionMap, true);
}
现在,如果我发送不可序列化的消息,我的消息将不重试地发送到DLT-
在我的@KafkaHandler
中,我有一个抛出新的MyTechnicalException
,捕获并重新捕获。
我应该没有重试,但我得到了2个重试,每个20秒(而不是10秒?),并在2次重试后向DLT发送了一条消息。
如果我删除errorHandler,不足为奇的是,我尝试了3次,并且显示了我的日志错误消息。。。但我需要发送给DLQ。。。
如果我删除RetryTemplate和RecoveryCallback,这并不奇怪,但所有异常都会重试。。。
问题:
由于错误处理程序已经发展到涵盖RetryTemplate
提供的所有功能(回退、异常分类等),监听器级别的重试已被弃用。
https://github.com/spring-projects/spring-kafka/issues/1886
我有一个配置了spring kafka的Springboot应用程序,我想处理听主题时可能发生的各种错误。如果由于反序列化或任何其他异常而丢失/无法使用任何消息,将重试2次,然后将消息记录到错误文件中。我有两种方法可以遵循:- 第一种方法(使用带有DeadLetterPublishingRecoverer的SeekTocurInterrorHandler):- 但为此,我们需要添加主题(一个新的.
我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?
通过对错误类型实现 Display 和 From,我们能够利用上绝大部分标准库错误处理工具。然而,我们遗漏了一个功能:轻松 Box 我们错误类型的能力。 标准库会自动通过 Form 将任意实现了 Error trait 的类型转换成 trait 对象 Box<Error> 的类型(原文:The std library automatically converts any type that imp
错误处理(error handling)是处理可能发生失败情况的过程。例如读取一个文件失败,然后继续使用这个失效的输入显然是有问题的。错误处理允许我们以一种显式的方式来发现并处理这类错误,避免了其余代码发生潜在的问题。 有关错误处理的更多内容,可参考官方文档的错误处理的章节。
处理一个 RESTful API 请求时, 如果有一个用户请求错误或服务器发生意外时, 你可以简单地抛出一个异常来通知用户出错了。 如果你能找出错误的原因 (例如,所请求的资源不存在),你应该 考虑抛出一个适当的HTTP状态代码的异常 (例如, yii\web\NotFoundHttpException意味着一个404 HTTP状态代码)。 Yii 将通过HTTP状态码和文本发送相应的响应。 它还
Yii 内置了一个error handler错误处理器,它使错误处理更方便, Yii错误处理器做以下工作来提升错误处理效果: 所有非致命PHP错误(如,警告,提示)会转换成可获取异常; 异常和致命的PHP错误会被显示, 在调试模式会显示详细的函数调用栈和源代码行数。 支持使用专用的 控制器操作 来显示错误; 支持不同的错误响应格式; error handler 错误处理器默认启用, 可通过在应用的