当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Responsive-如果是反应流管道,如何进行错误处理?

海保臣
2023-03-14

如何对反应流管道进行错误处理。喜欢

  • 应用程序错误处理(例如:errorChannel)
  • 系统错误处理(使用DLQ、再处理等)

当前文档仅描述了非反应性管道的错误处理。https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_handling

通过为错误处理场景提供简单的配置,Spring cloud stream为用户提供了非常简单的操作。如果相同的错误处理用例(使用相同的配置)也适用于反应流管道,那就太好了。用例和各自的配置详细信息如下:

  • @StreamListner("errorChannel")全局错误处理的注释
  • @KafkaListener(id="bar",主题="reactive-stream-error-subject")
  • 为DLQ配置并生成错误主题的失败消息spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=truespring.cloud.stream.kafka.bindings.input.consumer.dlqName=reactive-stream-error-subject

文档中的示例可以很好地处理spring cloud stream,但同样的情况也会给反应式管道带来错误。这方面的任何指导方针都将对社区大有帮助。提前感谢!

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

public static void main(String[] args) {
    SpringApplication.run(ReactiveStreamSinkApplication.class, args);
}

@StreamListener
public void receive(@Input(Sink.INPUT) Flux<String> inputFlux) {
    inputFlux.subscribe(System.out::println);
    throw new RuntimeException("BOOM!");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    // log the error msg
    System.out.println("Handling ERROR: " + message);
}

@KafkaListener(id="bar", topics = "reactive-stream-error-topic")
public void error(String in) {
    System.out.println(in + " from DLQ");
}
}

共有1个答案

魏煜祺
2023-03-14

很抱歉,回复太晚了。

首先,您的代码中有一个问题,您正在处理如何从反应流中抛出异常。基本上,您处理的是一个声明性处理程序,它的处理方式非常不同。在代码中,在启动和初始化期间,receive方法只会被调用一次。因此,从中引发异常与流处理过程中引发的异常完全不同,您所查询的错误处理机制就是为这个机制而设计的。但是

除此之外。

随着Spring Cloud Function编程模型的引入,我们正在考虑将注意力从反应式模块转移到一起,因为Spring Cloud Function已经提供了对反应式编程模型的支持。所以考虑以下几点:

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveStreamSinkApplication.class,
        "--spring.cloud.stream.function.definition=myconsumer");
    }

    @Bean
    public Consumer<Flux<String>> myconsumer() {
        return stream -> stream.subscribe(value -> {
            if ("foo".equals(value)) {
                throw new RuntimeException("BOOM!");
            }
            System.out.println("Received value: " + value);
        });
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        // log the error msg
        System.out.println("Handling ERROR: " + message);
    }
}

试试看,让我们知道。

 类似资料:
  • 但这里我没有显式地使用Actor,尽管它们是在内部使用的。是否存在流/源/汇的生命周期事件?

  • 我正在使用Azure DevOps管道构建和部署React JS应用程序。它包含配置。js文件,其中包含一些需要针对不同环境修改的变量。任何人都可以帮我处理这个配置。部署到不同阶段时,js是否在发布管道中?

  • 我一直在反复讨论我们系统中的一个问题,即使在论坛上进行了一些研究和多次测试,我们似乎也无法解决这个问题。 我会尽量弄清楚我们在处理什么 我们有一个带有路由的主服务,该路由从activemq队列读取数据(使用嵌入式代理的spring boot)将其发送到路由(B),然后将所有内容发送到最终路由(C)。路由(B)依赖于服务。 骆驼版:3.3.0Spring靴版:2.3.3。发布 路线A: 路线B: 路

  • 我在努力理解错误处理是如何在Node.js流管道内工作的,最后我用一个简单的游乐场进行了实验,这让事情变得非常清楚。 我将此作为一个自我回答的问题发布。也许有人觉得这很有帮助:)

  • 你如何防抖React.js? 我想防抖手柄。 我试过,但它不起作用。

  • 如题,nuxt2 如何捕获url malformed错误并进行处理? 根据GPT给的答案 写中间件不行,这个错误在中间件之前就报错了