Spring 3 已经移除了对 JOTM 的支持,但很多老项目仍然使用 JOTM 来管理分布式事务,因此本文介绍如何在 Spring Boot 中整合 JTA 与 JOTM,实现分布式事务管理。
<!-- JOTM core library -->
<dependency>
<groupId>org.ow2.jotm</groupId>
<artifactId>jotm-core</artifactId>
<version>2.1.10</version>
</dependency>
<!-- Apache Commons DBCP2 connection pool -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.6.0</version>
</dependency>
<!-- MyBatis Spring Boot starter -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
Spring 3 之后移除了对 JOTM 的支持,因此需要自行实现 JotmFactoryBean,并手动将其注册为 Bean:
/**
 * {@link FactoryBean} that exposes the JOTM {@link Current} object as a Spring bean.
 *
 * <p>If a JOTM instance is already active ({@code Current.getCurrent()} is non-null) it is
 * reused; otherwise a new local {@link Jotm} instance is created and stopped again when
 * this bean is destroyed.
 */
public class JotmFactoryBean implements FactoryBean<Current>, DisposableBean {

    /** The JOTM {@code Current} object handed out by {@link #getObject()}. */
    private final Current jotmCurrent;

    /** Local JOTM instance created by this factory; {@code null} when an active one was reused. */
    private final Jotm jotm;

    /**
     * Looks up the active JOTM instance, creating a local one if none exists.
     *
     * @throws NamingException propagated from the {@link Jotm} constructor
     */
    public JotmFactoryBean() throws NamingException {
        // Check for an already active JOTM instance.
        Current active = Current.getCurrent();
        Jotm local = null;
        if (active == null) {
            // None found: create a JOTM instance with the (true, false) flags
            // (local, not bound to a registry) for use within this Spring context.
            local = new Jotm(true, false);
            active = Current.getCurrent();
        }
        this.jotm = local;
        this.jotmCurrent = active;
    }

    /**
     * Set the default transaction timeout for the JOTM instance.
     *
     * <p>Should only be called for a local JOTM instance,
     * not when accessing an existing (shared) JOTM instance.
     *
     * @param defaultTimeout the timeout value passed to JOTM
     */
    public void setDefaultTimeout(int defaultTimeout) {
        this.jotmCurrent.setDefaultTimeout(defaultTimeout);
        // The following is a JOTM oddity: should be used for demarcation transaction only,
        // but is required here in order to actually get rid of JOTM's default (60 seconds).
        try {
            this.jotmCurrent.setTransactionTimeout(defaultTimeout);
        } catch (SystemException ignored) {
            // Deliberately ignored: not expected to happen, and the default timeout
            // has already been applied by the call above.
        }
    }

    /**
     * Return the JOTM instance created by this factory bean, if any.
     *
     * <p>Application code should never need to access this.
     *
     * @return the local JOTM instance, or {@code null} if an already active
     *         JOTM instance is used
     */
    public Jotm getJotm() {
        return this.jotm;
    }

    /**
     * @return the JOTM {@link Current} object resolved in the constructor
     */
    @Override
    public Current getObject() {
        return this.jotmCurrent;
    }

    /**
     * @return the runtime class of the exposed {@link Current} object
     */
    @Override
    public Class<?> getObjectType() {
        return this.jotmCurrent.getClass();
    }

    /**
     * @return always {@code true}; the same {@link Current} object is returned on every call
     */
    @Override
    public boolean isSingleton() {
        return true;
    }

    /**
     * Stop the local JOTM instance, if created by this FactoryBean.
     */
    @Override
    public void destroy() {
        if (this.jotm != null) {
            this.jotm.stop();
        }
    }
}
将 JotmFactoryBean 注册为 Spring Bean:
/**
 * Spring configuration that contributes the {@link JotmFactoryBean} to the application context.
 */
@Configuration
public class TransactionManagerConfiguration {

    /**
     * Builds the factory bean that exposes the JOTM transaction manager.
     *
     * @return a freshly constructed {@link JotmFactoryBean}
     * @throws NamingException propagated from the {@link JotmFactoryBean} constructor
     */
    @Bean
    public JotmFactoryBean transactionManager() throws NamingException {
        final JotmFactoryBean jotmFactory = new JotmFactoryBean();
        return jotmFactory;
    }
}
/**
 * Configuration for the {@code test1} data source and the MyBatis objects built on top of it.
 *
 * <p>Mappers under {@code com.demo.**} are bound to the {@code test1SqlSessionFactory} bean.
 */
@Configuration
@MapperScan(sqlSessionFactoryRef = "test1SqlSessionFactory", basePackages = "com.demo.**")
public class OdsDatabaseConfiguration {

    /** JTA transaction manager handed to the managed data source. */
    @Autowired
    TransactionManager transactionManager;

