Mycat分布式事务的实现
随着并发量、数据量越来越大、业务已经细化到不能再按照业务划分的情况下不得不使用分布式数据库提高系统性能,在分布式系统中各个节点在物理上都是相对独立的,独立每一台节点数据操作都可以满足ACID。但是,各独立节点之间无法知道其他节点事务的执行情况,如果要想让分多台机器中数据保存一致,就必须保证所有节点上面的数据操作要么全部执行成功,要么全部不执行,比较常规的解决方法是引入“协调者”来统一调度所有节点执行。
X/Open组织(即现在的Open Group)定义了分布式事务处理模型。X/Open DTP模型(1994)包括应用程序(AP)、事务管理器(TM)、资源管理器(RM)、通信资源管理器(CRM)四部分。事务管理器(TM)是交易中间件,资源管理器(RM)是数据库,通信资源管理器(CRM)是消息中间件。通常把一个数据库内部的事务处理作为本地事务看待,而分布式事务处理的对象是全局事务。所谓全局事务,是指分布式事务处理环境中,多个数据库可能需要共同完成一个工作,这个工作即是一个全局事务,一个事务中可能更新几个不同的数据库,此时一个数据库对自己内部所做操作的提交不仅依赖本身操作是否成功,还要依赖与全局事务相关的其它数据库的操作是否成功,如果任一数据库的任一操作失败,则参与此事务的所有数据库所做的所有操作都必须回滚。XA就是X/Open DTP 定义的交易中间件与数据库之间的接口规范(即接口函数),交易中间件用它来通知数据库事务的开始、结束以及提交、回滚等,XA接口函数由数据库厂商提供,根据这一思想衍生出二阶提交协议和三阶提交协议。
所谓的两个阶段是指:准备阶段、提交阶段。
准备阶段:事务协调者(事务管理器)给每个参与者(资源管理器)发送准备消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的redo和undo日志但不提交,可以进一步将准备阶段分为以下三个步骤:
1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。
2)参与者节点执行询问发起为止的所有事务操作,并将Undo信息和Redo信息写入日志。
3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个”同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个”中止”消息。
提交阶段:如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息,否则发送提交(Commit)消息,参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。
二阶段提交所存在缺点的:
1)同步阻塞问题,执行过程中所有参与节点都是事务阻塞型的,当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
2)单点故障,由于协调者的重要性一旦协调者发生故障参与者会一直阻塞下去。
3)数据不一致,在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求,而在这部分参与者接到commit请求之后就会执行commit操作,但是其他部分未接到commit请求的机器则无法执行事务提交,于是整个分布式系统便出现了数据部一致性的现象。
由于二阶段提交存在着诸如同步阻塞、单点问题、数据不一致、宕机等缺陷,所以,研究者们在二阶段提交的基础上做了改进,提出了三阶段提交。
三阶段提交(Three-phase commit),也叫三阶段提交协议(Three-phase commit protocol),是二阶段提交(2PC)的改进版本。3PC把2PC的准备阶段再次一分为二,这样三阶段提交就有CanCommit、PreCommit、DoCommit三个阶段。
CanCommit阶段:3PC的CanCommit阶段其实和2PC的准备阶段很像,协调者向参与者发送commit请求,参与者如果可以提交就返回Yes响应,否则返回No响应。
PreCommit阶段:协调者根据参与者的反应情况来决定是否可以记性事务的PreCommit操作。根据响应情况,有以下两种可能。
1)假如协调者从所有的参与者获得的反馈都是Yes响应,那么就会执行事务的预执行。
2)假如有任何一个参与者向协调者发送了No响应,或者等待超时之后,协调者都没有接到参与者的响应,那么就执行事务的中断。
DoCommit阶段:该阶段进行真正的事务提交,也可以分为以下两种情况:执行提交、中断事务。
执行提交:
1)发送提交请求 协调接收到参与者发送的ACK响应,那么他将从预提交状态进入到提交状态,并向所有参与者发送doCommit请求。
2)事务提交参与者接收到doCommit请求之后,执行正式的事务提交。并在完成事务提交之后释放所有事务资源。
3)响应反馈 事务提交完之后,向协调者发送Ack响应。
4)完成事务 协调者接收到所有参与者的ack响应之后,完成事务。
中断事务:
1)发送中断请求 协调者向所有参与者发送abort请求。
2)事务回滚 参与者接收到abort请求之后,利用其在阶段二记录的undo信息来执行事务的回滚操作,并在完成回滚之后释放所有的事务资源。
3)反馈结果 参与者完成事务回滚之后,向协调者发送ACK消息。
4)中断事务 协调者接收到参与者反馈的ACK消息之后,执行事务的中断。
Mycat可以通过用户会话Session中设置autocommit=false启动事务,通过设置ServerConnection中变量txInterrupted=true来控制是否事务异常需要回滚。在Mycat中的事务是一种二阶段提交事务方式,但是从实际应用场景出发这种出现故障的概率还是比较小的,因此这种实现方式可以满足很多应用需求。
相关类说明
MySQLConnection:数据库连接
NonBlockingSession: 用户连接Session
MultiNodeCoordinator:协调者
CommitNodeHandler:分片提交处理
RollbackNodeHandler:分片回滚处理
代码解析
XA事务启动
public class MySQLConnection extends BackendAIOConnection {
//设置开启事务
private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {
if (autoCommit) {
sb.append("SET autocommit=1;");
} else {
sb.append("SET autocommit=0;");
}
}
public void execute(RouteResultsetNode rrn, ServerConnection sc,
boolean autocommit) throws UnsupportedEncodingException {
if (!modifiedSQLExecuted && rrn.isModifySQL()) {
modifiedSQLExecuted = true;
}
//获取当前事务ID
String xaTXID = sc.getSession2().getXaTXID();
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
autocommit);
}
private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
int clientCharSetIndex, int clientTxIsoLation,
boolean clientAutoCommit) {
String xaCmd = null;
boolean conAutoComit = this.autocommit;
String conSchema = this.schema;
boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB()
|| clientAutoCommit;
//启动XA
if (expectAutocommit == false && xaTxID != null && xaStatus == 0) {
clientTxIsoLation = Isolations.SERIALIZABLE;
xaCmd = "XA START " + xaTxID + ';';
}
int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
int charsetSyn = 0;
if (this.charsetIndex != clientCharSetIndex) {
setCharset(CharsetUtil.getCharset(clientCharSetIndex));
charsetSyn = 1;
}
int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn;
if (synCount == 0) {
sendQueryCmd(rrn.getStatement());
return;
}
CommandPacket schemaCmd = null;
StringBuilder sb = new StringBuilder();
if (schemaSyn == 1) {
schemaCmd = getChangeSchemaCommand(conSchema);
}
if (charsetSyn == 1) {
getCharsetCommand(sb, clientCharSetIndex);
}
if (txIsoLationSyn == 1) {
getTxIsolationCommand(sb, clientTxIsoLation);
}
if (autoCommitSyn == 1) {
getAutocommitCommand(sb, expectAutocommit);
}
if (xaCmd != null) {
sb.append(xaCmd);
}
metaDataSyned = false;
statusSync = new StatusSync(xaCmd != null, conSchema,
clientCharSetIndex, clientTxIsoLation, expectAutocommit,
synCount);
if (schemaCmd != null) {
schemaCmd.write(this);
}
sb.append(rrn.getStatement());
this.sendQueryCmd(sb.toString());
}
}
用户连接Session
public class NonBlockingSession implements Session {
//节点回滚处理
private volatile RollbackNodeHandler rollbackHandler;
//协调者
private final MultiNodeCoordinator multiNodeCoordinator;
//节点提交处理
private final CommitNodeHandler commitHandler;
@Override
public void execute(RouteResultset rrs, int type) {
clearHandlesResources();
// 检查路由结果是否为空
RouteResultsetNode[] nodes = rrs.getNodes();
if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
|| nodes[0].getName().equals("")) {
source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
"No dataNode found ,please check tables defined in schema:"
+ source.getSchema());
return;
}
//如果分片数量是单个时启动单节点处理
if (nodes.length == 1) {
singleNodeHandler = new SingleNodeHandler(rrs, this);
try {
singleNodeHandler.execute();
} catch (Exception e) {
LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
} else {
//如果分片数量是多个时启动多节点处理
boolean autocommit = source.isAutocommit();
SystemConfig sysConfig = MycatServer.getInstance().getConfig()
.getSystem();
int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
this);
try {
multiNodeHandler.execute();
} catch (Exception e) {
LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
}
}
public void commit() {
final int initCount = target.size();
//判断分片数量
if (initCount <= 0) {
ByteBuffer buffer = source.allocate();
buffer = source.writeToBuffer(OkPacket.OK, buffer);
source.write(buffer);
return;
} else if (initCount == 1) {
//如果分片数量是单个时启动单节点事务commit处理
BackendConnection con = target.elements().nextElement();
commitHandler.commit(con);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("multi node commit to send ,total " + initCount);
}
//如果分片数量是多个时启动协调者进行事务处理
multiNodeCoordinator.executeBatchNodeCmd(SQLCmdConstant.COMMIT_CMD);
}
}
//事务回滚
public void rollback() {
final int initCount = target.size();
if (initCount <= 0) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("no session bound connections found ,no need send rollback cmd ");
}
ByteBuffer buffer = source.allocate();
buffer = source.writeToBuffer(OkPacket.OK, buffer);
source.write(buffer);
return;
}
rollbackHandler = new RollbackNodeHandler(this);
rollbackHandler.rollback();
}
}
协调者
public class MultiNodeCoordinator implements ResponseHandler {
public void executeBatchNodeCmd(SQLCtrlCommand cmdHandler) {
this.cmdHandler = cmdHandler;
final int initCount = session.getTargetCount();
runningCount.set(initCount);
nodeCount = initCount;
failed.set(false);
faileCount.set(0);
// 执行
int started = 0;
for (RouteResultsetNode rrn : session.getTargetKeys()) {
if (rrn == null) {
LOGGER.error("null is contained in RoutResultsetNodes, source = "
+ session.getSource());
continue;
}
final BackendConnection conn = session.getTarget(rrn);
if (conn != null) {
conn.setResponseHandler(this);
cmdHandler.sendCommand(session, conn);
++started;
}
}
//判断可以连接的分片数量是否跟配置分片数量相等,如果相等所有分片进入commit阶段否则rollback
if (started < nodeCount) {
runningCount.set(started);
LOGGER.warn("some connection failed to execut "
+ (nodeCount - started));
/**
* assumption: only caused by front-end connection close. <br/>
* Otherwise, packet must be returned to front-end
*/
failed.set(true);
}
}
}
分片事务提交处理
public class CommitNodeHandler implements ResponseHandler {
//结束XA
public void commit(BackendConnection conn) {
conn.setResponseHandler(CommitNodeHandler.this);
if(conn instanceof MySQLConnection)
{
MySQLConnection mysqlCon = (MySQLConnection) conn;
if (mysqlCon.getXaStatus() == 1)
{
String xaTxId = session.getXaTXID();
String[] cmds = new String[]{"XA END " + xaTxId,
"XA PREPARE " + xaTxId};
mysqlCon.execBatchCmd(cmds);
} else
{
conn.commit();
}
}else
{
conn.commit();
}
}
//提交XA
@Override
public void okResponse(byte[] ok, BackendConnection conn) {
if(conn instanceof MySQLConnection)
{
MySQLConnection mysqlCon = (MySQLConnection) conn;
switch (mysqlCon.getXaStatus())
{
case 1:
if (mysqlCon.batchCmdFinished())
{
String xaTxId = session.getXaTXID();
mysqlCon.execCmd("XA COMMIT " + xaTxId);
mysqlCon.setXaStatus(2);
}
return;
case 2:
{
mysqlCon.setXaStatus(0);
break;
}
}
}
session.clearResources(false);
ServerConnection source = session.getSource();
source.write(ok);
}
}
分片事务回滚处理
public class RollbackNodeHandler extends MultiNodeHandler {
public void rollback() {
final int initCount = session.getTargetCount();
lock.lock();
try {
reset(initCount);
} finally {
lock.unlock();
}
if (session.closed()) {
decrementCountToZero();
return;
}
// 执行
int started = 0;
for (final RouteResultsetNode node : session.getTargetKeys()) {
if (node == null) {
try {
LOGGER.error("null is contained in RoutResultsetNodes, source = "
+ session.getSource());
} catch (Exception e) {
}
continue;
}
final BackendConnection conn = session.getTarget(node);
if (conn != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("rollback job run for " + conn);
}
if (clearIfSessionClosed(session)) {
return;
}
//分片事务回滚
conn.setResponseHandler(RollbackNodeHandler.this);
conn.rollback();
++started;
}
}
if (started < initCount && decrementCountBy(initCount - started)) {
session.clearResources(true);
}
}
}