当前位置: 首页 > 知识库问答 >
问题:

Spring集成错误通道,错误处理

蔡楚
2023-03-14

场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。

我的逻辑是将批处理拆分为数据元素并进行验证
成功的验证将发送给aggreagtor,
失败的验证将抛出错误并通过错误通道拾取。

收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器构建响应。

逻辑有问题吗?我的问题是我一直得到"回复消息收到,但接收线程已经收到回复"当建立使用服务激活器后聚合的结果。

在“错误过滤器”连接到replyChannel后,我只检查了此服务激活器和服务器错误分支的集成工作流程(但从未调用句柄)

有什么不对吗?顺便说一句,收件人列表路由器或其他类型的endpoint可以连接到错误通道吗?或者它必须是服务激活器,就像我在网上看到的所有例子一样?但他们真的很简单的例子...(

示例XML

<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel" error-channel="errorChannel"  default-reply-channel="replyChannel" async-executor="MyThreadPoolTaskExecutor"/>

<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
    <bean class="Splitter" />
</int:splitter>

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="aggregateChannel">
    <bean class="Transformer"/>  // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>

<int:aggregator id="aggregator"
    input-channel="aggregateChannel"
    output-channel="createAnswerChannel"
    method="aggregate">
    <bean class="MyAggregator" />
</int:aggregator>

<int:recipient-list-router id="myErrorRouter" input-channel="errorChannel">
    <int:recipient channel="filter_ErrorType_1"/> 
    <int:recipient channel="filter_ErrorType_2"/> 
    <int:recipient channel="filter_ErrorType_3"/> 
</int:recipient-list-router>

<int:filter input-channel="filter_ErrorType_1" output-channel="aggregateChannel" method="accept"></int:filter>
<int:filter input-channel="filter_ErrorType_2" output-channel="createErrorAnswerChannel" method="accept"></int:filter>
<int:filter input-channel="filter_ErrorType_3" output-channel="createErrorAnswerChannel" method="accept"></int:filter>

<int:service-activator input-channel='createErrorAnswerChannel' output-channel="replyChannel" method='buildError'>
    <bean class="AnswerBuilder"/>
</int:service-activator>

<int:service-activator input-channel='createAnswerChannel' output-channel="replyChannel" method='build'>
    <bean class="AnswerBuilder"/>
</int:service-activator>

跟进:

<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel" error-channel="errorChannel"  default-reply-channel="replyChannel" async-executor="MyThreadPoolTaskExecutor"/>

<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
    <bean class="Splitter" />
</int:splitter>

<int:transformer id="transformer1" input-channel="toTransformer1" method="transform" output-channel="toTransformer2">
    <bean class="Transformer1"/>  // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>

<int:transformer id="transformer2" input-channel="toTransformer2" method="transform" output-channel="toTransformer3">
    <bean class="Transformer2"/>  // this may throw the validation error (filter_ErrorType_2), if it cannot transform
</int:transformer>

<int:transformer id="transformer3" input-channel="toTransformer3" method="transform" output-channel="aggregateChannel">
    <bean class="Transformer3"/>  // this may throw the validation error (filter_ErrorType_3), if it cannot transform
</int:transformer>

???
// seems like you are proposing to have one gateway for each endpoint that may throw error.
// but in this case, take transfomer 1 for example, I cannot output the gateway directly to aggregate channel since for valid data it has to go to transformer 2
// but the failed message throwed by the error handler cannot pass transformer 2 because of afterall this is a error message not a valid data for transformer 2
// <int:service-activator input-channel="toTransformer1" output-channel="toTransformer2" ref="gateway1"/>

// <int:gateway id="gateway1" default-request-channel="toTransformer1" error-channel="errorChannel1"/>

// <int:transformer id="transformer" input-channel="toTransformer1" method="transform">
//     <bean class="Transformer"/>  // this may throw the validation error (filter_ErrorType_1), if it cannot transform
// </int:transformer>

// how to deal with this problem?




<int:service-activator input-channel='createErrorAnswerChannel' output-channel="replyChannel" method='buildError'>
    <bean class="AnswerBuilder"/>
</int:service-activator>

<int:service-activator input-channel='createAnswerChannel' output-channel="replyChannel" method='build'>
    <bean class="AnswerBuilder"/>
</int:service-activator>

我实际上有复杂的逻辑,但我用transformer 123来表示。

共有1个答案

严修德
2023-03-14

你的问题不清楚。请务必在将来提供更具体的信息。一些配置和堆栈跟踪或日志也很有用。

我猜你已经在流的开始

但我不能确定,因为你的问题中没有这些词。正当

您不能依赖那里的errorChannel头,最终返回reply,因为网关情况下的errorChannel头与replyChannel相同,它是TemporaryReplyChannel-一次性使用通道,仅用于receive()操作。

由于我们没有您的任何配置或代码,我们无法正确地帮助您。

我想你需要一个中间流网关通过服务激活器

<service-activator id="gatewayTestService" input-channel="inputChannel" 
            output-channel="outputChannel" ref="gateway"/>

<gateway id="gateway" default-request-channel="requestChannel" error-channel="myErrorChannel"/>

编辑

  1. errorChannel是默认全局bean的名称,用于捕获来自任何集成位置的任何错误消息。在http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler.中查看更多信息

因此,这个名称对您的用例来说是完全不好的,因为myErrorOuter将处理所有错误!

我说的是

<int:service-activator input-channel="transformChannel" 
            output-channel="aggregateChannel" ref="gateway"/>

<int:gateway id="gateway" default-request-channel="transformChannel" error-channel="validationErrorChannel"/>

<int:transformer id="transformer" input-channel="transformChannel" method="transform">
    <bean class="Transformer"/>  // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>

因此,拆分器将项目发送到服务激活器。通过自定义的错误通道将消息发送到网关周围的变压器transformerreplyChannel的应答与前面的网关的应答完全一致。如果它抛出一些异常,则由validationErrorChannel进程处理。它应该回复一些补偿信息。该消息被冒泡发送到服务激活器。最后,service activator将结果发送到aggregateChannel。如果验证良好与否,服务激活器的黑匣子。

希望这有点帮助。

编辑2

我很失望,您没有接受我在代码中的建议,但这就是我的看法:

<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel"
             async-executor="MyThreadPoolTaskExecutor" />

<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
    <bean class="Splitter" />
</int:splitter>

<int:service-activator input-channel="validateChannel" output-channel="aggregateChannel"
      ref="validateGateway"/>

<gateway id="validateGateway" default-request-channel="toTransformer1" error-channel="myErrorChannel"/>

<chain input-channel="toTransformer1">
    <int:transformer method="transform">
        <bean class="Transformer1" />
    </int:transformer>
    <int:transformer method="transform">
        <bean class="Transformer2" />
    </int:transformer>
    <int:transformer method="transform">
        <bean class="Transformer3" />
    </int:transformer>
</chain>


<int:service-activator input-channel="myErrorChannel" method="buildError">
    <bean class="AnswerBuilder" />
</int:service-activator>

<int:aggregator id="aggregator"
                input-channel="aggregateChannel"
                output-channel="createAnswerChannel"
                method="aggregate">
    <bean class="MyAggregator" />
</int:aggregator>

<int:service-activator input-channel='createAnswerChannel' method='build'>
    <bean class="AnswerBuilder" />
</int:service-activator>

注意,我是如何链接变形金刚的。因此,您的所有变压器都有一个网关,其中任何一个变压器上的任何错误都将被抛出到myErrorChannel上的网关进行错误处理。

 类似资料:
  • 我目前在Spring集成中处理JMS事务时遇到困难。我正在创建的集成流程如下所示: JMS队列A- 我希望在JMS队列B和JMS队列C上保证消息的传递。然而,为了使传递稍微困难一些,我希望将导致错误的消息存储在单独的JMQ队列上,并在队列a上确认消息。 但是,如果我对此进行测试并在队列C上设置消息之前抛出错误(让我们假设队列B首先完成,队列C其次完成),事务将确认队列A并在队列B和错误队列上提交消

  • 我有三种不同的系统。我使用Spring integration来同步所有这些系统中的数据。 系统2将调用服务方法来持久化数据,如果请求有效,则返回响应,否则抛出异常 我需要发送服务方法响应到系统1和系统3,只有当操作成功。调用服务方法后,根据服务方法响应,使用Transformer生成对系统3的请求。在transformer之后,我将请求放入mq队列。 更新的JMS出站代码 如果服务类失败,我需要

  • 在Spring integration中,我必须处理动态通道创建,但当我调试应用程序时,我看到不同通道之间的“阻塞”问题。 我知道是一个公共通道,在父上下文中共享,但如何为每个子上下文开发一个完整的独立场景?。公共网关是问题所在吗? 我在Spring integration flow async中看到了post错误处理,但对于每个子级,我都有一个完整的分离环境,我希望利用这些动态分离的优势。这可能

  • 我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。

  • 例如,在我的用例中,我有一个(大小为100以限制输入)供调用方发送事件,轮询器将轮询队列并发送到以进行异步处理(假设处理是CPU密集型的,我们将任务执行器的池大小限制在2)。异步处理的结果将发回给调用方。此外,如果有异常抛出异步处理。原始调用方将处理异常,而不是让全局错误处理程序来处理它。 我不知道如何在Spring Integration Java DSL中指定一个只有调用方才能看到从抛出的错误

  • 问题内容: 我具有以下Spring Integration配置,该配置允许我从MVC Controller调用网关方法并让控制器返回,而集成流将在不阻塞控制器的单独线程中继续进行。 但是,我无法弄清楚如何使我的错误处理程序为该异步流工作。我的网关定义了错误通道,但是由于某种原因我的异常没有到达。相反,我看到被调用了。 网关: 为了查看我的错误处理程序正在处理的异步集成流程中发生的异常,我该怎么办?