    /**
     * Creates the {@code test1} data source, configured from the
     * {@code spring.datasource.test1} properties and wired to the JTA transaction manager.
     *
     * @return the managed data source
     */
    @Bean("test1DataSource")
    @ConfigurationProperties("spring.datasource.test1")
    public DataSource omsDataSource() {
        BasicManagedDataSource ds = DataSourceBuilder.create().type(BasicManagedDataSource.class).build();
        ds.setTransactionManager(transactionManager);
        return ds;
    }

    /**
     * Creates the MyBatis session factory for the {@code test1} data source.
     *
     * @param test1DataSource the {@code test1DataSource} bean
     * @return the session factory
     * @throws Exception if the factory bean fails to build the session factory
     */
    @Bean("test1SqlSessionFactory")
    public SqlSessionFactory odsSqlSessionFactoryBean(@Qualifier("test1DataSource") DataSource test1DataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        // Use the injected parameter; the previous code referenced an undefined variable here.
        sessionFactory.setDataSource(test1DataSource);
        // NOTE(review): ManagedTransactionFactory is expected to leave commit/rollback to the
        // externally managed (JTA) transaction - confirm against the MyBatis documentation.
        sessionFactory.setTransactionFactory(new ManagedTransactionFactory());
        return sessionFactory.getObject();
    }

    /**
     * Creates a {@link SqlSessionTemplate} backed by the {@code test1} session factory.
     *
     * @param test1SqlSessionFactory the {@code test1SqlSessionFactory} bean
     * @return the session template
     */
    @Bean("test1SqlSessionTemplate")
    public SqlSessionTemplate odsSqlSessionTemplate(@Qualifier("test1SqlSessionFactory") SqlSessionFactory test1SqlSessionFactory) {
        return new SqlSessionTemplate(test1SqlSessionFactory);
    }
}
/**
 * Helper for running MyBatis work against the {@code test1} data source.
 *
 * <p>The {@code *Mapper} methods open a session and run the callback without starting a
 * transaction themselves. The {@code execute}/{@code submit} methods wrap the callback in a
 * transaction demarcated through the injected JTA {@link TransactionManager}.
 */
@Component
public class SqlTemplate {

    private static final Logger LOGGER = LoggerFactory.getLogger(SqlTemplate.class);

    @Autowired
    private TransactionManager transactionManager;

    /** Optional: may be {@code null} when no {@code test1SqlSessionFactory} bean is defined. */
    @Autowired(required = false)
    @Qualifier("test1SqlSessionFactory")
    private SqlSessionFactory test1SqlSessionFactory;

    private SqlTemplate() {
    }

    /**
     * @return the injected JTA transaction manager
     */
    public TransactionManager getTransactionManager() {
        return transactionManager;
    }

    /**
     * @return the {@code test1} session factory, or {@code null} if none was injected
     */
    public SqlSessionFactory getTest1SqlSessionFactory() {
        return test1SqlSessionFactory;
    }

