今天介绍分布式事务XA协议,单个服务操作多个数据源,引入Atomikos 相关jar,进行实站演示,先说XA协议规范原理:
XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了(全局)事务管理器(TM) 和(局部)资源管理器(RM)之间的接口。主流的关系型数据库产品都是实现了XA接口的。 XA接口是双向的系统接口,在事务管理器(TM)以及一个或多个资源管理器(RM)之间形成通信桥梁。 XA之所以需要引入事务管理器是因为,在分布式系统中,从理论上讲两台机器理论上无法达成一致的状态,需要引入一个单点进行协调。 由全局事务管理器管理和协调的事务,可以跨越多个资源(如数据库或JMS队列)和进程。全局事务管理器一般使用XA二阶段提交协议与数据库进行交互。下面进入实战:
1、springBoot项目引入jar
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
2、核心业务代码
import com.enjoy.service.TransferService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class XATransferServiceServiceImpl implements TransferService{
@Autowired
@Qualifier("nanJdbcTemplate")
private JdbcTemplate jamesJdbcTemplate;
@Autowired
@Qualifier("daoJdbcTemplate")
private JdbcTemplate peterJdbcTemplate;
@Transactional
public String transfer(int money) {
int resultNan = jamesJdbcTemplate.update("INSERT INTO bank_a(money,user_name)VALUES (?,?)",-money,"james");
int resultDao = peterJdbcTemplate.update("INSERT INTO bank_b(money,user_name)VALUES (?,?)",money,"peter");
if (money > 200){
throw new RuntimeException("money too large");//大于200后就回滚,以上两条sql操作无效
}
return resultNan+"-"+resultDao;
}
}
3、多数据源配置核心代码
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;
@Configuration
public class XADataSourceConfig {
@Autowired
private Environment env;
@Bean(name = "nanDataSource")//第一个数据源
@Primary
public DataSource jamesDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("nanDB");
ds.setPoolSize(5);
ds.setXaProperties(build("spring.datasource.druid.jamesDB."));
return ds;
}
@Bean(name = "daoDataSource")//第二个数据源
public DataSource peterDataSource(Environment env) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
Properties prop = build("spring.datasource.druid.peterDB.");
ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
ds.setUniqueResourceName("daoDB");
ds.setPoolSize(5);
ds.setXaProperties(prop);
return ds;
}
@Bean("nanJdbcTemplate")//数据源实例
public JdbcTemplate jamesJdbcTemplate(@Qualifier("nanDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
@Bean("daoJdbcTemplate")//数据源实例
public JdbcTemplate peterJdbcTemplate(@Qualifier("daoDataSource") DataSource ds) {
return new JdbcTemplate(ds);
}
@Bean//创建事务管理器
public JtaTransactionManager regTransactionManager () {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
private Properties build(String prefix) {
Properties prop = new Properties();
prop.put("url", env.getProperty(prefix + "url"));
prop.put("username", env.getProperty(prefix + "username"));
prop.put("password", env.getProperty(prefix + "password"));
return prop;
}
4、大家可以模拟进行操作了,这种XA协议对资源锁定力度比较大,同时锁两张表数据源,单体服务或并发量不高的服务完全没问题,但是不适合复杂的微服务场景,可以反复测试一下。下篇我们介绍LCN框架,敬请期待!