当前位置: 首页 > 知识库问答 >
问题:

Spring集成Java DSL:在出现拆分和聚合方法的错误情况后,如何继续?

程成天
2023-03-14

我的程序在高层执行以下操作

Task 1
  get the data from the System X
  the Java DSL split
    post the data to the System Y
    post the reply data to the X
  the Java DSL aggregate
Task 2
  get the data from the System X
  the Java DSL split
    post the data to the System Y
    post the reply data to the X
  the Java DSL aggregate
...

问题是,当一个数据发布到System Y子任务失败时,错误消息会正确发送回System X,但在此之后,任何其他子任务或任务都不会执行。

我的错误处理程序这样做:

...
Message<String> newMessage = MessageBuilder.withPayload("error occurred")
                .copyHeadersIfAbsent(message.getPayload().getFailedMessage().getHeaders()).build();
...
Set some extra headers etc. 
...
return newMessage;

有什么问题吗?

编辑:

我调试了Spring集成。在错误情况下,只有第一条错误消息到达方法AbstractCorrelationMessageHandler。handleMessageInternal。其他成功消息和失败消息未到达该方法。

如果没有错误,则所有消息都会发送到该方法,最后释放该组。

我的程序可能有什么问题?

编辑2:

这起作用了:

为Http添加了建议。outboundGateway:

.handle(Http.outboundGateway(...,
                    c -> c.advice(myAdvice()))

还有myAdvice bean

@Bean
private Advice myAdvice() {
    return new MyAdvice();
}

MyAdvie

public class MyAdvice<T> extends AbstractRequestHandlerAdvice {
@SuppressWarnings("unchecked")
@Override
protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
        throws Exception {
    ...

    try {
        result = (MessageBuilder<T>) callback.execute();
    } catch (final MessageHandlingException e) {
        take the exception cause for the new payload
    }

    return new message with the old headers and replyChannel header and result.payload or the exception cause as a payload
}

}

共有1个答案

白坚壁
2023-03-14

你的程序没有问题。这正是正则循环在Java中的工作方式。要捕获每个迭代的异常并继续处理其他剩余项,您肯定需要进行一次尝试。。捕获Java循环中的。因此,类似的东西需要在这里应用于拆分器。它可以通过表达式EvaluationRequestHandlerAdvice、作为拆分器输出的ExecutorChannel或通过拆分器输出通道上的service activator进行网关调用来实现。

由于故事是关于一个聚合器的,因此您仍然需要以某种方式完成一个组,这只能通过从错误处理中发出一些错误补偿消息以返回聚合器的输入通道来完成。在这种情况下,您需要确保从抛出到错误流的MessagingException失败消息中复制请求标头。聚合组后,您需要将带有错误的消息与正常消息分开。这只能通过特殊的有效负载来完成,或者您可能只是将异常作为有效负载,以便在聚合器的最终结果中正确区分错误与正常消息。

 类似资料:
  • 我正在尝试做一个GroupBy基于共享ID的GeoJSON功能列表,以便通过使用拆分/聚合来聚合这些功能的单个字段,如下所示: 除非我取消对这三行的注释,否则聚合器永远不会发布组,数据库也不会收到任何更新。如果我将groupTimeout设置为小于5秒,则会丢失部分结果。 我预计发布策略默认为,我预计在处理完所有(拆分)功能后会自动释放所有组(REST服务消息中总共只有129个功能)。手动将其设置

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我有以下XML负载,我正试图将其用于Spring集成和Spring集成AMQP: 我正在使用xpath拆分器拆分消息: 我工作正常,消息被分成3条新消息,例如使用此有效负载: 在此步骤之后,将使用此设置聚合消息: 作为最后一步,消息将使用此出站通道适配器发送到交换机: 不幸的是,出现了一些问题,因为我最终得到了这样的有效载荷。我需要它保持XML格式。

  • 我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?