The transaction layer must guarantee atomicity and isolation (correct execution under concurrency). It adopts the Percolator protocol to guarantee atomicity and uses global timestamps to order concurrent transactions, offering clients a strong isolation level: Snapshot Isolation (equivalently, Repeatable Read). Both tinykv and tinysql implement the Percolator protocol, and tinyscheduler stamps each transaction with a globally monotonically increasing timestamp. In this lab you need to implement the tinykv side of Percolator.
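As background for the code that follows, here is a minimal sketch of the storage layout Percolator relies on. The three column-family names match tinykv's convention (default, lock, write); the encodeKey helper only mimics the idea behind tinykv's mvcc.EncodeKey (inverted timestamp so newer versions sort first) and is illustrative, not the real implementation.
package main

import "fmt"

// encodeKey mimics the idea behind mvcc.EncodeKey: append the bitwise-inverted
// timestamp so that newer versions of a key sort before older ones (sketch only).
func encodeKey(key []byte, ts uint64) string {
	return fmt.Sprintf("%s@%d", key, ^ts)
}

func main() {
	key, startTS, commitTS := []byte("k1"), uint64(100), uint64(110)
	// CFDefault holds the data itself, written at prewrite, keyed by start_ts.
	fmt.Println("CFDefault:", encodeKey(key, startTS), "-> value")
	// CFLock holds at most one lock per user key (no timestamp in the key).
	fmt.Println("CFLock:   ", string(key), "-> {primary, start_ts, ttl, kind}")
	// CFWrite holds the commit record, keyed by commit_ts, pointing back to start_ts.
	fmt.Println("CFWrite:  ", encodeKey(key, commitTS), "-> {start_ts, kind}")
}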
// Reader opens a read-only badger transaction and wraps it in a reader for point reads and iteration.
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
txn := s.db.NewTransaction(false)
return NewBadgerReader(txn), nil
}
// Write applies each storage.Modify in batch to its key under the given column family.
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {
// Try to check the definition of `storage.Modify` and txn interface of `badger`.
// As the column family is not supported by `badger`, a wrapper is used to simulate it.
txn := s.db.NewTransaction(true)
defer txn.Discard()
for _, m := range batch {
// Modify supports two operations: put and delete.
switch m.Data.(type) {
case storage.Put:
put := m.Data.(storage.Put)
// Store the key-value pair in put's column family.
if err := txn.Set(engine_util.KeyWithCF(put.Cf, put.Key), put.Value); err != nil {
return err
}
case storage.Delete:
del := m.Data.(storage.Delete)
// Delete the key from del's column family.
if err := txn.Delete(engine_util.KeyWithCF(del.Cf, del.Key)); err != nil {
return err
}
}
}
// Commit once after all modifications; committing inside the loop would
// invalidate the transaction for later operations in the same batch.
return txn.Commit()
}
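For orientation, a minimal usage sketch of the two methods above, assuming the tinykv types already shown (storage.Modify, storage.Put, engine_util); putThenGet is a hypothetical helper, not part of the lab interface.
// putThenGet writes one key into the default column family and reads it back.
func putThenGet(s *StandAloneStorage) ([]byte, error) {
	batch := []storage.Modify{
		{Data: storage.Put{Cf: engine_util.CfDefault, Key: []byte("k"), Value: []byte("v")}},
	}
	if err := s.Write(nil, batch); err != nil {
		return nil, err
	}
	reader, err := s.Reader(nil)
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	return reader.GetCF(engine_util.CfDefault, []byte("k")) // expect "v"
}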
The transaction execution flow is shown in the figure below:
(figure: transaction execution flow; original image: https://tva1.sinaimg.cn/large/e6c9d24ely1h1rkthfnb5j21jn0u0alb.jpg)
Percolator protocol: the client sends a query to a tinysql server; after the request is parsed and executed, the user's data is converted from rows into key-value pairs, which the Transaction Module then stores in the storage engine. Since different key-value pairs may be scattered across different regions, and therefore across different tinykv servers, the storage engine must guarantee that the whole transaction commits atomically: it either succeeds entirely or does nothing (all or nothing).
When a transaction touches multiple keys, the first key to be committed is chosen as the primary key, and the commit status of the transaction is determined by the commit status of the primary key: the transaction commits successfully only if the primary key does. The commit proceeds in two phases: ① Prewrite and ② Commit.
Once the key-value pairs are ready, the transaction enters the Prewrite phase. In this phase every key is prewritten on the tinykv server hosting its region (each region has its own leader). Each Prewrite request is handled by a tinykv server, which places a Prewrite Lock for the key into the lock column family of the storage engine. If any prewrite fails, the whole commit fails immediately and all remaining prewrite locks are cleaned up.
// prewriteMutation prewrites mut to txn.
// It returns (nil, nil) on success,
// (keyError, nil) if the key in mut is already locked or there is any other key error,
// and (nil, err) if an internal error occurs.
// The request p carries the mvcc transaction txn and the mutation mut.
func (p *Prewrite) prewriteMutation(txn *mvcc.MvccTxn, mut *kvrpcpb.Mutation) (*kvrpcpb.KeyError, error) {
key := mut.Key
log.Debug("prewrite key", zap.Uint64("start_ts", txn.StartTS),
zap.String("key", hex.EncodeToString(key)))
// Check for write conflicts.
// Hint: check the interfaces provided by `mvcc.MvccTxn`. The error type `kvrpcpb.WriteConflict`
// denotes a write conflict; set the error information properly in the `kvrpcpb.KeyError` response.
//======================================= check write-write conflicts =====================================
// Compare the commit_ts of the most recent write on this key with this txn's start_ts.
/* Normal case: this txn started after the most recent write committed.
-----------------------------------------------timeline-------------------------------------------------
[---- most recent write txn_recent ----]←commit_ts
                                                   start_ts →[---------- current txn ----------]
Conflict case: the most recent write on this key committed after this txn started.
-----------------------------------------------timeline-------------------------------------------------
          [---- most recent write txn_recent ----] ←commit_ts
start_ts →[---------- current txn ----------]
*/
_, commitTs, err := txn.MostRecentWrite(key)
if err != nil {
return nil, err
}
if commitTs >= txn.StartTS {
return &kvrpcpb.KeyError{
Conflict: &kvrpcpb.WriteConflict{
StartTs: txn.StartTS,
ConflictTs: commitTs,
Key: key,
Primary: p.request.PrimaryLock,
},
}, nil
}
//============================================= check locks ===============================================
// Check if the key is locked. Report a "key is locked" error if a lock exists; note the key could
// already be locked by this transaction, in which case the current prewrite request is stale.
// 1. Fetch the lock.
lock, err := txn.GetLock(key)
if err != nil {
return nil, err
}
// 2. The key is locked.
if lock != nil {
// It is this transaction's own prewrite lock (a stale retry); report success directly.
if lock.Kind == mvcc.WriteKindFromProto(mut.Op) && lock.Ts == txn.StartTS {
return nil, nil
}
return &kvrpcpb.KeyError{Locked: lock.Info(key)}, nil
}
// Write a lock and value.
// Hint: check the interfaces provided by `mvcc.MvccTxn`.
//======================================= lock the key and prewrite =======================================
txn.PutLock(key, &mvcc.Lock{
Primary: p.request.PrimaryLock, // primary key of this transaction
Ts: txn.StartTS,
Ttl: p.request.LockTtl,
Kind: mvcc.WriteKindFromProto(mut.Op),
})
switch mut.Op {
case kvrpcpb.Op_Put:
txn.PutValue(key, mut.Value)
case kvrpcpb.Op_Del:
txn.DeleteValue(key)
}
return nil, nil
}
If all prewrites succeed, the transaction enters the Commit phase. In this phase the Primary Key is committed first. A Commit operation puts a Write Record into the write column family of the storage engine and removes the corresponding Prewrite Lock. Once the primary key commits successfully, the transaction is considered committed and a success response is returned to the client. The remaining keys, called Secondary Keys, are committed asynchronously by a background task.
On the happy path these two phases suffice, but they do not handle crash recovery. In a distributed environment failures can happen anywhere; for example, a tinysql server may crash partway through the two-phase commit. After a new region leader is elected, how can unfinished transactions be recovered so that correctness is preserved?
// commitKey commits a single key at commitTs and records the outcome in response.
func commitKey(key []byte, commitTs uint64, txn *mvcc.MvccTxn, response interface{}) (interface{}, error) {
// Check whether the key currently holds a lock.
lock, err := txn.GetLock(key)
if err != nil {
return nil, err
}
// If there is no corresponding lock for this transaction:
log.Debug("commitKey", zap.Uint64("startTS", txn.StartTS),
zap.Uint64("commitTs", commitTs),
zap.String("key", hex.EncodeToString(key)))
// Normally the key should still hold this txn's prewrite lock. Two abnormal cases:
// 1. there is no lock: check whether the txn committed or rolled back;
// 2. there is a lock, but it belongs to another transaction.
if lock == nil || lock.Ts != txn.StartTS {
// The key holds no prewrite lock, or the lock is not ours; inspect the key's commit or
// rollback record:
// 1. no write record at all: report "lock not found";
// 2. the txn was rolled back: report a retryable rollback error;
// 3. the txn was already committed: nothing more to do.
write, _, err := txn.CurrentWrite(key)
if err != nil {
return nil, err
}
if write == nil { // 1. No write record found; the prewrite was presumably lost.
respValue := reflect.ValueOf(response)
keyError := &kvrpcpb.KeyError{Retryable: fmt.Sprintf("lock not found for key %v", key)}
reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(keyError))
} else if write.Kind == mvcc.WriteKindRollback { // 2. The txn was rolled back.
respValue := reflect.ValueOf(response)
keyError := &kvrpcpb.KeyError{Retryable: fmt.Sprintf("roll back for key %v", key)}
reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(keyError))
}
// 3. The txn was already committed; return the response unchanged.
return response, nil
}
//============= Reaching here means this txn's prewrite lock is still on the key; commit it ==============
// Commit a Write object to the DB
write := mvcc.Write{StartTS: txn.StartTS, Kind: lock.Kind}
txn.PutWrite(key, commitTs, &write)
// Unlock the key
txn.DeleteLock(key)
return nil, nil
}
Once a transaction is determined to have failed, its remaining locks should be cleaned up and rollback records put into the storage engine to block any future prewrite or commit of that transaction (consider network delays and the like). So if a rollback record is placed on the transaction's primary key, the transaction status is Rolled Back: the transaction will never commit and must fail.
// rollbackKey rolls back the transaction's lock and data on key.
func rollbackKey(key []byte, txn *mvcc.MvccTxn, response interface{}) (interface{}, error) {
lock, err := txn.GetLock(key)
if err != nil {
return nil, err
}
log.Info("rollbackKey",
zap.Uint64("startTS", txn.StartTS),
zap.String("key", hex.EncodeToString(key)))
if lock == nil || lock.Ts != txn.StartTS {
// There is no lock, check the write status.
existingWrite, ts, err := txn.CurrentWrite(key)
if err != nil {
return nil, err
}
// Try to insert a rollback record if there is no corresponding record; `mvcc.WriteKindRollback`
// represents the type. Note the command could be stale, i.e. the record may already have been
// rolled back or committed.
if existingWrite == nil {
// There is no write either; presumably the prewrite was lost. Insert a rollback write anyway.
write := mvcc.Write{
StartTS: txn.StartTS,
Kind: mvcc.WriteKindRollback,
}
// PutWrite records the rollback for this txn at its start_ts.
txn.PutWrite(key, txn.StartTS, &write)
return nil, nil
} else {
if existingWrite.Kind == mvcc.WriteKindRollback {
// The key has already been rolled back, so nothing to do.
return nil, nil
}
// The key has already been committed. This should not happen since the client should never send both
// commit and rollback requests.
err := new(kvrpcpb.KeyError)
err.Abort = fmt.Sprintf("key has already been committed: %v at %d", key, ts)
respValue := reflect.ValueOf(response)
reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(err))
return response, nil
}
}
if lock.Kind == mvcc.WriteKindPut {
txn.DeleteValue(key)
}
write := mvcc.Write{StartTS: txn.StartTS, Kind: mvcc.WriteKindRollback}
txn.PutWrite(key, txn.StartTS, &write)
txn.DeleteLock(key)
return nil, nil
}
The final status of a transaction is determined solely by the status of its Primary Key or Primary Lock. Therefore, when a transaction's status cannot be determined, check the status of its Primary Key or Primary Lock:
If the primary key has a commit or rollback record, the transaction is known to have committed or rolled back.
If the primary lock still exists and has not expired, the transaction may be in its commit phase.
If there is neither a lock record nor a commit/rollback record, the transaction status is undetermined; we can either wait, or write a rollback record to block any later commit and thereby decide the rollback.
Every Prewrite Lock in the two-phase commit carries a Time To Live (TTL) field; once the TTL expires, the lock becomes eligible for rollback.
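The expiry check in the code below compares the wall-clock components of two timestamps. A sketch of what the physical helper does, assuming the usual TSO layout in which the low 18 bits of a composite timestamp hold the logical counter (18 is PD/tinyscheduler's convention; treat the constant as an assumption):
// physicalShiftBits is the number of logical bits in a composite TSO timestamp.
const physicalShiftBits = 18

// physical extracts the wall-clock (millisecond) part of a composite timestamp,
// so lock TTLs, which are in milliseconds, can be compared against it.
func physical(ts uint64) uint64 {
	return ts >> physicalShiftBits
}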
func (c *CheckTxnStatus) PrepareWrites(txn *mvcc.MvccTxn) (interface{}, error) {
key := c.request.PrimaryKey
response := new(kvrpcpb.CheckTxnStatusResponse)
lock, err := txn.GetLock(key)
if err != nil {
return nil, err
}
//panic("CheckTxnStatus is not implemented yet")
if lock != nil && lock.Ts == txn.StartTS {
// The key holds this transaction's own lock.
if physical(lock.Ts)+lock.Ttl < physical(c.request.CurrentTs) {
// The lock had already expired when the status check arrived; roll the transaction back.
// Lock has expired, try to rollback it. `mvcc.WriteKindRollback` could be used to
// represent the type. Try using the interfaces provided by `mvcc.MvccTxn`.
// Rollback means: record a rollback write in txn, delete the lock on the key, and, for a Put, delete the value.
_, err = rollbackKey(key, txn, response)
if err != nil {
return nil, err
}
log.Info("checkTxnStatus rollback the primary lock as it's expired",
zap.Uint64("lock.TS", lock.Ts),
zap.Uint64("physical(lock.TS)", physical(lock.Ts)),
zap.Uint64("txn.StartTS", txn.StartTS),
zap.Uint64("currentTS", c.request.CurrentTs),
zap.Uint64("physical(currentTS)", physical(c.request.CurrentTs)))
} else {
// The lock is ours but has not expired; leave it alone.
response.Action = kvrpcpb.Action_NoAction
response.LockTtl = lock.Ttl
}
return response, nil
}
existingWrite, commitTs, err := txn.CurrentWrite(key)
if err != nil {
return nil, err
}
if existingWrite == nil {
// The lock never existed, it's still needed to put a rollback record on it so that
// the stale transaction commands such as prewrite on the key will fail.
// Note try to set correct `response.Action`,
// the action types could be found in kvrpcpb.Action_xxx.
// Neither a lock nor a write record exists; write a rollback record so that stale commands from this transaction (e.g. a late prewrite) will fail.
write := mvcc.Write{
StartTS: txn.StartTS,
Kind: mvcc.WriteKindRollback,
}
txn.PutWrite(key, txn.StartTS, &write)
response.Action = kvrpcpb.Action_LockNotExistRollback
return response, nil
}
if existingWrite.Kind == mvcc.WriteKindRollback {
// The key has already been rolled back, so nothing to do.
response.Action = kvrpcpb.Action_NoAction
return response, nil
}
// The key has already been committed.
response.CommitVersion = commitTs
response.Action = kvrpcpb.Action_NoAction
return response, nil
}
Transaction coordinators may live on different tinysql servers, so read and write requests can conflict with one another. If a transaction txn1 has placed a prewrite lock on k1 and another transaction txn2 tries to read k1, how should txn2 handle the situation?
Blocked by the lock record, the read or write request cannot proceed until the lock's fate is known: the owning transaction may already have committed, may have rolled back, may still be running, or may have crashed and left the lock orphaned.
In a tinysql/tinykv cluster, this conflict handling is called Resolve. Once a request is blocked by another transaction's prewrite lock, the resolve process determines whether that lock belongs to a committed or a rolled-back transaction so that the request can continue. These Resolve operations implicitly perform transaction recovery: suppose a coordinator's tinysql server crashes right after placing a prewrite lock on a key; concurrent transactions from other tinysql servers will help clean up the leftover prewrite locks.
func (rl *ResolveLock) PrepareWrites(txn *mvcc.MvccTxn) (interface{}, error) {
// A map from start timestamps to commit timestamps which tells us whether a transaction (identified by start ts)
// has been committed (and if so, then its commit ts) or rolled back (in which case the commit ts is 0).
commitTs := rl.request.CommitVersion
response := new(kvrpcpb.ResolveLockResponse)
log.Info("There keys to resolve",
zap.Uint64("lockTS", txn.StartTS),
zap.Int("number", len(rl.keyLocks)),
zap.Uint64("commit_ts", commitTs))
//panic("ResolveLock is not implemented yet")
for _, kl := range rl.keyLocks {
// Try to commit the key if the transaction is committed already, or try to rollback the key if it's not.
// The `commitKey` and `rollbackKey` functions could be useful.
if commitTs == 0 {
// The primary key was rolled back (commit version 0); roll this key back too.
_, err := rollbackKey(kl.Key, txn, response)
if err != nil {
return nil, err
}
} else {
// The primary key committed; commit this key at the same commit_ts to match it.
_, err := commitKey(kl.Key, commitTs, txn, response)
if err != nil {
return nil, err
}
}
log.Debug("resolve key", zap.String("key", hex.EncodeToString(kl.Key)))
}
return response, nil
}
func (g *Get) Read(txn *mvcc.RoTxn) (interface{}, [][]byte, error) {
key := g.request.Key
log.Debug("read key", zap.Uint64("start_ts", txn.StartTS),
zap.String("key", hex.EncodeToString(key)))
response := new(kvrpcpb.GetResponse)
// Check for locks and their visibilities.
// Hint: Check the interfaces provided by `mvcc.RoTxn`.
lock, err := txn.GetLock(key)
if err != nil {
return nil, nil, err
}
if lock != nil && lock.IsLockedFor(key, g.startTs, response) {
// The key is still covered by a prewrite lock visible to this read (not yet committed); return the locked error with no value.
return response, nil, nil
}
value, err := txn.GetValue(key)
if err != nil {
return nil, nil, err
}
if value == nil {
response.NotFound = true
}
response.Value = value
return response, nil, nil
}
In TiDB, a transaction's writes are buffered during execution; only at commit time are they written out in full to the distributed TiKV storage engine via the Percolator commit protocol. The entry point of this call is the tikvTxn.Commit function in store/tikv/txn.go.
During execution, a transaction may run into other in-flight transactions; the Lock Resolve component is then used to query the status of the encountered transaction and act on the result.
The two phases of the Percolator commit protocol are Prewrite and Commit: Prewrite actually writes the data, and Commit makes it visible to the outside world. The success of the transaction is marked atomically by its Primary Key: when Prewrite fails, or the Primary Key Commit fails, garbage cleanup is required to roll back what was written.
The keys of one transaction may involve different Regions. A write to a key must be sent to the Region that owns it; the GroupKeysByRegion function splits keys into per-Region batches according to the region cache. Because the cache can be stale, a storage node may return a Region Error, in which case the batch must be split and retried.
To make key operations executable, implement the GroupKeysByRegion function in region_cache.go.
Execution involves three kinds of operations: Prewrite, Commit, and Rollback (Cleanup), all handled in the same flow. You need to complete the buildPrewriteRequest function used by Prewrite, then the handleSingleBatch functions for Prewrite, Commit, and Rollback.
// GroupKeysByRegion splits the keys the txn wants to modify by the region that actually holds them.
// It also returns the region of the first key, which is treated as the primary key and must commit
// earlier than the other keys.
// RegionCache caches which region each real key belongs to; the helper `RegionCache.LocateKey`
// returns a key's cached region.
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) {
regions := make(map[RegionVerID][][]byte)
var pkRegion RegionVerID
for idx, key := range keys {
region, err := c.LocateKey(bo, key)
if err != nil {
return nil, RegionVerID{}, err
}
// Skip keys the caller filtered out.
if filter != nil && !filter(key, region.StartKey) {
continue
}
// Record the region of the first key (the primary key).
if idx == 0 {
pkRegion = region.Region
}
// Group the key under its region.
regions[region.Region] = append(regions[region.Region], key)
}
return regions, pkRegion, nil
}
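With the grouping in place, the committer can turn each region's key list into a batch for the handleSingleBatch functions below. A sketch of that fan-out (batchKeys is the struct those functions consume; groupIntoBatches itself is a hypothetical helper):
// groupIntoBatches groups keys by region and wraps each group as a batchKeys,
// returning the primary key's region alongside the batches.
func groupIntoBatches(c *RegionCache, bo *Backoffer, keys [][]byte) ([]batchKeys, RegionVerID, error) {
	groups, pkRegion, err := c.GroupKeysByRegion(bo, keys, nil)
	if err != nil {
		return nil, RegionVerID{}, err
	}
	batches := make([]batchKeys, 0, len(groups))
	for region, ks := range groups {
		batches = append(batches, batchKeys{region: region, keys: ks})
	}
	return batches, pkRegion, nil
}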
Batching of prewrite, commit, and rollback requests:
// handleSingleBatch prewrites one batch of keys belonging to a single region.
func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
// Build the prewrite request for this batch.
req := c.buildPrewriteRequest(batch)
for {
// Send the request to the batch's region, with a short timeout.
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
// A region error means the cached region info was stale (leader moved, region split, etc.).
if regionErr != nil {
// The region info is read from region cache,
// so the cache miss cases should be considered
// You need to handle region errors here
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
// re-split keys and prewrite again.
err = c.prewriteKeys(bo, batch.keys)
return errors.Trace(err)
}
// No region error; inspect the response body.
if resp.Resp == nil {
return errors.Trace(ErrBodyMissing)
}
prewriteResp := resp.Resp.(*pb.PrewriteResponse)
// No key errors: this batch prewrote successfully.
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
return nil
}
var locks []*Lock
// Some keys failed: extract the conflicting locks so they can be resolved.
for _, keyErr := range keyErrs {
// Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr)
if err1 != nil {
return errors.Trace(err1)
}
logutil.BgLogger().Debug("prewrite encounters lock",
zap.Uint64("conn", c.connID),
zap.Stringer("lock", lock))
locks = append(locks, lock)
}
// While prewriting, if there are some overlapped locks left by other transactions,
// TiKV will return key errors. The statuses of these transactions are unclear.
// ResolveLocks will check the transactions' statuses by locks and resolve them.
// Set callerStartTS to 0 so as not to update minCommitTS.
msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
if err != nil {
return errors.Trace(err)
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
}
}
}
// handleSingleBatch commits one batch of keys belonging to a single region.
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
// follow actionPrewrite.handleSingleBatch, build the commit request
var resp *tikvrpc.Response
var err error
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
// Build the commit request via the tikvrpc package; it needs the keys, start_ts, and commit_ts.
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
})
// Send the request through the sender and obtain the response.
resp, err = sender.SendReq(bo, req, batch.region, readTimeoutShort)
logutil.BgLogger().Debug("actionCommit handleSingleBatch", zap.Bool("nil response", resp == nil))
/* If we get no response while committing the primary key, we cannot tell whether the txn
committed. In that case we must not declare the commit failed (that could lose data), nor
simply return an error (the txn might be retried). The best option is to record an
undetermined error and let the upper layer close the corresponding MySQL client connection. */
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary && sender.rpcError != nil {
c.setUndeterminedErr(errors.Trace(sender.rpcError))
return nil
}
if _, ok := failpoint.Eval(("mockFailAfterPK")); ok {
if !isPrimary {
err = errors.New("commit secondary keys error")
}
}
if err != nil {
return errors.Trace(err)
}
// handle the response and error refer to actionPrewrite.handleSingleBatch
regionError, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionError != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionError.String()))
if err != nil {
return errors.Trace(err)
}
// re-split keys and commit again.
err = c.commitKeys(bo, batch.keys)
return errors.Trace(err)
}
if resp.Resp == nil {
return errors.Trace(ErrBodyMissing)
}
commitResp := resp.Resp.(*pb.CommitResponse)
keyErr := commitResp.GetError()
if keyErr != nil {
err := extractKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
}
}
c.mu.Lock()
defer c.mu.Unlock()
// Group that contains primary key is always the first.
// We mark transaction's status committed when we receive the first success response.
c.mu.committed = true
return nil
}
// handleSingleBatch rolls back one batch of keys belonging to a single region.
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
// Following actionPrewrite.handleSingleBatch: build and send the rollback request.
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
StartVersion: c.startTS,
Keys: batch.keys,
})
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return err
}
logutil.BgLogger().Debug("actionRollback handleSingleBatch", zap.Bool("nil response", resp == nil))
// handle the response and error refer to actionPrewrite.handleSingleBatch
regionError, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionError != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionError.String()))
if err != nil {
return errors.Trace(err)
}
// re-split keys and rollback again.
err = c.cleanupKeys(bo, batch.keys)
return errors.Trace(err)
}
rollbackResp := resp.Resp.(*pb.BatchRollbackResponse)
keyErr := rollbackResp.GetError()
if keyErr != nil {
err := extractKeyErr(keyErr)
if err != nil {
return err
}
}
// The rollback of this batch succeeded. Unlike the commit path, there is no
// per-transaction "committed" flag to set here.
return nil
}
In the Prewrite phase, an operation on a key writes two records: the lock into the lock column family, and the value into the default column family keyed by the transaction's start_ts (cf. PutLock and PutValue in prewriteMutation above).
// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
// CheckTxnStatus may meet the following cases:
// 1. LOCK: the lock has expired, or is still within its TTL.
//    1.1 Lock expired -- orphan lock, failed TTL update, crash recovery, etc.
//    1.2 Lock TTL alive -- an active transaction still holds the lock.
// 2. NO LOCK: the txn has committed, rolled back, or has not prewritten yet.
//    2.1 Txn committed.
//    2.2 Txn rolled back -- by itself, by others, GC tombstone, etc.
//    2.3 No lock -- concurrent prewrite.
var status TxnStatus
var req *tikvrpc.Request
// Build a CheckTxnStatus request for the primary key.
req = tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
PrimaryKey: primary,
LockTs: txnID,
CurrentTs: currentTS,
})
for {
// Locate the region holding the primary key.
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return status, errors.Trace(err)
}
// Send CheckTxnStatus to the primary key's region.
resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return status, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return status, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return status, errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return status, errors.Trace(ErrBodyMissing)
}
// Got the transaction status from the primary key's region.
cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
logutil.BgLogger().Debug("cmdResp", zap.Bool("nil", cmdResp == nil))
// Fill in the status from the response.
status = TxnStatus{
ttl: cmdResp.LockTtl,
commitTS: cmdResp.CommitVersion,
action: cmdResp.Action,
}
return status, nil
}
}
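Callers of getTxnStatus decide what to do from the returned fields. A sketch of that interpretation, mirroring how the lock resolver reads TxnStatus (the helper names here are ours, not the library's):
// While ttl > 0 the lock is still alive and the caller should back off and retry.
// Once ttl == 0 the transaction's fate is final: a nonzero commitTS means it
// committed, and commitTS == 0 means it was rolled back.
func isCommitted(s TxnStatus) bool  { return s.ttl == 0 && s.commitTS > 0 }
func isRolledBack(s TxnStatus) bool { return s.ttl == 0 && s.commitTS == 0 }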
Implementation of conflict resolution:
// resolveLock resolves a lock on a secondary key according to the status of the primary key.
func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
cleanWholeRegion := l.TxnSize >= bigTxnThreshold
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
return errors.Trace(err)
}
if _, ok := cleanRegions[loc.Region]; ok {
return nil
}
var req *tikvrpc.Request
req = tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
CommitVersion: status.commitTS, // the primary's commit_ts; 0 means the txn was rolled back
})
// Send the resolve request to the region holding this secondary key.
resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return errors.Trace(ErrBodyMissing)
}
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
return err
}
if cleanWholeRegion {
cleanRegions[loc.Region] = struct{}{}
}
return nil
}
}
Besides the commit path, a transaction may also run into a Lock while reading data. This likewise triggers the ResolveLocks function, via the tikvSnapshot.get function in snapshot.go, so that read requests can make progress.
func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
if s.cached != nil {
if value, ok := s.cached[string(k)]; ok {
return value, nil
}
}
failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
if bo.ctx.Value("TestSnapshotCache") != nil {
panic("cache miss")
}
})
cli := clientHelper{
LockResolver: s.store.lockResolver,
RegionCache: s.store.regionCache,
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}
req := tikvrpc.NewRequest(tikvrpc.CmdGet,
&pb.GetRequest{
Key: k,
Version: s.version.Ver,
}, pb.Context{})
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
}
resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, "")
if err != nil {
return nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
if resp.Resp == nil {
return nil, errors.Trace(ErrBodyMissing)
}
cmdGetResp := resp.Resp.(*pb.GetResponse)
val := cmdGetResp.GetValue()
if keyErr := cmdGetResp.GetError(); keyErr != nil {
// If the key error is a lock, there are 2 possible cases:
// 1. The transaction is during commit, wait for a while and retry.
// 2. The transaction is dead with some locks left, resolve it.
// Extract the lock from the key error.
lockFromKeyErr, err := extractLockFromKeyErr(keyErr)
if err != nil {
return nil, err
}
// Resolve the lock; a positive msBeforeExpired means the lock is still alive and we must back off.
msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, 0, []*Lock{lockFromKeyErr})
if err != nil {
return nil, err
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC get lockedKeys: %d", 1))
if err != nil {
return nil, errors.Trace(err)
}
}
continue
}
return val, nil
}
}