当前位置: 首页 > 工具软件 > FESCAR > 使用案例 >

Fescar原理

陈坚
2023-12-01

Fescar原理
1、概述
fescar刚推出不久,没几天。看了github的Issues,有人问:可以直接商用吗?

作者的回复:

image
我们也看一下fescard的历史:

阿里是国内最早一批进行应用分布式(微服务化)改造的企业,所以很早就遇到微服务架构下的分布式事务问题。

2014 年,阿里中间件团队发布 TXC(Taobao Transaction Constructor),为集团内应用提供分布式事务服务。

2016 年,TXC 经过产品化改造,以 GTS(Global Transaction Service) 的身份登陆阿里云,成为当时业界唯一一款云上分布式事务产品,在阿云里的公有云、专有云解决方案中,开始服务于众多外部客户。

2019 年起,基于 TXC 和 GTS 的技术积累,阿里中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback, FESCAR),和社区一起建设这个分布式事务解决方案。

TXC/GTS/Fescar 一脉相承,为解决微服务架构下的分布式事务问题交出了一份与众不同的答卷。

阿里出品的中间件有一种精神是值得赞扬的,就是出品的任何中间件都是业务中已经使用多年的产品。都是经过锤炼的产品,开源的话,去除和内部业务耦合的代码,开源出来。至少有一点,底层的代码是经历过双11这种高并发请求量打磨过的代码。在迁移开源的过程中,可能会存在一些bug,但是相对其他开源框架会小很多风险。

fescar的设计初衷:

对业务无侵入: 这里的 侵入 是指,因为分布式事务这个技术问题的制约,要求应用在业务层面进行设计和改造。这种设计和改造往往会给应用带来很高的研发和维护成本。我们希望把分布式事务问题在 中间件 这个层次解决掉,不要求应用在业务层面做额外的工作。

高性能: 引入分布式事务的保障,必然会有额外的开销,引起性能的下降。我们希望把分布式事务引入的性能损耗降到非常低的水平,让应用不因为分布式事务的引入导致业务的可用性受影响。

这两点我其实非常赞同,任何中间件在设计之初就应该考虑这两个平衡点,否则花费大量的精力,开发的产品不能够通用的话,对产品,对社区都没有太大的贡献能力。至少自己的项目里面,都无法做到任何一个项目可以快速嵌入,快速使用。

我们在看看fescar的历史规划:

image
目前fescar还处理0.1.0-SNAPSHOT的版本,这个版本的server端还是单机环境,真正的商用版本要到v1.0.0左右。所以这个时候弄清楚fescar的设计原理,可以和fescar一起成长进步。

2、原理
先根据官网的demo介绍一下原理。demo在github根目录下的examples下面,大家下载一下看看这个例子。关系图如下:

image
Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。

Transaction Manager ™: 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。

Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

一个典型的分布式事务过程:

TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。

XID 在微服务调用链路的上下文中传播。

RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。

TM 向 TC 发起针对 XID 的全局提交或回滚决议。

TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Fscar两种模式:

Fescar 原生工作模式称为 AT(Automatic Transaction)模式,这种模式是对业务无侵入的。与之相应的另外一种工作模式称为 MT(Manual Transaction)模式,这种模式下,分支事务需要应用自己来定义业务本身及提交和回滚的逻辑。

AT模式:

业务逻辑不需要关注事务机制,分支与全局事务的交互过程自动进行。

image
MT模式:

业务逻辑需要被分解为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。

image
MT 模式一方面是 AT 模式的补充。另外,更重要的价值在于,通过 MT 模式可以把众多非事务性资源纳入全局事务的管理中

混合模式:

因为 AT 和 MT 模式的分支从根本上行为模式是一致的,所以可以完全兼容,即,一个全局事务中,可以同时存在 AT 和 MT 的分支。这样就可以达到全面覆盖业务场景的目的:AT 模式可以支持的,使用 AT 模式;AT 模式暂时支持不了的,用 MT 模式来替代。另外,自然的,MT 模式管理的非事务性资源也可以和支持事务的关系型数据库资源一起,纳入同一个分布式事务的管理中。

