当前位置: 首页 > 工具软件 > JTA > 使用案例 >

使用JTA解决多数据源事务问题

马银龙
2023-12-01

分享知识 传递快乐

 

在一些复杂的应用开发中,一个应用可能需要连接多个数据源。所谓多数据源,可以简单理解为连接两个或两个以上的数据库。在动手之前最好先对 JTA 有个了解,可参考:浅谈 JTA 事务

 

项目环境

  • springboot  2.3.1
  • mybatis plus
  • jta

项目依赖

pom.xml中关键依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Core JTA dependency: Spring Boot starter for the Atomikos transaction manager -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

<!-- Druid data source -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.22</version>
</dependency>

<!-- MyBatis-Plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.2</version>
</dependency>

 

配置多数据源

application.yml

spring:
  profiles:
    active: local
  application:
    name: jta-center
  datasource:
    # Shared pool settings, read under the "spring.datasource." prefix
    type: com.alibaba.druid.pool.DruidDataSource
    test-query: SELECT 1
    min-pool-size: 5
    max-pool-size: 20
    max-life-time: 0
    dynamic:
      datasource:
        # Read under the prefix "spring.datasource.dynamic.datasource.one."
        one:
          driver-class-name: com.mysql.cj.jdbc.Driver # optional since 3.2.0 (driver is discovered via SPI)
          url: jdbc:mysql://127.0.0.1:3306/one?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
        # The key must be "two": the Java configuration reads the prefix
        # "spring.datasource.dynamic.datasource.two." (it was misspelled "tow").
        two:
          driver-class-name: com.mysql.cj.jdbc.Driver
          # NOTE(review): the schema name in this URL is still "tow"; change it if your database is actually named "two".
          url: jdbc:mysql://127.0.0.1:3306/tow?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
          username: root
          password: root

配置 DataSource

创建事务数据源

package com.jta.demo.common.dynamic;

import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.core.env.Environment;

import javax.sql.DataSource;

/**
 * Static factory helpers that build an Atomikos-managed XA data source from
 * {@link Environment} properties and a MyBatis-Plus {@link SqlSessionFactory}
 * bound to a given data source.
 */
public class AtomikosDataSourceCreator {

    public static final String DATA_SOURCE_PREFIX = "spring.datasource.";

    /** Fallback used when {@code min-pool-size} / {@code max-pool-size} is not configured. */
    private static final int DEFAULT_POOL_SIZE = 1;

    /** Fallback used when {@code max-life-time} is not configured (0 = unlimited). */
    private static final int DEFAULT_MAX_LIFETIME = 0;

    /**
     * Creates an {@link AtomikosDataSourceBean} wrapping a MySQL XA data source.
     * <p>
     * Connection settings ({@code name}, {@code url}, {@code username},
     * {@code password}) are read under the {@code dataBase} prefix; pool settings
     * ({@code type}, {@code min-pool-size}, {@code max-pool-size},
     * {@code max-life-time}, {@code test-query}) are read under
     * {@link #DATA_SOURCE_PREFIX}. Missing numeric pool settings fall back to
     * defaults instead of failing with a {@link NullPointerException} on unboxing.
     *
     * @param environment        property source to read the configuration from
     * @param uniqueResourceName unique resource name assigned to the Atomikos bean
     * @param dataBase           property prefix of the target database, including the trailing dot
     * @return the configured Atomikos data source bean
     */
    public static AtomikosDataSourceBean createAtomikosDataSourceBean(Environment environment, String uniqueResourceName, String dataBase) {
        // Underlying MySQL XA data source holding the connection settings
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();

        mysqlXaDataSource.setDatabaseName(environment.getProperty(dataBase + "name"));
        mysqlXaDataSource.setURL(environment.getProperty(dataBase + "url"));
        mysqlXaDataSource.setUser(environment.getProperty(dataBase + "username"));
        mysqlXaDataSource.setPassword(environment.getProperty(dataBase + "password"));

        // Atomikos pooled wrapper that enlists the connection in JTA transactions
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();

        // Unique identifier of this resource
        xaDataSource.setUniqueResourceName(uniqueResourceName);
        // XADataSource implementation class name taken from "spring.datasource.type".
        // NOTE(review): an explicit XADataSource instance is also set below, so this
        // class name is presumably not used to instantiate anything; confirm.
        xaDataSource.setXaDataSourceClassName(environment.getProperty(DATA_SOURCE_PREFIX + "type"));
        // Minimum pool size, default 1
        xaDataSource.setMinPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "min-pool-size", Integer.class, DEFAULT_POOL_SIZE));
        // Maximum pool size, default 1
        xaDataSource.setMaxPoolSize(environment.getProperty(DATA_SOURCE_PREFIX + "max-pool-size", Integer.class, DEFAULT_POOL_SIZE));
        // Maximum number of seconds a connection is kept in the pool before being
        // destroyed automatically. Optional, default 0 (unlimited).
        xaDataSource.setMaxLifetime(environment.getProperty(DATA_SOURCE_PREFIX + "max-life-time", Integer.class, DEFAULT_MAX_LIFETIME));
        // SQL query used to test a connection before it is handed out
        xaDataSource.setTestQuery(environment.getProperty(DATA_SOURCE_PREFIX + "test-query"));
        xaDataSource.setBorrowConnectionTimeout(60);
        xaDataSource.setXaDataSource(mysqlXaDataSource);

        return xaDataSource;
    }


    /**
     * Creates a {@link SqlSessionFactory} bound to the given data source.
     *
     * @param dataSource data source the session factory operates on
     * @return the session factory produced by {@link MybatisSqlSessionFactoryBean}
     * @throws Exception if the factory bean fails to build the session factory
     */
    public static SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        // MybatisSqlSessionFactoryBean must be used here; with the plain
        // SqlSessionFactoryBean, mapper calls fail with
        // "invalid bound statement (not found)".
        //
        // MybatisPlusAutoConfiguration#sqlSessionFactory(javax.sql.DataSource) also
        // uses MybatisSqlSessionFactoryBean and is annotated with
        // @ConditionalOnMissingBean, so once a SqlSessionFactory exists in the
        // container MyBatis-Plus no longer creates its own.
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);

        MybatisConfiguration configuration = new MybatisConfiguration();
        sessionFactoryBean.setConfiguration(configuration);

        return sessionFactoryBean.getObject();
    }
}

创建 one 数据源


package com.jta.demo.common.dynamic.datasource;

import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;

import javax.sql.DataSource;


@Slf4j
@Configuration
@MapperScan(basePackages = "com.jta.demo.mappers.one.mapper", sqlSessionFactoryRef = OneDataSourcesConfiguration.SQL_SESSION_FACTORY)
public class OneDataSourcesConfiguration {

    /** Property prefix (with trailing dot) of the "one" database settings. */
    public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.one.";

    /** Bean name of the data source; also passed as the Atomikos unique resource name. */
    public static final String DATA_SOURCE_NAME = "oneDataSource";
    /** Bean name of the session factory referenced by {@code @MapperScan}. */
    public static final String SQL_SESSION_FACTORY = "oneSqlSessionFactory";

    /**
     * Builds the data source for the "one" database from the configuration
     * properties; each database gets its own data source bean.
     * <p>
     * Marked {@code @Primary} because several {@link DataSource} beans exist in
     * the container and one of them has to be the default candidate.
     *
     * @param environment environment supplied by the container
     * @return the Atomikos-backed data source for the "one" database
     */
    @Bean(name = DATA_SOURCE_NAME)
    @Primary
    public DataSource dataSource(Environment environment) {
        log.info("initialize the one database...");
        final DataSource oneXaDataSource =
                AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
        return oneXaDataSource;
    }

