当前位置: 首页 > 知识库问答 >
问题:

Spring与JMS的集成和XA事务的条件回滚

赏阳嘉
2023-03-14

在通过MQ处理消息时,我想有条件地回滚XA事务并将MQ消息放回原始队列。
失败将被记录到数据库中,并且可以使用基于消息类型和错误的自定义逻辑从数据库中重试。
如果我们无法将错误记录到数据库中,则应回滚整个XA事务并将消息放回队列中。
每条消息都经过多个步骤处理,代码可以处理消息的重新提交/复制。

我有一个解决方案,但它的结果是在丑陋的配置,我想知道是否有更好的方法来实现同样的结果?我在考虑使用一个链,如果消息出错,它会忽略消息
我讨厌服务激活器不是被调用的实际服务。有更好的方法吗?

xml prettyprint-override"><!-- transactionManager is an XA transaction manager --> 
<jms:message-driven-channel-adapter id="batchMessagesIn" 
    destination="batchQueue" 
    error-channel="batchErrorChannel" 
    connection-factory="batchConnectionFactory" 
    channel="batchMessageInChannel" 
    task-executor="integrationTaskExecutor"
    recovery-interval="10000"
    concurrent-consumers="1"
    max-concurrent-consumers="1"
    cache-level="0"
    transaction-manager="transactionManager"/>

<channel id="processMessageFirstStage" />

<!-- The number of stages will depend on the type of message and this type of configuration will be duplicated multiple times -->
<int:service-activator input-channel="processMessageFirstStage" ref="messageServiceAdatper" method="processFirstStage" output-channel="checkIfFirstStageResultedInError"/>  

<!-- Payload is an instance of CustomMessage -->
<int:router input-channel="checkIfFirstStageResultedInError"
    expression="payload.inError"  >
    <mapping value="true" channel="messageInError" />
    <mapping value="false" channel="processMessageSecondStage" />
</int:router>   

<int:service-activator input-channel="processMessageSecondStage" ref="messageServiceAdatper" method="processSecondStage" output-channel="checkIfFirstStageResultedInError"/>    

<int:router input-channel="checkIfSecondStageResultedInError"
    expression="payload.inError"  >
    <mapping value="true" channel="messageInError" />
    <mapping value="false" channel="nullChannel" />
</int:router>   

<channel id="messageInError" />

<int:service-activator input-channel="messageInError" ref="errorMessageProcessor" method="handleMessageError" output-channel="nullChannel"/>

<beans:bean id="messageServiceAdatper" class="com.foo.messaging.MessageServiceAdatperImpl"/>
<beans:bean id="errorMessageProcessor" class="com.foo.messaging.ErrorMessageProcessorImpl"/>


<!-- this error channel will only be used for logging -->
<channel id="batchErrorChannel" />
<stream:stderr-channel-adapter channel="batchErrorChannel" append-newline="true" />
public class CustomMessage {
    private Throwable throwable;
    private String originalMessage;
    private boolean inError;
    private Object payload;
}

public class MessageServiceAdatperImpl {

    @Autowired
    private FirstStageService firstStageService;
    @Autowired
    private SecondStageService secondStageService;

    //Don't let a failure rollback the XA transaction
    @Transactional
    public CustomMessage processFirstStage(CustomMessage customMessage) {
        try {
            firstStageService.processFirstStage(customMessage.getPayload());
        } catch(Throwable e) {
            customMessage.setException(e);
        }
        return customMessage;
    }

    //Don't let a failure rollback the XA transaction
    @Transactional
    public CustomMessage processSecondStage(CustomMessage customMessage) {
        try {
            secondStageService.processSecondStage(customMessage.getPayload());
        } catch(Throwable e) {
           markMessageInError(customMessage,e)
        }
        return customMessage;
    }

    private void markMessageInError(CustomMessage customMessage, Throwable e) {
        customMessage.setThrowable(e);
        customMessage.setInError(true);
    }
}

public class FirstStageService () {
    //Start a new transaction. Code also handles duplicate messages
    @Transactional(propagation=Propagation.REQUIRES_NEW)
    public void processFirstStage() {
        //Do some work
    }
}

public class ErrorMessageProcessorImpl() {
   private static final Marker fatal = MarkerFactory.getMarker("FATAL");

    @Transactional
    public void handleMessageError(CustomMessage customMessage) {
         if (customMessage != null) {

            if (customMessage.isInError()) {
                try {

                    //At this point implment custom logic for logging message into the database. Message can be reprocessed from
                    //database with custom retry limits depending on message type and type of error.

                }
                catch (Throwable e) {
                    //At this point roll back the XA transaction and put the message back on the queue
                    logger.error(fatal, String.format("Fatal error attempting to save error", e));
                    throw new RuntimeException("Fatal error attempting to save error", e);
                }

            }
        }
    }
}

共有1个答案

笪智志
2023-03-14

既然您的逻辑与MessageServiceAdatperImpl非常接近,那么使用Spring集成(我的意思是

从另一方面,您可以编写自定义通用路由器,它只是通过其内部逻辑返回通道名称。

或者...路由滑自Spring集成4.1以来

 类似资料:
  • 我制作了一个POC,其中包含Spring-boot-starter-data-jpa和Spring-boot-starter-active emq。当提交jpa事务时,我想在代理(activeMQ)上推送jms消息。 我的代码: UtilsateurService具有"主"事务: “管理”Jms消息的SendMessage类: 我的主要班级: 在抛出异常之前,JMS消息被推送到activeMq代理

  • 我目前在Spring集成中处理JMS事务时遇到困难。我正在创建的集成流程如下所示: JMS队列A- 我希望在JMS队列B和JMS队列C上保证消息的传递。然而,为了使传递稍微困难一些,我希望将导致错误的消息存储在单独的JMQ队列上,并在队列a上确认消息。 但是,如果我对此进行测试并在队列C上设置消息之前抛出错误(让我们假设队列B首先完成,队列C其次完成),事务将确认队列A并在队列B和错误队列上提交消

  • 我正在尝试将我的应用程序与JMS队列集成(使用ActiveMQ)。我使用Spring集成作为集成组件。我们希望有连池。已将'maxContopt消费者'作为100提供给'DefaultMessageListenerContainer'。 问题是,一旦从队列中读取了所有消息,“消费者数量”仍为100(如ActiveMq控制台上所示)。当我们在数据库中使用连接池(通过JNDI)时,一旦不再需要连接,它

  • 我们的应用程序中存在以下问题。消息通过入站通道适配器传入,并使用持久消息存储在聚合器中累积。一旦释放策略中定义的条件返回true,消息将被发送到处理的下一阶段。如果在下一个处理阶段抛出异常,事务将回滚,消息将再次放入持久消息存储中。但是,事务不会将消息放回原始队列,因为消息一旦放在聚合器中就会被确认。这不是我们想要的。理想情况下,如果在处理聚合器已批处理的其中一条下游消息时发生异常,则事务只会回滚

  • 如果发生异常,ActiveMQ broker不会相应地重新传递消息。 当我使用进行简单的集成测试时,JMS消息会被重新传递,即可以实现重试机制 如何用ActiveMQ为Tomcat实现同样的功能?

  • 我正在尝试从JMS队列(使用ActiveMQ)读取消息。我面临的问题是,消息正在从队列中读取,但没有显示在“服务激活器”中。 非常感谢您的帮助。 我的代码如下: (1) Spring配置 (2) 服务激活器MDP: (3) 申请开始课程: 谢谢