当前位置: 首页 > 工具软件 > JOTM > 使用案例 >

SpringBoot整合JOTM实现分布式事务

闻人昊昊
2023-12-01

0.前言

Spring3就已经移除了对JOTM的支持,但是很多老项目还是用的JOTM来管理分布式事务,所以这篇文章旨在整合SpringBoot JTA JOTM分布式框架.

1.引入maven组件

           <!-- 引入jotm core包 -->
		   <dependency>
                <groupId>org.ow2.jotm</groupId>
                <artifactId>jotm-core</artifactId>
                <version>2.1.10</version>
            </dependency>
		   <!-- 引入 commons-dbcp2 包 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
                <version>2.6.0</version>
            </dependency>
		   <!-- 引入mybatis包-->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.3.2</version>
            </dependency>

2.增加Jotm的配置

Spring3 之后就移除了对Jotm的支持,需要手动注入JotmFactoryBean的Bean

public class JotmFactoryBean implements FactoryBean<Current>, DisposableBean {

    private Current jotmCurrent;
    private Jotm jotm;
    public JotmFactoryBean() throws NamingException {
		// Check for already active JOTM instance.
        this.jotmCurrent = Current.getCurrent();

		// If none found, create new local JOTM instance.
        if (this.jotmCurrent == null) {
		// Only for use within the current Spring context:
		// local, not bound to registry.
            this.jotm = new Jotm(true, false);
            this.jotmCurrent = Current.getCurrent();
        }
    }
    /**
     * Set the default transaction timeout for the JOTM instance.
     * <p>Should only be called for a local JOTM instance,
     * not when accessing an existing (shared) JOTM instance.
     */
    public void setDefaultTimeout(int defaultTimeout) {
        this.jotmCurrent.setDefaultTimeout(defaultTimeout);
		// The following is a JOTM oddity: should be used for demarcation transaction only,
		// but is required here in order to actually get rid of JOTM's default (60 seconds).
        try {
            this.jotmCurrent.setTransactionTimeout(defaultTimeout);
        } catch (SystemException ex) {
		// should never happen
        }
    }
    /**
     * Return the JOTM instance created by this factory bean, if any.
     * Will be <code>null</code> if an already active JOTM instance is used.
     * <p>Application code should never need to access this.
     */
    public Jotm getJotm() {
        return this.jotm;
    }

    @Override
    public Current getObject() {
        return this.jotmCurrent;
    }

    @Override
    public Class getObjectType() {
        return this.jotmCurrent.getClass();
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    /**
     * Stop the local JOTM instance, if created by this FactoryBean.
     */
    @Override
    public void destroy() {
        if (this.jotm != null) {
            this.jotm.stop();
        }
    }
}

注入JotmFactoryBean

@Configuration
public class TransactionManagerConfiguration {
    @Bean
    public JotmFactoryBean transactionManager() throws NamingException {
        return new JotmFactoryBean();
    }
}

3.单个数据源的配置

@Configuration
@MapperScan(sqlSessionFactoryRef = "test1SqlSessionFactory", basePackages = "com.demo.**")
public class OdsDatabaseConfiguration {

    @Autowired
    TransactionManager transactionManager;

    @Bean("test1DataSource")
    @ConfigurationProperties("spring.datasource.test1")
    public DataSource omsDataSource() {
        BasicManagedDataSource ds = DataSourceBuilder.create().type(BasicManagedDataSource.class).build();
        ds.setTransactionManager(transactionManager);
        return ds;
    }

    @Bean("test1SqlSessionFactory")
    public SqlSessionFactory odsSqlSessionFactoryBean(@Qualifier("test1DataSource")DataSource test1DataSource) throws Exception {
        SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setDataSource(odsDataSource);
        sessionFactory.setTransactionFactory(new ManagedTransactionFactory());
        return sessionFactory.getObject();
    }

    @Bean("test1SqlSessionTemplate")
    public SqlSessionTemplate odsSqlSessionTemplate(@Qualifier("test1SqlSessionFactory") SqlSessionFactory test1SqlSessionFactory) {
        return new SqlSessionTemplate(test1SqlSessionFactory);
    }
}

4.Java8 Lambda 实现数据库操作

@Component
public class SqlTemplate {

    private static final Logger LOGGER = LoggerFactory.getLogger(SqlTemplate.class);

