当前位置: 首页 > 知识库问答 >
问题:

Spring云Kafka Stream StreamsUncaughtExceptionHandler

姬锐
2023-03-14

我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器。该处理器使用Kafka函数编写。我查看了Artem Bilan提供的将StreamsUncaughtExceptionHandler包括到我的服务中的建议,但我的异常从未被它捕获/处理。

配置Bean:

@Autowired
UnCaughtExceptionHandler exceptionHandler;

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {

        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {;
            factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }

    };
}

自定义异常处理程序:

    @Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

  @Autowired
  private StreamBridge streamBridge;

  @Override
  public StreamThreadExceptionResponse handle(Throwable exception) {
    return StreamThreadExceptionResponse.REPLACE_THREAD;
  }
}

流处理功能:

@Autowired
private MyService service;

@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
    final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
    return kStream -> kStream
            .filter((key, value) -> value != null)
            .filter((key, value) -> {
                Optional<Output> outputResult = service.process(value);
                if (outputResult.isPresent()) {
                    result.set(new KeyValue<>(key, outputResult.get()));
                    return true;
                }
                return false;
            })
        .map((messageKey, messageValue) -> result.get());
}

我希望UnCaughtExceptionHandler处理service.process()方法引发的任何异常。但是异常永远不会进入句柄方法;相反,它们会传播到根并终止客户端。我也看了这个解决方案,但我想以更独立的方式处理它。

问题:如何使用StreamsUncaughtExceptionHandler处理任何处理异常?

  • Spring启动版本:2.6.3
  • Spring-Cloud-stream版本:3.2.1
  • 春-云-流-绑定器-Kafka-流:3.2.1
  • Kafka流:3.0.0

可复制的示例:spring cloud kafka streams异常

共有1个答案

曾实
2023-03-14

以下是一些你可以尝试的事情。

>

我注意到您为配置的impl中的顺序设置了intger.MAX_VALUE。默认情况下,StreamsBuilderFactoryBean使用的相位值为intger.MAX_VALUE-1000,因此在工厂bean准备好启动时,配置程序可能还不可用,因为intger.MAX_VALUE的优先级较低。您可以将订单更改为intger.MAX_VALUE-5000,以确保在工厂bean启动之前配置bean已完全实例化。

从这些选项开始,看看它们是否给出了该问题的任何指示。如果它仍然存在,请随时与我们分享一个可重复的小样本应用程序。

 类似资料:
  • 我有三个应用程序,一个是spring云配置服务器点,另一个是spring云配置客户endpoint。我在彼此中配置了spring云总线,以使客户endpoint自动刷新发生更改,但在我运行的应用程序中,出现了如下异常 这是我的spring云服务器配置 这是我的spring云配置服务器pom 这是我的spring云配置客户端配置 这是我的spring云配置客户端pom 此异常不会影响操作的应用程序,

  • 我无法使用功能供应商发送Avro消息。SCSt尝试将消息作为JSON发送,但失败。有人能指出是否需要任何其他配置吗? 这是供应商的功能bean 和配置

  • 我有一个带有一些endpoint的anexo API,比如: 如何将Spring Cloud Gateway与这些endpoint一起使用?

  • 到目前为止,我已经发现,通过在contract中定义“TriggeredBy”,生成的测试将调用那里提供的方法,因此我们将需要提供该方法在TestBase中执行的操作的实际实现。另一个是“outputMessage”,在这里我们可以验证之前的方法调用是否正确地生成了发送到某个交换的消息体。 资料来源:文件和样本 我的问题是,有没有办法从契约中产生输入消息,而不是触发自己的自定义方法?文档中的Spr

  • Spring云契约可以用来测试运行Spring-WSendpoint的Spring Boot服务吗?我希望能够使用Groovy DSL定义SOAP请求/响应,但我还不能让这些服务与Spring Cloud Contract一起工作。当我尝试运行这些测试时,我总是得到一个失败(预期是200,但收到的是404)。有趣的是,我将@RestController和@RequestMapping注释添加到我的

  • 当我使用spring cloud gateway集成spring cloud sleuth时,我发现性能比单独使用spring cloud gateway慢得多。是否有优化方案? 机器配置:6芯,16g Spring云网关:5331.9 tps Spring云网关Spring云侦探:4119.47 tps “Spring云网关”比“Spring云网关Spring云侦探”慢约1000-2000tps