3、TM
我们在2原理的图上可以看到Business是TM资源

3.1、项目启动的时候注册TM服务

String applicationId = "dubbo-demo-account-service";
String txServiceGroup = "my_test_tx_group";
TMClient.init(applicationId, txServiceGroup);

3.2、xml注册
xml注册GlobalTransactionScaner的时候,在initClient()方法内自动注册TM,如果是AT_MODE(默认就是AT模式),则注册RM。很可惜的是目前版本(0.1.0-SNAPSHOT)还不支持MT模式,只有AT模式一种。如果确认了是TM资源,不建议通过该方式注册,因为该方式会同时注册RM和TM

TMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
    "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
        + txServiceGroup + "]");
}
if ((AT_MODE & mode) > 0) {
RMClientAT.init(applicationId, txServiceGroup);
}
if ((MT_MODE & mode) > 0) {
    throw new NotSupportYetException();
}

3.3、TMClient.init过程

public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
        tmRpcClient.init();
    }
}

TmRpcClient通过底层netty通讯,向server(其实就是TC端)发送链接请求,建立通讯。

3.4、TM服务需要添加@GlobalTransaction注解

@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
public void purchase(String userId, String commodityCode, int orderCount) {
    storageService.deduct(commodityCode, orderCount);
    orderService.create(userId, commodityCode, orderCount);
}

1、通过GlobalTransactionScanner.wrapIfNecessary(…)方法对@GlobalTransactional注解进行扫描并注册一个GlobalTransactionalInterceptor拦截器。
2、当对purchase(…)方法进行调用时候,会进入GlobalTransactionalInterceptor.invoke(…)方法。
3、invoke会进入TransactionalTemplate.execute(TransactionalExecutor business)方法。
这里重点讲解一下TransactionalTemplate类

public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
    // 1\. get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    // 2\. begin transaction
    try {
        tx.begin(business.timeout(), business.name());
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.BeginFailure);
    }
    Object rs = null;
    try {
        // Do Your Business
        rs = business.execute();
    } catch (Throwable ex) {
        // 3\. any business exception, rollback.
        try {
            tx.rollback();
            // 3.1 Successfully rolled back
            throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
        } catch (TransactionException txe) {
            // 3.2 Failed to rollback
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.RollbackFailure, ex)
        }
    }
    // 4\. everything is fine, commit.
    try {
        tx.commit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe,
            TransactionalExecutor.Code.CommitFailure)
    }
    return rs;
}

从本地ThreadLocal里面拿一个GlobalTransaction,第一次拿不到,因为本地的xid值是空。

tx.begin的时候,由于xid是空并且role==Launcher,则从TC server获取一个XID,xid格式:172.16.18.228:8091:201266589,GlobalStatus.begin并且把xid刷新到本地ThreadLocal缓存里面

执行本地业务business.execute();

根据业务执行状态,commit/rollback

返回业务执行结果rs

3.5、TM Commit
tm的commit最终会创建一个GlobalCommitRequest去请求server端

public GlobalStatus commit(String xid) throws TransactionException {
    long txId = XID.getTransactionId(xid);
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setTransactionId(txId);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
  }

参考4.4RmMessageListener

3.6、TM Rollback
tm的rollback最终会创建一个GlobalRollbackRequest去请求server端

public GlobalStatus rollback(String xid) throws TransactionException {
   long txId = XID.getTransactionId(xid);
   GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
   globalRollback.setTransactionId(txId);
   GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
   return response.getGlobalStatus();
 }

参考4.4RmMessageListener

4、RM
我们在2原理的图上可以看到Account,Order,Storage都是RM资源。

煊檍的说明:RM 来管理 Resource(即 DataSourceProxy),AT 模式 RM 管理的资源,实际上就是数据源。

RM的创建方式有两种

4.1、项目启动的时候注册RM服务
如AccountServiceImpl

String txServiceGroup = “my_test_tx_group”;
RMClientAT.init(applicationId, txServiceGroup);
4.2、xml注册
xml里面注册GlobalTransactionScaner的时候,在initClient()方法内自动注册TM,如果是AT_MODE(默认就是AT模式),则注册RM。很可惜的是目前版本(0.1.0-SNAPSHOT)还不支持MT模式,只有AT模式一种。如果确认了是RM资源,不建议通过该方式注册,因为该方式会同时注册RM和TM。

TMClient.init(applicationId, txServiceGroup);
 if (LOGGER.isInfoEnabled()) {
    LOGGER.info(
        "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup["
            + txServiceGroup + "]");
 }
 if ((AT_MODE & mode) > 0) {
    RMClientAT.init(applicationId, txServiceGroup);
 }
 if ((MT_MODE & mode) > 0) {
        throw new NotSupportYetException();
 }

4.3、RMClientAT.init过程

public class RMClientAT {
    public static void init(String applicationId, String transactionServiceGroup) {
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        AsyncWorker asyncWorker = new AsyncWorker();
        asyncWorker.init();
        DataSourceManager.init(asyncWorker);
        rmRpcClient.setResourceManager(DataSourceManager.get());
        rmRpcClient.setClientMessageListener(new RmMessageListener(new RMHandlerAT()));
        rmRpcClient.init();
    }
}

RMRpcClient通过底层netty通讯,向server(其实就是TC端)发送链接请求,建立通讯。

AsynWorker启动一个timerExecutor每秒进行一次doBranchCommits()的调用,这个方式是去拿ASYNC_COMMIT_BUFFER集合里面的缓存数据,如果有数据则,则根据resourceID去拿取对应的dataSourceProxy,然后根据xid,branchId删除UndoLog表下的记录

让RmRpcClient注入一个ResourceManager

设置RmRpcClient一个RmMessageListener,这个Listener只有一个onMessage(…)方法,如果是一个CommitRequest/RollbackRequest,则向server端发送一个RpcMessage

RmRpcClient初始化完成

4.4 、RmMessageListener过程
RmMessgeListener是接收TC端发来的branchCommit/branchRollback命令。

其中branchRollbackRequest请求的执行过程如下

fescar事务回滚.jpg
其中branchCommitRequest请求的执行过程如下

fescar事务提交.jpg
5、RM-DATASOURCE
5.1、注册
注册RM的时候,必须注入一个DruidDataSource到DataSourceProxy里面

 

5.2、接管
而自定义的PreparedStatementProxy实现PreparedStatement接口,并重写了executeUpdate()方法,在JdbcTemplate执行ps.executeUpdate(),完全由PreparedStatementProxy接管。源码如下:

@Override
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, new StatementCallback

然后接着到ExecuteTemplate.execute(…)主要方法实现

public static

首先判断是否全局事务,不是的话,当普通sql去执行

把相应的sql转化成SQLRecognizer,其中有MySQLInsertRecognizer、MySQLUpdateRecognizer、MySQLDeleteRecognizer、MySQLSelectForUpdateRecognizer。通过SQLVisitorFactory.get(…)方法我们也可以发现,目前fescar只支持mysql。

通过不同的SQLType创建不同的Executor,其中有InsertExecutor、UpdateExecutor、DeleteExecutor、SelectForUpdateExecutor、PlainExecutor。

Executor.execute(args)去执行相应的实现。

例如:项目例子里面的

update storage_tbl set count = count - ? where commodity_code = ?

首先会找到UpdateExecutor,

UpdateExecutor继承了AbstractDMLBaseExecutor

AbstractDMLBaseExecutor继承了BaseTransactionalExecutor

BaseTransactionalExecutor实现了Executor的execute(Object … args)方法

com.alibaba.fescar.rm.datasource.exec.BaseTransactionalExecutor类

@Override
public Object execute(Object... args) throws Throwable {
    String xid = RootContext.getXID();
    statementProxy.getConnectionProxy().bind(xid);
    return doExecute(args);
}

这里的前一步是dubbo的@SPI已经把xid注入到了RootContext的上下文

拿到xid后,去绑定到ConnectionContext的上下文中

然后调用了AbstractDMLBaseExecutor.doExecute(…)方法

public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

这里的executeAutoCommitXXX是整个RM数据层的精髓所在,会在这里花大量精力去讲解这里的原理