    @Autowired
    private TransactionManager transactionManager;

    @Autowired(required = false)
    @Qualifier("test1SqlSessionFactory")
    private SqlSessionFactory test1SqlSessionFactory;

    private SqlTemplate() {
    }

    public TransactionManager getTransactionManager() {
        return transactionManager;
    }

    public SqlSessionFactory getTest1SqlSessionFactory() {
        return test1SqlSessionFactory;
    }

    public <T, R> R submitTest1Mapper(Function<T, R> function, Class<T> clazzMapper) {
        try (SqlSession sqlSession = test1SqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
            try {
                T mapper = sqlSession.getMapper(clazzMapper);
                return function.apply(mapper);
            } finally {
                if (sqlSession != null) {
                    sqlSession.clearCache();
                    sqlSession.close();
                }
            }
        }
    }

    public <T> void executeTest1Mapper(Consumer<T> function, Class<T> clazzMapper) {
        try (SqlSession sqlSession = test1SqlSessionFactory.openSession(ExecutorType.SIMPLE)) {
            try {
                T mapper = sqlSession.getMapper(clazzMapper);
                function.accept(mapper);
            }catch (Exception e){
                throw new RuntimeException("处理失败:" + e.getMessage());
            }finally {
                if (sqlSession != null) {
                    sqlSession.clearCache();
                    sqlSession.close();
                }
            }
        }
    }
    

    public void executeTest1(Consumer<SqlSession> consumer) {
        executeTest1(consumer, ExecutorType.SIMPLE);
    }

    public void executeTest1(Consumer<SqlSession> consumer, ExecutorType executorType) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        execute(test1SqlSessionFactory, consumer, executorType);
    }

    public void execute(SqlSessionFactory sqlSessionFactory, Consumer<SqlSession> consumer) {
        execute(sqlSessionFactory, consumer, ExecutorType.SIMPLE);
    }

    public <T> T submitOMS(Function<SqlSession, T> function) {
        return submitOMS(function, ExecutorType.SIMPLE);
    }

    public <T> T submitOMS(Function<SqlSession, T> function, ExecutorType executorType) {
        Assert.notNull(test1SqlSessionFactory, "未找到Test1数据源");
        return submit(test1SqlSessionFactory, function, executorType);
    }

    public <T> T submit(SqlSessionFactory sqlSessionFactory, Function<SqlSession, T> function) {
        return submit(sqlSessionFactory, function, ExecutorType.SIMPLE);
    }

    /**
     * 带事务 没有返回值的方法
     * @param sqlSessionFactory
     * @param consumer
     * @param executorType
     */
    public void execute(SqlSessionFactory sqlSessionFactory, Consumer<SqlSession> consumer, ExecutorType executorType) {
        try (SqlSession sqlSession = sqlSessionFactory.openSession(executorType)) {
            try {
                transactionManager.begin();
                consumer.accept(sqlSession);
                sqlSession.commit();
                transactionManager.commit();
            } catch (Exception e) {
                LOGGER.error("处理异常!", e);
                try {
                    transactionManager.rollback();
                } catch (SystemException e1) {
                    LOGGER.error("事务回滚失败!", e);
                    throw new RuntimeException("事务回滚失败!");
                }
                throw new RuntimeException("处理失败:" + e.getMessage());
            }finally {
                if (sqlSession != null) {
                    sqlSession.clearCache();
                    sqlSession.close();
                }
            }
        }
    }

    /**
     * 带事务 有返回值的方法
     * @param sqlSessionFactory
     * @param function
     * @param executorType
     * @param <T>
     * @return
     */
    public <T> T submit(SqlSessionFactory sqlSessionFactory, Function<SqlSession, T> function, ExecutorType executorType) {
        try (SqlSession sqlSession = sqlSessionFactory.openSession(executorType)) {
            try {
                transactionManager.begin();
                T result = function.apply(sqlSession);
                sqlSession.commit();
                transactionManager.commit();
                return result;
            } catch (Exception e) {
                LOGGER.error("处理异常!", e);
                try {
                    transactionManager.rollback();
                } catch (SystemException e1) {
                    LOGGER.error("事务回滚失败!", e);
                }
                throw new RuntimeException("处理失败:" + e.getMessage());
            }
        }
    }
}
 类似资料: