在一些复杂的应用开发中,一个应用可能需要连接多个数据源。所谓多数据源,可以简单理解为同时连接两个或两个以上的数据库。在动手之前最好先对 JTA 有个基本了解,可参考:浅谈 JTA 事务
项目环境
项目依赖
pom.xml中关键依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--JTA组件核心依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Druid 数据源 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<!-- mybatis plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
配置多数据源
application.yml
spring:
profiles:
active: local
application:
name: jta-center
datasource:
type: com.alibaba.druid.pool.DruidDataSource
test-query: SELECT 1
min-pool-size: 5
max-pool-size: 20
max-life-time: 0
dynamic:
datasource:
one:
driver-class-name: com.mysql.cj.jdbc.Driver # 3.2.0开始支持SPI可省略此配置
url: jdbc:mysql://127.0.0.1:3306/one?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: root
two:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/tow?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&autoReconnect=true&serverTimezone=Asia/Shanghai
username: root
password: root
配置 DataSource
创建事务数据源
package com.jta.demo.common.dynamic;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
/**
 * Factory helpers that build Atomikos-managed XA data sources and the
 * MyBatis-Plus {@link SqlSessionFactory} instances that sit on top of them.
 */
public class AtomikosDataSourceCreator {

    /** Prefix of the pool settings shared by every data source. */
    public static final String DATA_SOURCE_PREFIX = "spring.datasource.";

    /** Fallback used when {@code min-pool-size} is not configured. */
    private static final int DEFAULT_MIN_POOL_SIZE = 1;

    /** Fallback used when {@code max-pool-size} is not configured. */
    private static final int DEFAULT_MAX_POOL_SIZE = 1;

    /** Fallback used when {@code max-life-time} is not configured (0 = unlimited). */
    private static final int DEFAULT_MAX_LIFETIME = 0;

    /**
     * Creates an {@link AtomikosDataSourceBean} that wraps a MySQL XA data source.
     *
     * <p>Connection settings ({@code url}, {@code username}, {@code password} and the
     * optional {@code name}) are read below {@code dataBase}; pool settings
     * ({@code type}, {@code min-pool-size}, {@code max-pool-size}, {@code max-life-time},
     * {@code test-query}) are read below {@link #DATA_SOURCE_PREFIX}.
     *
     * @param environment        property source to read the configuration from
     * @param uniqueResourceName unique resource name assigned to the Atomikos bean
     * @param dataBase           property prefix of the target database, including the trailing dot
     * @return the configured Atomikos data source bean
     * @throws IllegalStateException if the {@code url} property is missing
     */
    public static AtomikosDataSourceBean createAtomikosDataSourceBean(Environment environment, String uniqueResourceName, String dataBase) {
        // Physical MySQL XA connection settings.
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        // "name" is optional; leaving it unset keeps the driver's own default
        // instead of explicitly pushing a null into it.
        String databaseName = environment.getProperty(dataBase + "name");
        if (databaseName != null) {
            mysqlXaDataSource.setDatabaseName(databaseName);
        }
        // Fail fast with a message naming the missing key rather than handing
        // a null URL to the driver.
        mysqlXaDataSource.setURL(environment.getRequiredProperty(dataBase + "url"));
        mysqlXaDataSource.setUser(environment.getProperty(dataBase + "username"));
        mysqlXaDataSource.setPassword(environment.getProperty(dataBase + "password"));

        // Atomikos pooling wrapper around the XA data source.
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setUniqueResourceName(uniqueResourceName);
        // NOTE(review): the configured "type" is a connection-pool class, and an
        // explicit XADataSource instance is also set below; presumably the
        // instance takes precedence and this class name is unused - confirm.
        xaDataSource.setXaDataSourceClassName(environment.getProperty(DATA_SOURCE_PREFIX + "type"));
        // Pool sizing. Explicit fallbacks avoid a NullPointerException from
        // auto-unboxing when a key is absent.
        xaDataSource.setMinPoolSize(
                environment.getProperty(DATA_SOURCE_PREFIX + "min-pool-size", Integer.class, DEFAULT_MIN_POOL_SIZE));
        xaDataSource.setMaxPoolSize(
                environment.getProperty(DATA_SOURCE_PREFIX + "max-pool-size", Integer.class, DEFAULT_MAX_POOL_SIZE));
        // Maximum time a connection is kept in the pool before being destroyed.
        xaDataSource.setMaxLifetime(
                environment.getProperty(DATA_SOURCE_PREFIX + "max-life-time", Integer.class, DEFAULT_MAX_LIFETIME));
        // SQL used to test a connection before it is handed out.
        xaDataSource.setTestQuery(environment.getProperty(DATA_SOURCE_PREFIX + "test-query"));
        xaDataSource.setBorrowConnectionTimeout(60);
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        return xaDataSource;
    }

