当前位置: 首页 > 知识库问答 >
问题:

处理Kafka流中的异常

吴靖
2023-03-14

曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。

我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。,

  • 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

共有3个答案

宇文鸿畴
2023-03-14

setUncaughtExceptionHandler不帮助处理异常,它在流因未捕获的异常而终止后工作。

Kafka提供了几种处理异常的方法。简单的try-catch{}有助于捕获处理器代码中的异常,但kafka反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)分别需要反序列化异常处理程序和生产异常处理程序。默认情况下,如果Kafka应用程序遇到以下任何一种情况,它都会失败。

你可以在这个帖子上找到

伊温书
2023-03-14

用于处理消费者方面的异常,

1) 您可以使用以下属性在producer中添加默认异常处理程序。

"default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

基本上,apache提供了三个异常处理程序类

1) LogAndContiuneExceptionHandler,您可以将其作为

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndContinueExceptionHandler.class);

2) LogAndFailExceptionHandler

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndFailExceptionHandler.class);

3) LogAndSkipOnInvalidTimestamp

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
           LogAndSkipOnInvalidTimestamp.class);

对于自定义异常处理,

1)您可以实现DeserializationExceptionHandler接口并覆盖句柄()方法。

2) 也可以扩展上述类。

萧光华
2023-03-14

这取决于你想做什么,除了生产者方面的例外。若将在生产者上引发异常(例如,由于网络故障或kafka broker已死亡),则默认情况下,流将死亡。使用kafka streams 1.1.0版,您可以通过实现ProductionExceptionHandler来覆盖默认行为,如下所示:

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
    }

}

从handle方法,如果不希望流在异常时消亡,可以返回CONTINUE,如果希望流停止,可以返回FAIL(默认为FAIL)。您需要在流配置中指定此类:

default.production.exception.handler=com.example.CustomProductionExceptionHandler

另请注意,ProductionExceptionHandler只处理生产者上的异常,它不会在使用流方法mapValue(...)filter(...)分支(...)等处理消息期间处理异常,您需要将这些方法逻辑包装为try/catch块(将您的所有方法逻辑放入try块以保证您将处理所有异常情况):

.filter((key, value) -> { try {..} catch (Exception e) {..} })

正如我所知,我们不需要显式地处理消费者端的异常,因为kafka流稍后会自动重试消费(因为在消费和处理消息之前,偏移量不会更改);e、 g.如果Kafka代理在一段时间内无法访问,您将从Kafka流中获得异常,并且当中断时,Kafka流将消耗所有消息。因此,在这种情况下,我们只能延迟,没有任何损坏/丢失。

使用setUncaughtExceptionHandler您将无法更改默认行为,例如使用ProductionExceptionHandler,使用它您只能将错误记录或发送消息到失败主题。

自kafka streams 2.8.0以来更新

由于kafka streams(Kafka流)是2.8.0版,因此您可以使用KafkaStreams(Kafka流)方法自动替换失败的流线程(由未捕获的异常引起

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
 类似资料:
  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。 参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_st

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:

  • 主要内容:前记,1.processHandlerException方法前记 根据之前的文章方法中的方法返回处理的方法 1.processHandlerException方法 这个方法就是如果出现异常的话, 异常解析器进行处理异常。 先判断是否是注解下的方法, 如果是的话另外处理 -> 判断是否是注解下的方法 这里的主要有3个实现类 1.1注解下的异常 1.2注解下的方法 获取到装填码 获取到出错理由 然后渲染异常的页面 返回空的ModelAndView 1.3解析方

  • 我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个