当前位置: 首页 > 知识库问答 >
问题:

Spring集成JMS和JTA在消息到达错误通道时回滚

边永贞
2023-03-14

我正在通过绑定到不同Webshpere MQ的入站和出站原子和JMS使用带有JTA支持的Spring集成。html" target="_blank">流程如下:

  • JMS入站通道适配器收到消息
  • 一些转变
  • 输出队列的JMS出站通道适配器
  • 发生错误时,errorChannel收到消息
  • 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将处理的错误路由到接收者列表路由器,该路由器将它们发送到2个错误队列

我的问题是,即使消息到达errorChannel下游(在已处理的异常情况和错误队列中),我也希望提交事务。据我所知,只有当异常被重新处理(这就是为什么我重新处理未处理的异常)时,才会发生回滚,但在我的情况下,事务在消息到达errorChannel时(在它被路由到其他地方之前)立即回滚。

我做错了什么?

配置如下。

<jms:inbound-channel-adapter id="jms-in"
                             destination="input-queue"
                             connection-factory="inConnectionFactory"
                             channel="edi-inbound"
                             acknowledge="transacted">
    <poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
            fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
            >
        <transactional timeout="${process.tx.timeout-sec:60}"/>
    </poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>

<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
    <object-to-string-transformer/>
    <service-activator ref="inbound" method="service"/>
</chain>

<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
    <transformer ref="edi2xml-converter"/>
    <transformer ref="xml-mapper"/>
</chain>




<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
    <service-activator ref="outbound-message-composer" />
</chain>

<channel id="outbound-channel">
    <interceptors>
        <beans:ref bean="outbound-interceptor" />
    </interceptors>
</channel>

<recipient-list-router input-channel="outbound-channel">
    <recipient channel="file-outbound"/>
    <recipient channel="queue-outbound"/>
</recipient-list-router>



<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>



<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
                                   directory="${output.directory}"
                                   filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
                                   delete-source-files="true" />




<!-- notification outbound flow -->
<channel id="errorChannel">
    <interceptors>
        <wire-tap channel="logger"/>
    </interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>

<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
    <mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>

<recipient-list-router input-channel="error-notification-channel">
    <recipient channel="queue-outbound-error"/>
    <recipient channel="queue-inbound-error"/>
</recipient-list-router>

<chain input-channel="queue-outbound-error">
    <service-activator ref="outbound-error-composer" />
    <jms:outbound-channel-adapter id="jms-out-error"
                                  destination="error-output-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>

<chain input-channel="queue-inbound-error">
    <service-activator ref="error-notif-composer" />
    <jms:outbound-channel-adapter id="jms-in-error"
                                  destination="error-input-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>


<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>

最后,当tx在错误通道上回滚时,两个错误队列在任何情况下都会收到消息(就好像出站适配器不会参与事务一样),而正常流的tx(在没有发生错误时)工作得很好。

共有1个答案

邹修真
2023-03-14

没错。

因为您使用轮询入站通道适配器。它的逻辑是:

AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
  ...
                    if (!Poller.this.pollingTask.call()) {
                        break;
                    }
   ...
                catch (Exception e) {
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException) e;
                    }
                    else {
                        throw new MessageHandlingException(new ErrorMessage(e), e);
                    }
                }
            }
        });

其中您的TX是污染任务代理的一部分,作为AOPTransactionInterceptor

error频道是的一部分。taskExecutor错误处理程序。因此,只有当我们从pollingTask抛出异常时,我们才能到达errorChannel。既然我们有TX,它当然会被拉回。

我的观点是:轮询入站通道适配器中的错误处理过程是在TX之外完成的。

考虑切换到

 类似资料:
  • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

  • 我目前在Spring集成中处理JMS事务时遇到困难。我正在创建的集成流程如下所示: JMS队列A- 我希望在JMS队列B和JMS队列C上保证消息的传递。然而,为了使传递稍微困难一些,我希望将导致错误的消息存储在单独的JMQ队列上,并在队列a上确认消息。 但是,如果我对此进行测试并在队列C上设置消息之前抛出错误(让我们假设队列B首先完成,队列C其次完成),事务将确认队列A并在队列B和错误队列上提交消

  • 在Spring集成中使用出站网关时,我试图在JMS标头中发送回复Q详细信息。我了解到JIRA#INT-97中的增强功能在将Spring消息标头发送到JMS目标之前将其复制到JMS标头。 在将消息发送到出站网关之前,将消息头设置如下。message.getHeader(). setAtcm(JmsTargetAdapter.JMS_REPLY_TO, myReplyDestation); 但是我无法

  • 使用Boot 2.2.2和Integration 5.2.2——当一条XML消息源于且未能解组(即,它不是XML)时,消息将按预期的方式进行。但是,当消息来自JMS时,通过相同的通道路由,并且无法解组,它不会被路由到,消息会回滚到JMS。在这之后,我陷入了一个没完没了的循环,只为同一条消息。 我从正确的终极方式迁移JMS事件到Spring与Spring Boot的集成,遵循了这个例子。是否有一些我

  • 我是Spring集成的新手,正在研究一个从单个通道向多个通道发送消息的示例,从这个角度来看,为每个通道使用Redis消息存储,目的是不丢失任何消息。要求将消息发送到通道-replyChannel、mailChannel和dbChannel。目前,代码只打印sysout语句,没有主要功能。 为了检查消息是否被正确路由,我编写了一个java测试类来发送15条消息。 检查输出,我发现一些消息正在丢失。也

  • 我知道Spring集成(SI)会将任何异常(在SI域下)封装到MessageException实例中,并将其放在“错误通道”上。 以下是我的spring配置文件中的几个片段: 客户端对我的实现类进行REST调用,该类将接收到的请求放在上面的spring配置文件中定义的网关上 如上面的spring配置文件所述,错误通道由转换器读取,该转换器为客户端创建有效的失败响应消息并返回该消息。 当我得到一个异