    /**
     * Builds the session factory that runs on the "one" data source.
     * <p>
     * Marked {@code @Primary} for the same reason as the data source: more than
     * one bean of this type is registered.
     *
     * @param dataSource the data source bean named {@link #DATA_SOURCE_NAME}
     * @return the session factory bound to that data source
     * @throws Exception if the session factory cannot be created
     */
    @Bean(SQL_SESSION_FACTORY)
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
        final SqlSessionFactory oneSessionFactory = AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
        return oneSessionFactory;
    }

}

 

创建 two 数据源

package com.jta.demo.common.dynamic.datasource;

import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import javax.sql.DataSource;


@Slf4j
@Configuration
@MapperScan(basePackages = "com.jta.demo.mappers.two.mapper", sqlSessionFactoryRef = TwoDataSourcesConfiguration.SQL_SESSION_FACTORY)
public class TwoDataSourcesConfiguration {

    /** Property prefix (with trailing dot) of the "two" database settings. */
    public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.two.";
    /** Bean name of the data source; also passed as the Atomikos unique resource name. */
    public static final String DATA_SOURCE_NAME = "twoDataSource";
    /** Bean name of the session factory referenced by {@code @MapperScan}. */
    public static final String SQL_SESSION_FACTORY = "twoSqlSessionFactory";

    /**
     * Builds the data source for the "two" database from the configuration properties.
     *
     * @param environment environment supplied by the container
     * @return the Atomikos-backed data source for the "two" database
     */
    @Bean(name = DATA_SOURCE_NAME)
    public DataSource dataSource(Environment environment) {
        log.info("initialize the two database...");
        final DataSource twoXaDataSource =
                AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
        return twoXaDataSource;
    }

    /**
     * Builds the session factory that runs on the "two" data source.
     *
     * @param dataSource the data source bean named {@link #DATA_SOURCE_NAME}
     * @return the session factory bound to that data source
     * @throws Exception if the session factory cannot be created
     */
    @Bean(SQL_SESSION_FACTORY)
    public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
        final SqlSessionFactory twoSessionFactory = AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
        return twoSessionFactory;
    }
}

分布式事务配置

package com.jta.demo.common.dynamic;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * Distributed (JTA) transaction configuration: registers the Atomikos user
 * transaction, the Atomikos transaction manager and the Spring
 * {@link JtaTransactionManager} that combines the two.
 */
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {
    /**
     * Creates the JTA {@link UserTransaction} bean.
     *
     * @return an Atomikos {@link UserTransactionImp} with its timeout set
     * @throws Throwable if configuring the user transaction fails
     */
    @Bean(name = "userTransaction")
    // Side note: Lombok's @SneakyThrows(Exception.class) is an alternative to the throws clause.
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // NOTE(review): the JTA API defines this timeout in seconds, not
        // milliseconds; confirm that 10000 is the intended value.
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    /**
     * Creates the Atomikos {@link TransactionManager} bean.
     *
     * @return an Atomikos {@link UserTransactionManager} with force-shutdown disabled
     * @throws Throwable if configuring the transaction manager fails
     */
    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        // NOTE(review): same unit caveat as above (seconds per the JTA API); the
        // effective value is presumably capped by com.atomikos.icatch.max_timeout.
        userTransactionManager.setTransactionTimeout(999999999);
        return userTransactionManager;
    }

    /**
     * Creates the Spring transaction manager used by {@code @Transactional}.
     * <p>
     * The injected beans are used directly rather than calling the factory
     * methods again, so the result does not depend on this class being proxied
     * by the container to return the singleton instances.
     *
     * @param atomikosTransactionManager the bean named {@code atomikosTransactionManager}
     * @param userTransaction            the bean named {@code userTransaction}
     * @return a {@link JtaTransactionManager} built from both beans
     * @throws Throwable declared for compatibility with the original signature
     */
    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) throws Throwable {
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}

到此多数据源事务已经配置完成,接下来需要完成业务代码即可。

 

注意

在使用JTA处理多数据源事务时,在执行业务逻辑时间较长时会出现事务超时的问题。

常见异常如下:

Transaction 127.0.0.1.tm151796505627700002 has timed out and will rollback.
或
nested exception is javax.transaction.RollbackException: Prepare: NO vote

由于 Atomikos 的 JTA 事务默认超时时间是 10000 毫秒(即 10 秒,见下方示例中的 com.atomikos.icatch.default_jta_timeout),超过这个时间,提交事务就会抛出异常。此时需要增加一个 jta.properties 配置文件。

修改事务默认超时时间。

# SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
# THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
# UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;

# Required: factory implementation class of the transaction core.
# NOTE: there is no default for this, so it MUST be specified!
#
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
#com.atomikos.icatch.max_timeout=2000

# Set base name of file where messages are output
# (also known as the 'console file').
#
# com.atomikos.icatch.console_file_name = tm.out

# Size limit (in bytes) for the console file;
# negative means unlimited.
#
# com.atomikos.icatch.console_file_limit=-1

# For size-limited console files, this option
# specifies a number of rotating files to
# maintain.
#
# com.atomikos.icatch.console_file_count=1

# Set the number of log writes between checkpoints
#
# com.atomikos.icatch.checkpoint_interval=500

# Set output directory where console file and other files are to be put
# make sure this directory exists!
#
# com.atomikos.icatch.output_dir = ./

# Set directory of log files; make sure this directory exists!
#
# com.atomikos.icatch.log_base_dir = ./

# Set base name of log file
# this name will be  used as the first part of
# the system-generated log file name
#
# com.atomikos.icatch.log_base_name = tmlog

# Set the max number of active local transactions
# or -1 for unlimited.
#
# com.atomikos.icatch.max_actives = 50

# Set the default timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.default_jta_timeout = 10000

# Set the max timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.max_timeout = 300000

# The globally unique name of this transaction manager process
# override this value with a globally unique name
#
# com.atomikos.icatch.tm_unique_name = tm

# Do we want to use parallel subtransactions? JTA's default
# is NO for J2EE compatibility
#
#com.atomikos.icatch.serial_jta_transactions=false

# If you want to do explicit resource registration then
# you need to set this value to false.
#
# com.atomikos.icatch.automatic_resource_registration=true

# Set this to WARN, INFO or DEBUG to control the granularity
# of output to the console file.
#
# com.atomikos.icatch.console_log_level=WARN

# Do you want transaction logging to be enabled or not?
# If set to false, then no logging overhead will be done
# at the risk of losing data after restart or crash.
#
# com.atomikos.icatch.enable_logging=true

# Should two-phase commit be done in (multi-)threaded mode or not?
# Set this to false if you want commits to be ordered according
# to the order in which resources are added to the transaction.
#
# NOTE: threads are reused on JDK 1.5 or higher.
# For JDK 1.4, thread reuse is enabled as soon as the
# concurrent backport is in the classpath - see
# http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
#
# com.atomikos.icatch.threaded_2pc=false

# Should shutdown of the VM trigger shutdown of the transaction core too?
#
# com.atomikos.icatch.force_shutdown_on_vm_exit=false

# Everything above is the complete sample configuration; the effective overrides follow.
#
# Maximum number of active transactions; -1 means unlimited
com.atomikos.icatch.max_actives = -1

# Default transaction timeout, in milliseconds
com.atomikos.icatch.default_jta_timeout = 3000000

# Maximum allowed transaction timeout, in milliseconds.
# Must be >= default_jta_timeout: a timeout above this limit is capped to it,
# so the previous value of 600000 would have overridden the default above.
com.atomikos.icatch.max_timeout = 3000000

 

 

 

 

 

 

 

 

—————————
如有不足请留言指正
相互学习,共同进步

 类似资料: