场景可能是:我的期望可能是批量10个数据点,我想对{failed 5,pass 5}或其他什么给出响应。
我的逻辑是将批处理拆分为数据元素并进行验证
成功的验证将发送给aggreagtor,
失败的验证将抛出错误并通过错误通道拾取。
收件人列表路由器将错误通道作为输入通道,并连接2个过滤器,目的是过滤某些类型的错误直接发送响应(与用户输入无关的信息-服务器错误等),某些类型的客户端错误将转到聚合器构建响应。
逻辑有问题吗?我的问题是我一直得到"回复消息收到,但接收线程已经收到回复"当建立使用服务激活器后聚合的结果。
在“错误过滤器”连接到replyChannel后,我只检查了此服务激活器和服务器错误分支的集成工作流程(但从未调用句柄)
有什么不对吗?顺便说一句,收件人列表路由器或其他类型的endpoint可以连接到错误通道吗?或者它必须是服务激活器,就像我在网上看到的所有例子一样?但他们真的很简单的例子...(
示例XML
<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel" error-channel="errorChannel" default-reply-channel="replyChannel" async-executor="MyThreadPoolTaskExecutor"/>
<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
<bean class="Splitter" />
</int:splitter>
<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="aggregateChannel">
<bean class="Transformer"/> // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>
<int:aggregator id="aggregator"
input-channel="aggregateChannel"
output-channel="createAnswerChannel"
method="aggregate">
<bean class="MyAggregator" />
</int:aggregator>
<int:recipient-list-router id="myErrorRouter" input-channel="errorChannel">
<int:recipient channel="filter_ErrorType_1"/>
<int:recipient channel="filter_ErrorType_2"/>
<int:recipient channel="filter_ErrorType_3"/>
</int:recipient-list-router>
<int:filter input-channel="filter_ErrorType_1" output-channel="aggregateChannel" method="accept"></int:filter>
<int:filter input-channel="filter_ErrorType_2" output-channel="createErrorAnswerChannel" method="accept"></int:filter>
<int:filter input-channel="filter_ErrorType_3" output-channel="createErrorAnswerChannel" method="accept"></int:filter>
<int:service-activator input-channel='createErrorAnswerChannel' output-channel="replyChannel" method='buildError'>
<bean class="AnswerBuilder"/>
</int:service-activator>
<int:service-activator input-channel='createAnswerChannel' output-channel="replyChannel" method='build'>
<bean class="AnswerBuilder"/>
</int:service-activator>
跟进:
<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel" error-channel="errorChannel" default-reply-channel="replyChannel" async-executor="MyThreadPoolTaskExecutor"/>
<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
<bean class="Splitter" />
</int:splitter>
<int:transformer id="transformer1" input-channel="toTransformer1" method="transform" output-channel="toTransformer2">
<bean class="Transformer1"/> // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>
<int:transformer id="transformer2" input-channel="toTransformer2" method="transform" output-channel="toTransformer3">
<bean class="Transformer2"/> // this may throw the validation error (filter_ErrorType_2), if it cannot transform
</int:transformer>
<int:transformer id="transformer3" input-channel="toTransformer3" method="transform" output-channel="aggregateChannel">
<bean class="Transformer3"/> // this may throw the validation error (filter_ErrorType_3), if it cannot transform
</int:transformer>
???
// seems like you are proposing to have one gateway for each endpoint that may throw error.
// but in this case, take transfomer 1 for example, I cannot output the gateway directly to aggregate channel since for valid data it has to go to transformer 2
// but the failed message throwed by the error handler cannot pass transformer 2 because of afterall this is a error message not a valid data for transformer 2
// <int:service-activator input-channel="toTransformer1" output-channel="toTransformer2" ref="gateway1"/>
// <int:gateway id="gateway1" default-request-channel="toTransformer1" error-channel="errorChannel1"/>
// <int:transformer id="transformer" input-channel="toTransformer1" method="transform">
// <bean class="Transformer"/> // this may throw the validation error (filter_ErrorType_1), if it cannot transform
// </int:transformer>
// how to deal with this problem?
<int:service-activator input-channel='createErrorAnswerChannel' output-channel="replyChannel" method='buildError'>
<bean class="AnswerBuilder"/>
</int:service-activator>
<int:service-activator input-channel='createAnswerChannel' output-channel="replyChannel" method='build'>
<bean class="AnswerBuilder"/>
</int:service-activator>
我实际上有复杂的逻辑,但我用transformer 123来表示。
你的问题不清楚。请务必在将来提供更具体的信息。一些配置和堆栈跟踪或日志也很有用。
我猜你已经在流的开始
但我不能确定,因为你的问题中没有这些词。正当
您不能依赖那里的
errorChannel
头,最终返回reply
,因为网关情况下的errorChannel
头与replyChannel
相同,它是TemporaryReplyChannel
-一次性使用通道,仅用于receive()
操作。
由于我们没有您的任何配置或代码,我们无法正确地帮助您。
我想你需要一个中间流
网关
通过服务激活器
:
<service-activator id="gatewayTestService" input-channel="inputChannel"
output-channel="outputChannel" ref="gateway"/>
<gateway id="gateway" default-request-channel="requestChannel" error-channel="myErrorChannel"/>
或
编辑
errorChannel
是默认全局bean的名称,用于捕获来自任何集成位置的任何错误消息。在http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler.中查看更多信息
因此,这个名称对您的用例来说是完全不好的,因为
myErrorOuter
将处理所有错误!
在
我说的是
<int:service-activator input-channel="transformChannel"
output-channel="aggregateChannel" ref="gateway"/>
<int:gateway id="gateway" default-request-channel="transformChannel" error-channel="validationErrorChannel"/>
<int:transformer id="transformer" input-channel="transformChannel" method="transform">
<bean class="Transformer"/> // this may throw the validation error (filter_ErrorType_1), if it cannot transform
</int:transformer>
因此,
拆分器
将项目发送到服务激活器
。通过自定义的错误通道
将消息发送到网关
周围的变压器
。transformer
对replyChannel
的应答与前面的网关的应答完全一致。如果它抛出一些异常,则由
validationErrorChannel
进程处理。它应该回复一些补偿信息。该消息被冒泡发送到服务激活器
。最后,service activator
将结果发送到aggregateChannel
。如果验证良好与否,服务激活器
的黑匣子。
希望这有点帮助。
编辑2
我很失望,您没有接受我在代码中的建议,但这就是我的看法:
<int:gateway id="myGateway" service-interface="someGateway" default-request-channel="splitChannel"
async-executor="MyThreadPoolTaskExecutor" />
<int:splitter input-channel="splitChannel" output-channel="transformChannel" method="split">
<bean class="Splitter" />
</int:splitter>
<int:service-activator input-channel="validateChannel" output-channel="aggregateChannel"
ref="validateGateway"/>
<gateway id="validateGateway" default-request-channel="toTransformer1" error-channel="myErrorChannel"/>
<chain input-channel="toTransformer1">
<int:transformer method="transform">
<bean class="Transformer1" />
</int:transformer>
<int:transformer method="transform">
<bean class="Transformer2" />
</int:transformer>
<int:transformer method="transform">
<bean class="Transformer3" />
</int:transformer>
</chain>
<int:service-activator input-channel="myErrorChannel" method="buildError">
<bean class="AnswerBuilder" />
</int:service-activator>
<int:aggregator id="aggregator"
input-channel="aggregateChannel"
output-channel="createAnswerChannel"
method="aggregate">
<bean class="MyAggregator" />
</int:aggregator>
<int:service-activator input-channel='createAnswerChannel' method='build'>
<bean class="AnswerBuilder" />
</int:service-activator>
注意,我是如何链接变形金刚的。因此,您的所有变压器都有一个网关,其中任何一个变压器上的任何错误都将被抛出到
myErrorChannel
上的网关进行错误处理。
我目前在Spring集成中处理JMS事务时遇到困难。我正在创建的集成流程如下所示: JMS队列A- 我希望在JMS队列B和JMS队列C上保证消息的传递。然而,为了使传递稍微困难一些,我希望将导致错误的消息存储在单独的JMQ队列上,并在队列a上确认消息。 但是,如果我对此进行测试并在队列C上设置消息之前抛出错误(让我们假设队列B首先完成,队列C其次完成),事务将确认队列A并在队列B和错误队列上提交消
我有三种不同的系统。我使用Spring integration来同步所有这些系统中的数据。 系统2将调用服务方法来持久化数据,如果请求有效,则返回响应,否则抛出异常 我需要发送服务方法响应到系统1和系统3,只有当操作成功。调用服务方法后,根据服务方法响应,使用Transformer生成对系统3的请求。在transformer之后,我将请求放入mq队列。 更新的JMS出站代码 如果服务类失败,我需要
在Spring integration中,我必须处理动态通道创建,但当我调试应用程序时,我看到不同通道之间的“阻塞”问题。 我知道是一个公共通道,在父上下文中共享,但如何为每个子上下文开发一个完整的独立场景?。公共网关是问题所在吗? 我在Spring integration flow async中看到了post错误处理,但对于每个子级,我都有一个完整的分离环境,我希望利用这些动态分离的优势。这可能
我已经使用最新的可用版本建立了一个新的Spring Boot Spring Integration Spring Integration Java DSL项目。项目构建正常,但当我运行应用程序时,我得到: 当前使用的依赖项如下: 错误可能是由于jar版本的错误组合吗?我不确定如何调试此错误。
例如,在我的用例中,我有一个(大小为100以限制输入)供调用方发送事件,轮询器将轮询队列并发送到以进行异步处理(假设处理是CPU密集型的,我们将任务执行器的池大小限制在2)。异步处理的结果将发回给调用方。此外,如果有异常抛出异步处理。原始调用方将处理异常,而不是让全局错误处理程序来处理它。 我不知道如何在Spring Integration Java DSL中指定一个只有调用方才能看到从抛出的错误
问题内容: 我具有以下Spring Integration配置,该配置允许我从MVC Controller调用网关方法并让控制器返回,而集成流将在不阻塞控制器的单独线程中继续进行。 但是,我无法弄清楚如何使我的错误处理程序为该异步流工作。我的网关定义了错误通道,但是由于某种原因我的异常没有到达。相反,我看到被调用了。 网关: 为了查看我的错误处理程序正在处理的异步集成流程中发生的异常,我该怎么办?