当前位置: 首页 > 知识库问答 >
问题:

Spring集成WebFlow异常处理

胡光霁
2023-03-14

如果spring集成webflux流中发生异常,则异常本身(带有stacktrace)通过MessagePublishingErrorHandler作为有效负载发送回调用方,该处理器使用来自“errorChannel”头的错误通道,而不是默认错误通道。

如何设置类似于WebExceptionHandler的错误处理程序?我想生成一个Http状态代码,并可能生成一个DefaultErrorAttributes对象作为响应。

简单地定义从errorChannel开始的流不起作用,错误消息不会在那里结束。我试图定义我自己的FluxErrorChannel,但它似乎也没有用作错误通道,错误不会在我的errorFlow中结束:

@Bean
public IntegrationFlow fooRestFlow() {
    return IntegrationFlows.from(
            WebFlux.inboundGateway("/foo")
                    .requestMapping(r -> r.methods(HttpMethod.POST))
                    .requestPayloadType(Map.class)
                    .errorChannel(fluxErrorChannel()))
            .channel(bazFlow().getInputChannel())
            .get();
}

@Bean
public MessageChannel fluxErrorChannel() {
    return MessageChannels.flux().get();
}

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows.from(fluxErrorChannel())
            .transform(source -> source)
            .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
            .get();
}

@Bean
public IntegrationFlow bazFlow() {
    return f -> f.split(Map.class, map -> map.get("items"))
            .channel(MessageChannels.flux())
            .<String>handle((p, h) -> throw new RuntimeException())
            .aggregate();
}

更新

在MessagingGatewaySupport中。doSendAndReceiveMessageReactiveWebFlux上定义的我的错误通道。inboundGateway从不用于设置错误通道,相反,错误通道始终是在此处创建的回复通道:

FutureReplyChannel replyChannel = new FutureReplyChannel();

Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
    .setReplyChannel(replyChannel)
    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
    .setErrorChannel(replyChannel)
    .build();

错误通道最终会被重置为Mono.fromFuture中的原始ErrorChannelHandler,但在我的例子中,错误通道为 onErrorResumelambda:

return Mono.fromFuture(replyChannel.messageFuture)
    .doOnSubscribe(s -> {
        if (!error && this.countsEnabled) {
            this.messageCount.incrementAndGet();
        }
    })
    .<Message<?>>map(replyMessage ->
        MessageBuilder.fromMessage(replyMessage)
            .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
            .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
            .build())
    .onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));

这是如何运作的?

共有1个答案

郭俊拔
2023-03-14

这是一个错误;由错误处理程序为异常创建的错误消息被发送到错误通道头(必须是回复通道,以便网关获得结果)。然后,网关应该调用错误流(如果存在)并返回结果。

https://jira.spring.io/browse/INT-4541

 类似资料:
  • 我一直在研究Spring集成文件支持,在这里我需要将文件从输入目录移动到输出目录。成功地将其移动到输出目录后,应该将其归档,然后从输入目录中删除。我正在使用下面的配置来实现这一点。 请纠正我,如果我的配置可以更好的方式。 Spring集成配置: 我在服务激活器中有代码,如果归档目录不可用,它应该终止应用程序。因此,在运行应用程序时,在第一次移动文件后,我已经删除了归档目录。然后,我将新文件放置在i

  • 在Spring integration中,我必须处理动态通道创建,但当我调试应用程序时,我看到不同通道之间的“阻塞”问题。 我知道是一个公共通道,在父上下文中共享,但如何为每个子上下文开发一个完整的独立场景?。公共网关是问题所在吗? 我在Spring integration flow async中看到了post错误处理,但对于每个子级,我都有一个完整的分离环境,我希望利用这些动态分离的优势。这可能

  • 我有一个http inboundgateway,并定义了一个通往网关的错误通道。 异常消息StackTrace: 我在这里做错了什么?

  • 我们使用的是Spring集成4.1.3。 使用tcp出站网关实现客户端。 请求期间从服务器收到一个tcp rset数据包,发生异常。什么原因?谢谢。 错误日志 wireshark日志在此处输入图像描述

  • 我正在使用下面的测试用例运行这个程序: 我得到这个错误: 当我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和StachOverflow的建议,所有这些都导致了不同的其他错误,我似乎从主要问题上转移了注意力。 最常见的建议是添加一个全局轮询器,但这会导致以下错误: 原因:java.lang.IllegalArgumentException:不应为终结点“org.SpringFramewo

  • 我阅读了Spring集成指南和这里的示例,获得了Spring集成SFTP程序的工作示例。我已经有了一个可以工作的Spring批处理程序,它可以读取大量文件并转储到数据库中。 我现在正试图通过查看Spring文档来集成Spring批处理和Spring集成程序,我创建了以下配置。 我使用下面的测试用例运行这个程序: 我得到了这个错误: 在我输入这个问题时,我尝试了谷歌关于这个问题的大多数顶级答案和St