当前位置: 首页 > 知识库问答 >
问题:

Apache TomEE外部ActiveMQ资源在分布式事务中不回滚

柯唯
2023-03-14

我试图在ApacheTome中实现分布式事务。换句话说,流程是:

  • 消息读取器(即消息驱动bean)从队列(1)中读取并处理一条消息触发:

行动1,2,

托米。xml

<?xml version="1.0" encoding="UTF-8"?>
<tomee>
     this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance
    <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
        BrokerXmlConfig
        ServerUrl tcp://fakehost:666 
    </Resource> 

     <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0 
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
        BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_OUTGOING_QUEUE 
    </Resource>

    <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
        PhysicalName MY_INCOMING_QUEUE 
    </Resource>

    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>

Springconfig。xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


    <!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" />  -->
    <jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />  
    <jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
    <jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />

    <tx:jta-transaction-manager/>
    <!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
    <!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->


</beans>

SpringConfig。xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
            xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


    <bean id="messageListener" class="com.test.MyListener">
        <property name="connectionFactory" ref="myIncomingConnFactory" />
        <property name="destination" ref="myIncomingQueue" />
        <!-- <property name="sessionTransacted" value="true" /> -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="6" />
        <property name="messageListener" ref="myMessageProcessor" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="taskExecutor" ref="msgListenersTaskExecutor" />
    </bean>

    <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
        <property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
        <property name="forwardQueue" ref="myOutgoingQueue" />
        <property name="datasource" ref="myDatasource" />
    </bean>

    <bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>



</beans>

MyMessageReceiver。爪哇:

package com.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class MyMessageReceiver implements MessageListener {

    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory=connFactory;
    }
    public void setforwardQueue(Queue queue) {
        forwardQueue=queue;
    }
    public void setDatasource(DataSource ds) {
        datasource=ds;
    }

    @Override
    @Transactional(propagation=Propagation.REQUIRED)
    public void onMessage(Message message) {

        log.info("************************************");
        MyListener listener = (MyListener)SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");

        TextMessage msg = (TextMessage) message;
        String text = null;
        try {
            text = msg.getText();

            if (text != null) log.info("MESSAGE RECEIVED: "+ text);

            updateDB(text); // function call to update DB

            sendMsg(text);   // function call to publish messages to queue

           System.out.println("****************Rollback");
            // Throwing exception to rollback DB, Message should not be 
             // published and consumed message sent to a DLQ 
             //(Broker side DLQ configuration already done) 
        throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");

        } catch (Exception e) {
            log.error("Rolling back the entire XA transaction");
            log.error(e.getMessage());
            throw new RuntimeException("Rolled back because of "+e.getMessage());
        }

    }

    private void updateDB(String text) throws Exception {

        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource "+datasource);
            conn = datasource.getConnection();
            System.out.println("*******conn "+conn.getMetaData().getUserName());
            if (conn!=null) {
                System.out.println("*******conn "+conn.getMetaData().getUserName());
                ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
                ps.setString(1, text);
                ps.executeUpdate();
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (ps!=null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    log.error(e.getMessage());
                    // do nothing
                }
            }
            if (conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error(e.getMessage());
                    // do nothing
                }
            }
        }

    }

    private void sendMsg(String msgToBeSent) throws Exception {

        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);

        } catch (Exception e) {
            throw e;
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    // do nothing
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // do nothing
                }
            }
        }
    }

}

我的听众。爪哇:

package com.test;

import javax.transaction.Status;
import javax.transaction.SystemException;

import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;

public class MyListener extends DefaultMessageListenerContainer {

    static Logger log = Logger.getLogger(MyListener.class);

    public void printInfo() {

        try {

            log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
            log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
            log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
            log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
            log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
            log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
            log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
            log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
            log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
            log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
            log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);



        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void forceRollback() {
        try {
            ((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SecurityException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SystemException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

在更新数据库并将消息发送到传出队列之后,我故意抛出RuntimeException,只是为了测试数据库和message broker的事务回滚。

这三个操作在成功的情况下都会提交,但只有在失败的情况下才会回滚数据库操作,而这两个JMS操作仍然会提交。

它可以是:

  • 我的书中有错误的设置。xml(尤其是队列连接工厂)或其他地方
  • 虫子

我已经花了很多时间和这件事斗争,寻找可能的解决办法。

很高兴听到你对这件事的看法,如果这是我这边的错误,我再次表示道歉。

共有1个答案

谷梁驰
2023-03-14

我认为您需要使用ActiveMQ JCA资源适配器来确保连接自动登记到XA事务中。试试这个:

<tomee>
    <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
        # Do not start the embedded ActiveMQ broker
        BrokerXmlConfig  =
        ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
    </Resource>

    <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
        resourceAdapter = MyJmsResourceAdapter
        transactionSupport = xa
    </Resource>

    <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue"/>
    <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue"/>

    <Resource id="jdbc/myDBXAPooled" type="DataSource">
        XaDataSource myDBXA
        DataSourceCreator dbcp
        JtaManaged true
        UserName TestUser
        Password TestPassword
        MaxWait 2000
        ValidationQuery SELECT 1    
        MaxActive 15
    </Resource> 

    <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
        Url jdbc:mysql://localhost:3306/test
        User TestUser
        Password TestPassword
    </Resource>
</tomee>
 类似资料:
  • ShardingSphereTransactionManager SPI 名称 详细说明 ShardingSphereTransactionManager 分布式事务管理器 已知实现类 详细说明 XAShardingSphereTransactionManager 基于 XA 的分布式事务管理器 SeataATShardingSphereTransactionManager 基于 Seata 的分

  • ShardingSphere-Proxy 接入的分布式事务 API 同 ShardingSphere-JDBC 保持一致,支持 LOCAL,XA,BASE 类型的事务。 XA 事务 ShardingSphere-Proxy 原生支持 XA 事务,默认的事务管理器为 Atomikos。 可以通过在 ShardingSphere-Proxy 的 conf 目录中添加 jta.properties 来定

  • 通过 Apache ShardingSphere 使用分布式事务,与本地事务并无区别。 除了透明化分布式事务的使用之外,Apache ShardingSphere 还能够在每次数据库访问时切换分布式事务类型。 支持的事务类型包括 本地事务、XA事务 和 柔性事务。可在创建数据库连接之前设置,缺省为 Apache ShardingSphere 启动时的默认事务类型。

  • 背景 数据库事务需要满足 ACID(原子性、一致性、隔离性、持久性)四个特性。 原子性(Atomicity)指事务作为整体来执行,要么全部执行,要么全不执行。 一致性(Consistency)指事务应确保数据从一个一致的状态转变为另一个一致的状态。 隔离性(Isolation)指多个事务并发执行时,一个事务的执行不应影响其他事务的执行。 持久性(Durability)指已提交的事务修改数据会被持久

  • 单文档原子性可满足大多数业务需求 在 MongoDB 中,对单个文档的操作是原子操作。 由于 MongoDB 文档数据模型,一个文档中通过嵌入式的文档和数组来表示传统关系数据库模型中的一对一、一对多关系,而不是通过文档之间的复杂关系来描述业务需求中的一对一、一对多关系。 所以单文档原子性可以满足实际生产中大多数关于事务的需求。 对于需要对多个文档(在单个或多个集合中)进行原子读写的情况,Mongo

  • Generators 相关文章 The Basics Of ES6 Generators By Kyle Simpson ES6 generators in depth By Axel Rauschmayer redux-saga 相关文章 Redux nowadays : From actions creators to sagas By Riad Benguella Managing Side