    /**
     * Obtains a mapper from a new {@code test1} session and returns the callback's result.
     * No transaction is started by this method.
     *
     * @param function    callback receiving the mapper
     * @param clazzMapper mapper interface to obtain from the session
     * @param <T>         mapper type
     * @param <R>         result type
     * @return whatever {@code function} returns
     * @throws IllegalArgumentException if the {@code test1} session factory is not available
     */
    public <T, R> R submitTest1Mapper(Function<T, R> function, Class<T> clazzMapper) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        // try-with-resources closes the session; no explicit close() is needed.
        try (SqlSession sqlSession = test1SqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
            try {
                T mapper = sqlSession.getMapper(clazzMapper);
                return function.apply(mapper);
            } finally {
                sqlSession.clearCache();
            }
        }
    }

    /**
     * Obtains a mapper from a new {@code test1} session and passes it to the callback.
     * No transaction is started by this method.
     *
     * @param function    callback receiving the mapper
     * @param clazzMapper mapper interface to obtain from the session
     * @param <T>         mapper type
     * @throws IllegalArgumentException if the {@code test1} session factory is not available
     * @throws RuntimeException         wrapping any exception raised by the callback
     */
    public <T> void executeTest1Mapper(Consumer<T> function, Class<T> clazzMapper) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        try (SqlSession sqlSession = test1SqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
            try {
                T mapper = sqlSession.getMapper(clazzMapper);
                function.accept(mapper);
            } catch (Exception e) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new RuntimeException("处理失败:" + e.getMessage(), e);
            } finally {
                sqlSession.clearCache();
            }
        }
    }

    /**
     * Transactional {@link #executeTest1(Consumer, ExecutorType)} using {@link ExecutorType#SIMPLE}.
     *
     * @param consumer work to run with the session
     */
    public void executeTest1(Consumer<SqlSession> consumer) {
        executeTest1(consumer, ExecutorType.SIMPLE);
    }

    /**
     * Runs {@code consumer} against the {@code test1} data source inside a JTA transaction.
     *
     * @param consumer     work to run with the session
     * @param executorType MyBatis executor type for the session
     * @throws IllegalArgumentException if the {@code test1} session factory is not available
     */
    public void executeTest1(Consumer<SqlSession> consumer, ExecutorType executorType) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        execute(test1SqlSessionFactory, consumer, executorType);
    }

    /**
     * Transactional {@link #execute(SqlSessionFactory, Consumer, ExecutorType)} using
     * {@link ExecutorType#SIMPLE}.
     *
     * @param sqlSessionFactory factory used to open the session
     * @param consumer          work to run with the session
     */
    public void execute(SqlSessionFactory sqlSessionFactory, Consumer<SqlSession> consumer) {
        execute(sqlSessionFactory, consumer, ExecutorType.SIMPLE);
    }

    /**
     * Transactional {@link #submitOMS(Function, ExecutorType)} using {@link ExecutorType#SIMPLE}.
     *
     * @param function work to run with the session
     * @param <T>      result type
     * @return whatever {@code function} returns
     */
    public <T> T submitOMS(Function<SqlSession, T> function) {
        return submitOMS(function, ExecutorType.SIMPLE);
    }

    /**
     * Runs {@code function} against the {@code test1} data source inside a JTA transaction
     * and returns its result.
     *
     * @param function     work to run with the session
     * @param executorType MyBatis executor type for the session
     * @param <T>          result type
     * @return whatever {@code function} returns
     * @throws IllegalArgumentException if the {@code test1} session factory is not available
     */
    public <T> T submitOMS(Function<SqlSession, T> function, ExecutorType executorType) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        return submit(test1SqlSessionFactory, function, executorType);
    }

    /**
     * Transactional {@link #submit(SqlSessionFactory, Function, ExecutorType)} using
     * {@link ExecutorType#SIMPLE}.
     *
     * @param sqlSessionFactory factory used to open the session
     * @param function          work to run with the session
     * @param <T>               result type
     * @return whatever {@code function} returns
     */
    public <T> T submit(SqlSessionFactory sqlSessionFactory, Function<SqlSession, T> function) {
        return submit(sqlSessionFactory, function, ExecutorType.SIMPLE);
    }

    /**
     * Runs {@code consumer} inside a transaction; no return value.
     *
     * <p>The transaction is begun and committed through the JTA transaction manager. On any
     * failure the transaction is rolled back and a {@link RuntimeException} is thrown.
     *
     * @param sqlSessionFactory factory used to open the session
     * @param consumer          work to run with the session
     * @param executorType      MyBatis executor type for the session
     * @throws RuntimeException if the work fails (cause: the original exception), or if the
     *                          rollback itself fails (cause: the rollback exception, with the
     *                          original failure attached as suppressed)
     */
    public void execute(SqlSessionFactory sqlSessionFactory, Consumer<SqlSession> consumer, ExecutorType executorType) {
        // try-with-resources closes the session; no explicit close() is needed.
        try (SqlSession sqlSession = sqlSessionFactory.openSession(executorType)) {
            try {
                transactionManager.begin();
                consumer.accept(sqlSession);
                sqlSession.commit();
                transactionManager.commit();
            } catch (Exception e) {
                LOGGER.error("处理异常!", e);
                try {
                    transactionManager.rollback();
                } catch (SystemException | RuntimeException rollbackError) {
                    // rollback() may also fail with an unchecked exception (e.g. no transaction
                    // bound to the thread); never let that hide the original failure.
                    LOGGER.error("事务回滚失败!", rollbackError);
                    RuntimeException failure = new RuntimeException("事务回滚失败!", rollbackError);
                    failure.addSuppressed(e);
                    throw failure;
                }
                throw new RuntimeException("处理失败:" + e.getMessage(), e);
            } finally {
                sqlSession.clearCache();
            }
        }
    }

    /**
     * Runs {@code function} inside a transaction and returns its result.
     *
     * <p>The transaction is begun and committed through the JTA transaction manager. On any
     * failure the transaction is rolled back (a failing rollback is only logged) and a
     * {@link RuntimeException} wrapping the original exception is thrown.
     *
     * @param sqlSessionFactory factory used to open the session
     * @param function          work to run with the session
     * @param executorType      MyBatis executor type for the session
     * @param <T>               result type
     * @return whatever {@code function} returns
     * @throws RuntimeException if the work fails (cause: the original exception)
     */
    public <T> T submit(SqlSessionFactory sqlSessionFactory, Function<SqlSession, T> function, ExecutorType executorType) {
        try (SqlSession sqlSession = sqlSessionFactory.openSession(executorType)) {
            try {
                transactionManager.begin();
                T result = function.apply(sqlSession);
                sqlSession.commit();
                transactionManager.commit();
                return result;
            } catch (Exception e) {
                LOGGER.error("处理异常!", e);
                try {
                    transactionManager.rollback();
                } catch (SystemException | RuntimeException rollbackError) {
                    // Log the rollback failure itself; the original failure is rethrown below.
                    LOGGER.error("事务回滚失败!", rollbackError);
                }
                throw new RuntimeException("处理失败:" + e.getMessage(), e);
            }
        }
    }
}