    /**
     * Creates a {@link SqlSessionFactory} bound to the given data source.
     *
     * @param dataSource data source the session factory operates on
     * @return the session factory produced by {@link MybatisSqlSessionFactoryBean}
     * @throws Exception if the factory bean fails to build the session factory
     */
    public static SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        // MybatisSqlSessionFactoryBean must be used here; with the plain
        // SqlSessionFactoryBean the original author observed
        // "invalid bound statement (not found)".
        // MybatisPlusAutoConfiguration#sqlSessionFactory(DataSource) uses the same
        // factory bean and is guarded by @ConditionalOnMissingBean, so once a
        // SqlSessionFactory exists in the context MyBatis-Plus does not create its own.
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        sessionFactoryBean.setConfiguration(new MybatisConfiguration());
        return sessionFactoryBean.getObject();
    }
}
创建 one 数据源
package com.jta.demo.common.dynamic.datasource;
import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
@Slf4j
@Configuration
@MapperScan(
        basePackages = "com.jta.demo.mappers.one.mapper",
        sqlSessionFactoryRef = OneDataSourcesConfiguration.SQL_SESSION_FACTORY)
public class OneDataSourcesConfiguration {

    /** Property prefix holding the connection settings of the "one" database. */
    public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.one.";

    /** Bean name (and Atomikos unique resource name) of this data source. */
    public static final String DATA_SOURCE_NAME = "oneDataSource";

    /** Bean name of the session factory bound to this data source. */
    public static final String SQL_SESSION_FACTORY = "oneSqlSessionFactory";

    /**
     * Builds the data source for the "one" database from configuration; each
     * database gets its own DataSource bean.
     *
     * <p>Several DataSource beans live in the context, so one of them has to be
     * marked {@code @Primary}; this is that one.
     *
     * @param environment the Spring environment, injected by the container
     * @return the Atomikos-backed data source
     */
    @Bean(name = DATA_SOURCE_NAME)
    @Primary
    public DataSource dataSource(Environment environment) {
        log.info("initialize the one database...");
        DataSource oneDataSource =
                AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
        return oneDataSource;
    }

    /**
     * Builds the session factory on top of {@link #DATA_SOURCE_NAME}. Marked
     * {@code @Primary} for the same reason as the data source.
     *
     * @param dataSource the "one" data source, selected by qualifier
     * @return the session factory for the "one" mappers
     * @throws Exception if the session factory cannot be created
     */
    @Bean(name = SQL_SESSION_FACTORY)
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
        SqlSessionFactory oneSessionFactory = AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
        return oneSessionFactory;
    }
}
创建 two 数据源
package com.jta.demo.common.dynamic.datasource;
import com.jta.demo.common.dynamic.AtomikosDataSourceCreator;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import javax.sql.DataSource;
@Slf4j
@Configuration
@MapperScan(
        basePackages = "com.jta.demo.mappers.two.mapper",
        sqlSessionFactoryRef = TwoDataSourcesConfiguration.SQL_SESSION_FACTORY)
public class TwoDataSourcesConfiguration {

    /** Property prefix holding the connection settings of the "two" database. */
    public static final String DATABASE_PREFIX = "spring.datasource.dynamic.datasource.two.";

    /** Bean name (and Atomikos unique resource name) of this data source. */
    public static final String DATA_SOURCE_NAME = "twoDataSource";

    /** Bean name of the session factory bound to this data source. */
    public static final String SQL_SESSION_FACTORY = "twoSqlSessionFactory";

    /**
     * Builds the data source for the "two" database from configuration.
     *
     * @param environment the Spring environment, injected by the container
     * @return the Atomikos-backed data source
     */
    @Bean(name = DATA_SOURCE_NAME)
    public DataSource dataSource(Environment environment) {
        log.info("initialize the two database...");
        DataSource twoDataSource =
                AtomikosDataSourceCreator.createAtomikosDataSourceBean(environment, DATA_SOURCE_NAME, DATABASE_PREFIX);
        return twoDataSource;
    }

    /**
     * Builds the session factory on top of {@link #DATA_SOURCE_NAME}.
     *
     * @param dataSource the "two" data source, selected by qualifier
     * @return the session factory for the "two" mappers
     * @throws Exception if the session factory cannot be created
     */
    @Bean(name = SQL_SESSION_FACTORY)
    public SqlSessionFactory sqlSessionFactory(@Qualifier(DATA_SOURCE_NAME) DataSource dataSource) throws Exception {
        SqlSessionFactory twoSessionFactory = AtomikosDataSourceCreator.createSqlSessionFactory(dataSource);
        return twoSessionFactory;
    }
}
分布式事务配置
package com.jta.demo.common.dynamic;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
 * Distributed (JTA) transaction configuration backed by Atomikos.
 */
@Configuration
@EnableTransactionManagement
public class TransactionManagerConfiguration {