5.3、executeAutoCommitFalse(args)方法
这里首先会走beforeImage()

beforeImage 首先会创建一个行锁的,拿到数据库面操作变更前的数据(old data)

SELECT ID, count FROM storage_tbl WHERE commodity_code = ? FOR UPDATE

然后去执行业务的sql

update storage_tbl set count = count - ? where commodity_code = ?

执行afterImage 去创建一个行锁,拿到数据库变更后的数据(new data)

after和before最大的区别应该是after拿着before提供的主键去查找了吧

SELECT ID, count FROM storage_tbl WHERE ID = ? FOR UPDATE

最后会把beforeImage,afterImage 组装创建一个SQLUndoLog,丢进ConnectionContext的buffer缓存里面

5.4、executeAutoCommitTrue(args)方法
首先开启事务

创建一个失败尝试的类LockRetryController(默认尝试30次)

调用executeAutoCommitFalse(args)方法,

提交事务(ConnectionProxy.commit),这里需要细说一下

返回结果集

executeAutoCommitTrue和executeAutoCommitFalse最大的区别是本地事务是否开启了。

5.5、关于ConnectionProxy.commit原理

public void commit() throws SQLException {
    if (context.inGlobalTransaction()) {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    } else {
        targetConnection.commit();
    }
}

首先去进行register操作,register是去server端注册一个branch。会返回一个branchId

ConnectionContext的buffer缓存里面如果有UndoLog的话,会把缓存数据的UndoLog全部写入表里

进行report()操作(默认5次尝试),会把本地事务执行的情况汇报到server端,至此PhaseOne执行完成

ConnectionContext 上下文进行清空操作

6、TC(server)
暂缺

7、疑问
7.1 、datasource解析问题
如果我们的StorageService 定为是一个RM资源,也配置了datasource注入到DatasourceProxy里面去接管。默认所有的sql语句都会被SQLVisitorFactory去解析,这可能项目里面只会在分布式事务相关的sql上下文才想进入这个factory去解析,其他和事务不相关的sql不想再走一遍ExecuteTemplate。

这里不用担心,因为dubbo模块自动给我们注册了xid的上下文,在调用到RM资源后,会判断这个xid是否存在,如果存在的话,会走SQLVisitorFactory去解析,如果没有xid,则直接走execute(…)出去,基本对性能没有影响。

if (!RootContext.inGlobalTransaction()) {
    // Just work as original statement
    return statementCallback.execute(statementProxy.getTargetStatement(), args);
}

7.2 、datasource多数据源问题
RM是对datasourceProxy的一个代理。如果一个项目里面有多个数据源,是否要开启多个RM

7.3 、脏读问题
RM的一阶段提交后,是通过datasourceProxy开启事务,直接把RM操作的sql提交的,这个时候这条数据在数据库里面是可见的。如果后续遇到回滚,则从undo_log里面rollback已提交的数据。使用的时候只能业务里面避免这样的问题。就类似只读数据库的数据同步延时问题,需要业务方做实现方案。

7.4 、undo_log表
按代码的需求,一个RM资源里面需要一个undo_log表,这会导致很多库里面都有这样一个undo_log表存在。

7.5 、beforeImage,afterImage 组装创建一个SQLUndoLog,为什么afterImage的新数据也要放undolog ?
在branch commit 的时候,undolog数据要进行删除,基本无用。在branch rollback的时候,会根据branchId,xid去undolog表里面查找相应的回滚信息。详细见UndoLogManager.undo(…)方法,相应的回滚会判断是何种类型,会根据类型选择执行器,目前有以下三种执行器

MySQLUndoDeleteExecutor 会根据beforeImage进行回滚

MySQLUndoUpdateExecutor 会根据beforeImage进行回滚

MySQLUndoInsertExecutor 会根据afterImage进行回滚

所以保存记录前信息,记录后的信息,在回滚的时候拿到这些前后信息还是有必要的。况且rollback_info是一个longblob类型,后续扩展在里面的数据应该还会更多。

-------转载自语雀

作者:尼古拉斯_谭
链接:https://www.jianshu.com/p/ea9e3f4460bd
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

 类似资料: