当前位置: 首页 > 知识库问答 >
问题:

Spring云流Kafka流-如何处理运行时异常?

连俊智
2023-03-14

我正在努力定制我的spring kafka streams应用程序。我一直试图在我的KStreams上配置处理未捕获(运行时异常)。

参考文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder-应该这样做:

@Configuration
@Slf4j
public class CustomKafkaStreamsConfiguration {

    @Bean
    public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
        return factoryBean -> {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
                        log.error("An exception has occurred={}", e.getMessage()) ;
                    });

                }
            });
        };
    }

}

后来我有一个KStream

    @Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>> paymentExecution() {
        return stream -> stream
                .peek((k, v) -> {
                    if (v.getStatus().equals(PaymentStatus.UNKNOWN)) {
                        throw new IllegalStateException();
                    }
                });
    }

如果我发送状态未知的事务,StreamThread由于IllegalStateException而死亡,那么我的KStream不再使用任何不一致的请求。

Exception in thread "payment-d4b6ddd2-40ab-4eeb-afe4-e7fc3caa2b9c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=payment-request, partition=0, offset=13, stacktrace=java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionRequestProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.IllegalStateException
    at payment.process.PaymentExecutionRequestProcessor.lambda$paymentExecution$4(PaymentExecutionOrchestratorProcessor.java:48)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)

有什么让我怀念的吗?或者也许有更合适的方法来处理在处理流时出现的运行时异常?

共有1个答案

杜苏燕
2023-03-14

如果应用程序到达uncaughtexcpetionhandler,那么流线程已经停止,太晚而无法恢复。它不会继续处理。它提供给您在退出应用程序之前采取任何优雅的关闭措施。您需要重新启动应用程序才能继续处理。您可能会考虑使用类似Kafka Streams中的分支特性来忽略状态未知的记录,如下所示(我没有测试过这个,但应该可以工作)。其思想是为传入的kstream创建一个分支,以筛选出任何具有unknown状态的记录,而不是抛出异常。它仍然可以记录未知的,并提供任何额外的逻辑等。

@Bean
    public Function<KStream<String, Transaction>,
            KStream<String, Transaction>[]> paymentExecution() {
        return stream -> stream
                .branch( (k, v) - !v.getStatus().equals(PaymentStatus.UNKNOWN));
    }
 类似资料:
  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异

  • 我使用Spring的@ControllerAdvice和@ExceptionHandler进行异常处理。任何方法都会从Controller中抛出自定义异常,并由相应的@ExceptionHandler处理它。如果发生运行时异常(例如任何HibernateException),那么它将抛出运行时异常,并且我没有任何@ExceptionHandler用于运行时异常。

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放

  • 一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.