    /**
     * Creates the JTA {@link UserTransaction}.
     *
     * <p>Tip: Lombok's {@code @SneakyThrows(Exception.class)} is an alternative
     * to declaring the checked exception on this method.
     *
     * @return the Atomikos user transaction
     * @throws Exception if the timeout cannot be applied
     */
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws Exception {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // NOTE(review): the JTA API defines this argument in seconds, so 10000
        // is roughly 2.8 hours - confirm this is intended.
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    /**
     * Creates the Atomikos {@link TransactionManager}.
     *
     * @return the Atomikos transaction manager
     * @throws Exception if the timeout cannot be applied
     */
    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() throws Exception {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        // NOTE(review): same unit as above (seconds per the JTA API) - confirm
        // that such a large value is intended.
        userTransactionManager.setTransactionTimeout(999999999);
        return userTransactionManager;
    }

    /**
     * Creates the Spring {@link PlatformTransactionManager} that delegates to JTA.
     *
     * <p>Uses the injected beans rather than re-invoking the {@code @Bean}
     * methods, so the result does not depend on configuration-class proxying.
     *
     * @param atomikosTransactionManager the Atomikos transaction manager bean
     * @param userTransaction            the JTA user transaction bean
     * @return the JTA-backed platform transaction manager
     */
    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager(@Qualifier("atomikosTransactionManager") TransactionManager atomikosTransactionManager, @Qualifier("userTransaction") UserTransaction userTransaction) {
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}
到此多数据源事务已经配置完成,接下来需要完成业务代码即可。
注意
在使用JTA处理多数据源事务时,在执行业务逻辑时间较长时会出现事务超时的问题。
常见异常如下:
Transaction 127.0.0.1.tm151796505627700002 has timed out and will rollback.
或
nested exception is javax.transaction.RollbackException: Prepare: NO vote
由于 Atomikos JTA 事务的默认超时时间是 10000 毫秒(即 10 秒),超过这个时间,提交事务就会抛出异常。此时需要增加一个 jta.properties 配置文件。
修改事务默认超时时间。
# SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
# THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
# UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
# Required: factory implementation class of the transaction core.
# NOTE: there is no default for this, so it MUST be specified!
#
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
#com.atomikos.icatch.max_timeout=2000
# Set base name of file where messages are output
# (also known as the 'console file').
#
# com.atomikos.icatch.console_file_name = tm.out
# Size limit (in bytes) for the console file;
# negative means unlimited.
#
# com.atomikos.icatch.console_file_limit=-1
# For size-limited console files, this option
# specifies a number of rotating files to
# maintain.
#
# com.atomikos.icatch.console_file_count=1
# Set the number of log writes between checkpoints
#
# com.atomikos.icatch.checkpoint_interval=500
# Set output directory where console file and other files are to be put
# make sure this directory exists!
#
# com.atomikos.icatch.output_dir = ./
# Set directory of log files; make sure this directory exists!
#
# com.atomikos.icatch.log_base_dir = ./
# Set base name of log file
# this name will be used as the first part of
# the system-generated log file name
#
# com.atomikos.icatch.log_base_name = tmlog
# Set the max number of active local transactions
# or -1 for unlimited.
#
# com.atomikos.icatch.max_actives = 50
# Set the default timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.default_jta_timeout = 10000
# Set the max timeout (in milliseconds) for local transactions
#
# com.atomikos.icatch.max_timeout = 300000
# The globally unique name of this transaction manager process
# override this value with a globally unique name
#
# com.atomikos.icatch.tm_unique_name = tm
# Do we want to use parallel subtransactions? JTA's default
# is NO for J2EE compatibility
#
#com.atomikos.icatch.serial_jta_transactions=false
# If you want to do explicit resource registration then
# you need to set this value to false.
#
# com.atomikos.icatch.automatic_resource_registration=true
# Set this to WARN, INFO or DEBUG to control the granularity
# of output to the console file.
#
# com.atomikos.icatch.console_log_level=WARN
# Do you want transaction logging to be enabled or not?
# If set to false, then no logging overhead will be done
# at the risk of losing data after restart or crash.
#
# com.atomikos.icatch.enable_logging=true
# Should two-phase commit be done in (multi-)threaded mode or not?
# Set this to false if you want commits to be ordered according
# to the order in which resources are added to the transaction.
#
# NOTE: threads are reused on JDK 1.5 or higher.
# For JDK 1.4, thread reuse is enabled as soon as the
# concurrent backport is in the classpath - see
# http://mirrors.ibiblio.org/pub/mirrors/maven2/backport-util-concurrent/backport-util-concurrent/
#
# com.atomikos.icatch.threaded_2pc=false
# Should shutdown of the VM trigger shutdown of the transaction core too?
#
# com.atomikos.icatch.force_shutdown_on_vm_exit=false
# 以上是完整的配置
#
# 配置最大的事务活动个数,-1代表无限制
com.atomikos.icatch.max_actives = -1
# 默认超时时间,单位:毫秒(注意:该值不应大于 max_timeout,超过部分会被截断为 max_timeout)
com.atomikos.icatch.default_jta_timeout = 3000000
# 允许的最大超时时间,单位:毫秒
com.atomikos.icatch.max_timeout = 600000
—————————
如有不足请留言指正
相互学习,共同进步