曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。
我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。,
setUncaughtExceptionHandler不帮助处理异常,它在流因未捕获的异常而终止后工作。
Kafka提供了几种处理异常的方法。简单的try-catch{}有助于捕获处理器代码中的异常,但kafka反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)分别需要反序列化异常处理程序和生产异常处理程序。默认情况下,如果Kafka应用程序遇到以下任何一种情况,它都会失败。
你可以在这个帖子上找到
用于处理消费者方面的异常,
1) 您可以使用以下属性在producer中添加默认异常处理程序。
"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
基本上,apache提供了三个异常处理程序类
1) LogAndContiuneExceptionHandler,您可以将其作为
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
2) LogAndFailExceptionHandler
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
3) LogAndSkipOnInvalidTimestamp
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndSkipOnInvalidTimestamp.class);
对于自定义异常处理,
1)您可以实现DeserializationExceptionHandler接口并覆盖句柄()方法。
2) 也可以扩展上述类。
这取决于你想做什么,除了生产者方面的例外。若将在生产者上引发异常(例如,由于网络故障或kafka broker已死亡),则默认情况下,流将死亡。使用kafka streams 1.1.0版,您可以通过实现ProductionExceptionHandler来覆盖默认行为,如下所示:
public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", new String(record.value()), record.topic(), exception);
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(final Map<String, ?> configs) {
}
}
从handle方法,如果不希望流在异常时消亡,可以返回CONTINUE,如果希望流停止,可以返回FAIL(默认为FAIL)。您需要在流配置中指定此类:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
另请注意,ProductionExceptionHandler
只处理生产者上的异常,它不会在使用流方法mapValue(...)
、filter(...)
、分支(...)
等处理消息期间处理异常,您需要将这些方法逻辑包装为try/catch块(将您的所有方法逻辑放入try块以保证您将处理所有异常情况):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
正如我所知,我们不需要显式地处理消费者端的异常,因为kafka流稍后会自动重试消费(因为在消费和处理消息之前,偏移量不会更改);e、 g.如果Kafka代理在一段时间内无法访问,您将从Kafka流中获得异常,并且当中断时,Kafka流将消耗所有消息。因此,在这种情况下,我们只能延迟,没有任何损坏/丢失。
使用setUncaughtExceptionHandler
您将无法更改默认行为,例如使用ProductionExceptionHandler
,使用它您只能将错误记录或发送消息到失败主题。
自kafka streams 2.8.0以来更新
由于kafka streams(Kafka流)是2.8.0版,因此您可以使用KafkaStreams(Kafka流)方法自动替换失败的流线程(由未捕获的异常引起
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异
我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。
我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st
我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:
主要内容:前记,1.processHandlerException方法前记 根据之前的文章方法中的方法返回处理的方法 1.processHandlerException方法 这个方法就是如果出现异常的话, 异常解析器进行处理异常。 先判断是否是注解下的方法, 如果是的话另外处理 -> 判断是否是注解下的方法 这里的主要有3个实现类 1.1注解下的异常 1.2注解下的方法 获取到装填码 获取到出错理由 然后渲染异常的页面 返回空的ModelAndView 1.3解析方
我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个