当前位置: 首页 > 工具软件 > Percolator > 使用案例 >

talentKV分布式事务训练营 2PC +Percolator

姬欣怡
2023-12-01

1. 设计

transaction layer需要保证事务的原子性,隔离性(并发下还能正确执行)

采用Precolator Protocol来保证事务的原子性,并采用全局时间戳对并发事务进行排序。给客户端提供Snapshot Isolation(快照隔离)或 Repeatable Read(可重复读)可以为客户端提供强隔离级别。tinyKV和tinysql中都实现了precolator 协议,tinyschedule会给事务加上全局单调递增时间戳。在本lab中,需要事先tinykv的peocolator。

1.1 实现StandAloneStorage的核心接口

// Reader 调用Badge数据库的raw接口,通过badge传入当前事务txn,进行遍历
func (s *StandAloneStorage) Reader(ctx *kvrpcpb.Context) (storage.StorageReader, error) {
	txn := s.db.NewTransaction(false)
	return NewBadgerReader(txn), nil
}
// Write 将storage.Modify内容通过列族cf中的keys进行修改
func (s *StandAloneStorage) Write(ctx *kvrpcpb.Context, batch []storage.Modify) error {

	// Try to check the definition of `storage.Modify` and txn interface of `badger`.
	// As the column family is not supported by `badger`, a wrapper is used to simulate it.
	txn := s.db.NewTransaction(true)
	for _, m := range batch {
		// Modify中支持两种操作,put和delete
		switch m.Data.(type) {
		case storage.Put:
			put := m.Data.(storage.Put)
			// set(key, vals) 事务txn将键值对放入storage,在put的列族中进行put
			if err := txn.Set(engine_util.KeyWithCF(put.Cf, put.Key), put.Value); err != nil {
				return err
			}
		case storage.Delete:
			del := m.Data.(storage.Delete)
			// 在del的列族中进行删除
			if err := txn.Delete(engine_util.KeyWithCF(del.Cf, del.Key)); err != nil {
				return err
			}
		}

		err := txn.Commit()
		if err != nil {
			return err
		}
	}

	return nil
}

2. 实现TinyKV中的Percolator

事务执行流如下图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7vFLAL8p-1651728923364)(https://tva1.sinaimg.cn/large/e6c9d24ely1h1rkthfnb5j21jn0u0alb.jpg)]

事务执行流如下图

percolator procotol : 用户将查询请求发送给tinysql服务器,在请求被解析和执行后,用户的数据就会被从查询语句(原本是rows)转换成键值对,之后Transaction Module(事务模块)就会把键值对存到storage engine(存储引擎)中。由于不同的键值对可能会分散到不同region中,也就代表会分散在不同的tinykv server中。storage engine必须保证整个事务提交过程,一次性成功,或者什么也不做(all or nothing)。

当一个事务可能有多个key时,首先选取第一个要提交的key作为主键,那么事务的提交状态便由主键的提交状态决定。换言之,只有主键提交成功了,事务才提交成功。提交过程分为两阶段,① Prewrite, 预写,②commit, 提交。

2.1 预写阶段

在kv对准备好之后,事务进入Prewrite阶段。在这个阶段所有的key都会被预写在不同的tinykv服务器中,这些服务器为不同的region,每个region有自己的leader。每个Prewrite请求将由tinykv服务器处理,Prewrite Lock(预写锁)并将被放入lock column family存储引擎中的每个键中。任何一个预写过程失败,提交过程将立即失败,然后将清除剩余的所有预写锁。

// prewriteMutation prewrites mut to txn.
// it returns (nil, nil) on success,
// it returns (err, nil) if the key in mut is already locked or there is any other key error,
// it (nil, err) if an internal error occurs.
// 预写请求p传入一个mvcc事务和变更mut
func (p *Prewrite) prewriteMutation(txn *mvcc.MvccTxn, mut *kvrpcpb.Mutation) (*kvrpcpb.KeyError, error) {
	key := mut.Key
	log.Debug("prewrite key", zap.Uint64("start_ts", txn.StartTS),
		zap.String("key", hex.EncodeToString(key)))
	// Check for write conflicts.
	// Hint: Check the interafaces provided by `mvcc.MvccTxn`. The error type `kvrpcpb.WriteConflict` is used
	//		 denote to write conflict error, try to set error information properly in the `kvrpcpb.KeyError`
	//		 response.
	//panic("prewrite Mutation is not implemented yet")
	//=========================================检查写写冲突=====================================================
	// 检查txn最近一次写的commit_ts, 和txn的start_ts之间关系
  /* 正常情况,tnx是最新的预写
  	-----------------------------------------------时间线-------------------------------------------------
  	[-----最近写事务 tnx_recent-------]←commit_ts
  	                                      start_ts →[--------------当前事务tnx-----------]
  	                                       
    异常情况,txn在本key的上个事务还没有提交
    -----------------------------------------------时间线-------------------------------------------------
  	[-----最近写事务 tnx_recent-------] ←commit_ts
  	               start_ts →[--------------当前事务tnx-----------]
  
	*/
	_, commitTs, err := txn.MostRecentWrite(key)
	if err != nil {
		return nil, err
	}
	if commitTs >= txn.StartTS {
		return &kvrpcpb.KeyError{
			Conflict: &kvrpcpb.WriteConflict{
				StartTs:    txn.StartTS,
				ConflictTs: commitTs,
				Key:        key,
				Primary:    p.request.PrimaryLock,
			},
		}, nil
	}

	//=============================================检查锁======================================================
	// Check if key is locked. Report key is locked error if lock does exist, note the key could be locked
	// by this transaction already and the current prewrite request is stale.
	// 1. 获取锁
	lock, err := txn.GetLock(key)
	if err != nil {
		return nil, err
	}
	// 2. 当前key有锁
	if lock != nil {
    // 是自己的预写锁,直接返回
		if lock.Kind == mvcc.WriteKindFromProto(mut.Op) && lock.Ts == txn.StartTS {
			 return nil, nil
		}
		return &kvrpcpb.KeyError{Locked: lock.Info(key)}, nil
	}

	// Write a lock and value.
	// Hint: Check the interfaces provided by `mvccTxn.Txn`.
	//panic("lock record generation is not implemented yet")
	//=========================================加锁 并预写=====================================================
	txn.PutLock(key, &mvcc.Lock{
		Primary: p.request.PrimaryLock, // pk的锁状态
		Ts:      txn.StartTS,
		Ttl:     p.request.LockTtl,
		Kind:    mvcc.WriteKindFromProto(mut.Op),
	})
	switch mut.Op {
	case kvrpcpb.Op_Put:
		txn.PutValue(key, mut.Value)
	case kvrpcpb.Op_Del:
		txn.DeleteValue(key)
	}
	return nil, nil
}

2.2 提交阶段

如果所有的预写处理都成功,则事务进入Commit阶段。在这个阶段Primary Key将首先提交。Commit 操作过程中,会把Write Record(写记录)放入存储引擎的write column family中,并且把Prewrite Lock都解锁。如果主键提交成功,则认为事务已提交,并将成功响应返回客户端。我们把剩余的键叫做Secondary Keys,这些键将在后台任务中异步提交。

在公共路径中,这两个阶段就足够了,但不做崩溃恢复。在分布式环境中,故障可能发生在任何地方,例如tinysql服务器可能在两阶段提交之前发生故障,在新的region leader被选举出来后,如何恢复未完成的事务以保证正确性呢?

// commitKey 对keys进行提交,并返回提交信息
func commitKey(key []byte, commitTs uint64, txn *mvcc.MvccTxn, response interface{}) (interface{}, error) {
	// txn询问当前key上是否有锁
	lock, err := txn.GetLock(key)
	if err != nil {
		return nil, err
	}

	// If there is no correspond lock for this transaction.
	log.Debug("commitKey", zap.Uint64("startTS", txn.StartTS),
		zap.Uint64("commitTs", commitTs),
		zap.String("key", hex.EncodeToString(key)))
	// 正常情况下,key上应该有txn的预写锁,有如下两种异常情况
	// 1. 如果没有锁,就看看txn是commit/rollback?
	// 2. 如果有锁,但不是自己的
	if lock == nil || lock.Ts != txn.StartTS {
		// 发现当前key没有预写锁,或者不是本事务的锁,就需要检查这个key的commit记录或者rollback记录,
		// 1. 如果没有任何锁记录,返回没有找到锁
		// 2. 发现txn回滚,返回锁已经回滚了
		// 3. txn已经提交了
		write, _, err := txn.CurrentWrite(key)
		if err != nil {
			return nil, err
		}

		if write == nil { // 1. 没有找到预写锁,预写失败?
			respValue := reflect.ValueOf(response)
			keyError := &kvrpcpb.KeyError{Retryable: fmt.Sprintf("lock not found for key %v", key)}
			reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(keyError))
		} else if write.Kind == mvcc.WriteKindRollback { // 2. txn回滚了
			respValue := reflect.ValueOf(response)
			keyError := &kvrpcpb.KeyError{Retryable: fmt.Sprintf("roll back for key %v", key)}
			reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(keyError))
		}
		// 3. txn已经提交了
		return response, nil
	}
	//====================到了这里,代表本txn预写锁还在key上,需要对该key进行提交了=========================================
	// Commit a Write object to the DB
	write := mvcc.Write{StartTS: txn.StartTS, Kind: lock.Kind}
	txn.PutWrite(key, commitTs, &write)
	// Unlock the key
	txn.DeleteLock(key)

	return nil, nil
}

2.3 回滚记录

一旦确定事务失败,就应该清理它剩余的锁,并将回滚记录放入存储引擎,以防止将来可能的预写或提交(考虑网络延迟等)。所以如果回滚记录置于这个事务的主键上,那么事务状态为Rolled Back, 代表事务不会提交,必须失败。

// rollbackKey  对key上的事务和锁进行回滚
func rollbackKey(key []byte, txn *mvcc.MvccTxn, response interface{}) (interface{}, error) {
	lock, err := txn.GetLock(key)
	if err != nil {
		return nil, err
	}
	log.Info("rollbackKey",
		zap.Uint64("startTS", txn.StartTS),
		zap.String("key", hex.EncodeToString(key)))

	if lock == nil || lock.Ts != txn.StartTS {
		// There is no lock, check the write status.
		existingWrite, ts, err := txn.CurrentWrite(key)
		if err != nil {
			return nil, err
		}
		// Try to insert a rollback record if there's no correspond records, use `mvcc.WriteKindRollback` to represent
		// the type. Also the command could be stale that the record is already rolled back or committed.
		// If there is no write either, presumably the prewrite was lost. We insert a rollback write anyway.
		// if the key has already been rolled back, so nothing to do.
		// If the key has already been committed. This should not happen since the client should never send both
		// commit and rollback requests.
		
		
		// There is no write either, presumably the prewrite was lost. We insert a rollback write anyway.
		if existingWrite == nil {
			// 记录回滚的写请求
			write := mvcc.Write{
				StartTS: txn.StartTS,
				Kind:    mvcc.WriteKindRollback,
			}
			// PutWrite 记录txn的写请求
			txn.PutWrite(key, txn.StartTS, &write)
			return nil, nil
		} else {
			if existingWrite.Kind == mvcc.WriteKindRollback {
				// The key has already been rolled back, so nothing to do.
				return nil, nil
			}

			// The key has already been committed. This should not happen since the client should never send both
			// commit and rollback requests.
			err := new(kvrpcpb.KeyError)
			err.Abort = fmt.Sprintf("key has already been committed: %v at %d", key, ts)
			respValue := reflect.ValueOf(response)
			reflect.Indirect(respValue).FieldByName("Error").Set(reflect.ValueOf(err))
			return response, nil
		}
	}

	if lock.Kind == mvcc.WriteKindPut {
		txn.DeleteValue(key)
	}

	write := mvcc.Write{StartTS: txn.StartTS, Kind: mvcc.WriteKindRollback}
	txn.PutWrite(key, txn.StartTS, &write)
	txn.DeleteLock(key)

	return nil, nil
}

2.4 检查事务状态

事务的最终状态仅由Primary Key的状态或Primary Lock的状态决定。因此,如果某些事务状态无法确定,则需要检查其Primary Key的状态或Primary Lock的状态。

  • 如果主键有提交或回滚记录,可以确定事务已经提交或回滚。

  • 如果primary lock仍然存在,且尚未过期,则事务可能处于提交阶段。

  • 如果没有锁记录和提交、回滚记录,则事务状态不确定,我们可以选择等待或者写回滚记录来阻止后面的提交,然后决定回滚。

    两阶段提交中的Prewrite Lock都会有一个Time To Live字段,

    /*
      如果主键有提交或回滚记录,可以确定事务已经提交或回滚。
      如果`primary lock`仍然存在,且尚未过期,则事务可能处于提交阶段。
      如果没有锁记录和提交、回滚记录,则事务状态不确定,我们可以选择等待或者写回滚记录来阻止后面的提交,然后决定回滚。
     两阶段提交中的`Prewrite Lock`都会有一个`Time To Live`字段,
    */
    func (c *CheckTxnStatus) PrepareWrites(txn *mvcc.MvccTxn) (interface{}, error) {
    	key := c.request.PrimaryKey
    	response := new(kvrpcpb.CheckTxnStatusResponse)
    
    	lock, err := txn.GetLock(key)
    	if err != nil {
    		return nil, err
    	}
    	//panic("CheckTxnStatus is not implemented yet")
    	if lock != nil && lock.Ts == txn.StartTS {
    		// key上是本事务自己的锁
    		if physical(lock.Ts)+lock.Ttl < physical(c.request.CurrentTs) {
    			// 发起锁状态检查时,锁已经过期了,需要对事务进行回滚
    			// Lock has expired, try to rollback it. `mvcc.WriteKindRollback` could be used to
    			// represent the type. Try using the interfaces provided by `mvcc.MvccTxn`.
    			// rollback包括: 记录下回滚请求,放到txn中,删除key上的锁,如果是put,则需要删除kv
    			_, err = rollbackKey(key, txn, response)
    			if err != nil {
    				return nil, err
    			}
    
    			log.Info("checkTxnStatus rollback the primary lock as it's expired",
    				zap.Uint64("lock.TS", lock.Ts),
    				zap.Uint64("physical(lock.TS)", physical(lock.Ts)),
    				zap.Uint64("txn.StartTS", txn.StartTS),
    				zap.Uint64("currentTS", c.request.CurrentTs),
    				zap.Uint64("physical(currentTS)", physical(c.request.CurrentTs)))
    		} else {
    			// Lock has not expired, leave it alone.
    			// 不是自己锁,但还没有过期
    			response.Action = kvrpcpb.Action_NoAction
    			response.LockTtl = lock.Ttl
    		}
    
    		return response, nil
    	}
    
    	existingWrite, commitTs, err := txn.CurrentWrite(key)
    	if err != nil {
    		return nil, err
    	}
    
    	if existingWrite == nil {
    		// The lock never existed, it's still needed to put a rollback record on it so that
    		// the stale transaction commands such as prewrite on the key will fail.
    		// Note try to set correct `response.Action`,
    		// the action types could be found in kvrpcpb.Action_xxx.
    		// 发现锁不存在,代表事务已经提交,锁被删掉,或者事务回滚,锁也回滚了,所以整个事务就fail了。
    		write := mvcc.Write{
    			StartTS: txn.StartTS,
    			Kind:    mvcc.WriteKindRollback,
    		}
    
    		txn.PutWrite(key, txn.StartTS, &write)
    
    		response.Action = kvrpcpb.Action_LockNotExistRollback
    
    		return response, nil
    	}
    
    	if existingWrite.Kind == mvcc.WriteKindRollback {
    		// The key has already been rolled back, so nothing to do.
    		response.Action = kvrpcpb.Action_NoAction
    		return response, nil
    	}
    
    	// The key has already been committed.
    	response.CommitVersion = commitTs
    	response.Action = kvrpcpb.Action_NoAction
    	return response, nil
    }
    

2.5 冲突 和 恢复

不同的事务coordinatiors可能位于不同的tinysql服务器中,读写请求可能会相互冲突。如果一个事务txn1k1上加了一个预写锁,,另一个读事务txn2尝试读k1txn2将如何处理这种情况呢?

由于锁记录造成的阻塞,读写请求无法继续,有以下几种可能:

  • 锁的拥有者事务已经提交,那么锁会被释放,被阻塞的请求课获取该key, 继续执行。
  • 锁的拥有者事务已经回滚,锁也会被释放,被阻塞的请求可以获取该key,继续执行。
  • 锁的拥有者事务仍在进行中,被阻塞的请求必须等待阻塞事务完成。

tinysql/tinykv集群中,这些冲突处理叫做Resolve。一旦请求被另一个事务的预写锁阻塞,resolve过程将会确定锁是处于提交状态还是回滚状态,以便请求可以继续。这些Resolve操作可以隐式地进行事务恢复。假设coordinator刚刚对一个key加上预写锁,tinysql服务器就崩溃了,来自其他tinysql服务器的并发事务将会帮助处理剩下的预写锁。

func (rl *ResolveLock) PrepareWrites(txn *mvcc.MvccTxn) (interface{}, error) {
	// A map from start timestamps to commit timestamps which tells us whether a transaction (identified by start ts)
	// has been committed (and if so, then its commit ts) or rolled back (in which case the commit ts is 0).
	commitTs := rl.request.CommitVersion
	response := new(kvrpcpb.ResolveLockResponse)

	log.Info("There keys to resolve",
		zap.Uint64("lockTS", txn.StartTS),
		zap.Int("number", len(rl.keyLocks)),
		zap.Uint64("commit_ts", commitTs))
	//panic("ResolveLock is not implemented yet")
	for _, kl := range rl.keyLocks {

		// Try to commit the key if the transaction is committed already, or try to rollback the key if it's not.
		// The `commitKey` and `rollbackKey` functions could be useful.
		if commitTs == 0 {
			// 发现pk还没提交,把这个key回滚
			_, err := rollbackKey(kl.Key, txn, response)
			if err != nil {
				return nil, err
			}
		} else {
			// pk提交了,把这个key和pk对齐,也提交
			_, err := commitKey(kl.Key, commitTs, txn, response)
			if err != nil {
				return nil, err
			}
		}

		log.Debug("resolve key", zap.String("key", hex.EncodeToString(kl.Key)))
	}

	return response, nil
}

2.6 可见性

func (g *Get) Read(txn *mvcc.RoTxn) (interface{}, [][]byte, error) {
	key := g.request.Key
	log.Debug("read key", zap.Uint64("start_ts", txn.StartTS),
		zap.String("key", hex.EncodeToString(key)))
	response := new(kvrpcpb.GetResponse)

	// Check for locks and their visibilities.
	// Hint: Check the interfaces provided by `mvcc.RoTxn`.
	lock, err := txn.GetLock(key)
	if err != nil {
		return nil, nil, err
	}
	if lock != nil && lock.IsLockedFor(key, g.startTs, response) {
		// get请求来的时候,还没有commit,预写锁都还在,直接返回,拿不到东西
		return response, nil, nil
	}

	value, err := txn.GetValue(key)
	if err != nil {
		return nil, nil, err
	}
	if value == nil {
		response.NotFound = true
	}
	
	response.Value = value
	
	return response, nil, nil
}

3. 分布式事务: Percolator

3.1 执行模型介绍

在 TiDB 中,事务的执行过程会被缓存在 buffer 中,在提交时,才会通过 Percolator 提交协议将其完整地写入到分布的 TiKV 存储引擎中。这一调用的入口是 store/tikv/txn.go 中的 tikvTxn.Commit 函数。
在执行时,一个事务可能会遇到其他执行过程中的事务,此时需要通过 Lock Resolve 组件来查询所遇到的事务状态,并根据查询到的结果执行相应的措施。

3.2 两阶段提交——2PC

Percolator 提交协议的两阶段提交分为 Prewrite 和 Commit,其中 Prewrite 实际写入数据,Commit 让数据对外可见。其中,事务的成功以 Primary Key 为原子性标记,当 Prewrite 失败或是 Primary Key Commit 失败时需要进行垃圾清理,将写入的事务回滚。
一个事务中的 Key 可能会设计到不同的 Region,在对 Key 进行写操作时,需要将其发送到正确的 Region 上才能够处理,GroupKeysByRegion 函数根据 region cache 将 Key 按 Region 分成多个 batch,但是可能出现因缓存过期而导致对应的存储节点返回 Region Error,此时需要分割 batch 后重试。
为了让对于 Key 的操作能够执行,需要实现 region_cache.go 中的 GroupKeysByRegion 函数。
在执行过程中,会涉及到三类操作,分别是 Prewrite/Commit/Rollback(Cleanup)。这些操作会在同一个流程中被处理。你需要完成 Prewrite 过程中的 buildPrewriteRequest 函数,然后完成 Prewrite 的 handleSingleBatch 函数完成 Commit 和 Rollback 的 handleSingleBatch 函数。

// GroupKeysByRegion 根据真实keys所在region划分txn想要修改的keys,
// 并返回第一个key(把他当做pk)所在的region,pk要比其他key更早commit
// RegionCache缓存了真实的key所在的region
// Help function `RegionCache.LocateKey` 返回key所在的region,这些都缓存在RegionCache里面
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) {

  regions := make(map[RegionVerID][][]byte)
  var pkRegion RegionVerID

  for idx, key := range keys {
    region, err := c.LocateKey(bo, key)
    if err != nil {
      return nil, RegionVerID{}, err
    }
    // 过滤掉不想要的key
    if filter != nil && !filter(key, region.StartKey) {
      continue
    }
    // 确定pk所在region
    if idx == 0 {
      pkRegion = region.Region
    }
    // 保存所有keys分组region信息
    regions[region.Region] = append(regions[region.Region], key)
  }

  return regions, pkRegion, nil
}

批处理prewrite、commit、roll back请求
// handleSingleBatch 用于批处理prewrite请求
func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
   // 获取批处理请求
   req := c.buildPrewriteRequest(batch)
   for {
      // 发送请求给对应region,记得加超时
      resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
      if err != nil {
         return errors.Trace(err)
      }
      regionErr, err := resp.GetRegionError()
      if err != nil {
         return errors.Trace(err)
      }
      // 如果该region宕机了
      if regionErr != nil {
         // The region info is read from region cache,
         // so the cache miss cases should be considered
         // You need to handle region errors here
         err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
         if err != nil {
            return errors.Trace(err)
         }
         // re-split keys and prewrite again.
         err = c.prewriteKeys(bo, batch.keys)
         return errors.Trace(err)
      }
      // region没有宕机
      if resp.Resp == nil {
         return errors.Trace(ErrBodyMissing)
      }
      prewriteResp := resp.Resp.(*pb.PrewriteResponse)
      // 处理keys没有问题直接返回
      keyErrs := prewriteResp.GetErrors()
      if len(keyErrs) == 0 {
         return nil
      }
      var locks []*Lock
      // 如果keys有问题则还需要resovle
      for _, keyErr := range keyErrs {
         // Extract lock from key error
         lock, err1 := extractLockFromKeyErr(keyErr)
         if err1 != nil {
            return errors.Trace(err1)
         }
         logutil.BgLogger().Debug("prewrite encounters lock",
            zap.Uint64("conn", c.connID),
            zap.Stringer("lock", lock))
         locks = append(locks, lock)
      }
      // While prewriting, if there are some overlapped locks left by other transactions,
      // TiKV will return key errors. The statuses of these transactions are unclear.
      // ResolveLocks will check the transactions' statuses by locks and resolve them.
      // Set callerStartTS to 0 so as not to update minCommitTS.
      msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
      if err != nil {
         return errors.Trace(err)
      }
      if msBeforeExpired > 0 {
         err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
         if err != nil {
            return errors.Trace(err)
         }
      }
   }
}


// handleSingleBatch 用于批处理commit请求
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
   // follow actionPrewrite.handleSingleBatch, build the commit request

   var resp *tikvrpc.Response
   var err error
   sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
   //通过tinyKVRPC包中新建请求,构造commit请求,需要keys, start_ts和commit_ts
   req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &pb.CommitRequest{
      StartVersion:  c.startTS,
      Keys:          batch.keys,
      CommitVersion: c.commitTS,
   })
   // 利用sender发送请求, 得到响应体
   resp, err = sender.SendReq(bo, req, batch.region, readTimeoutShort)
   logutil.BgLogger().Debug("actionCommit handleSingleBatch", zap.Bool("nil response", resp == nil))

   /**  如果在commit pk,但没有收到回应,则不确定tnx是否成功commit
       这种情况,不能直接说commit失败了(这样会导致数据丢失)
        也不能直接抛出一个err(tnx可能被重启)
        最佳的解决方案是输入err,让上层去断开相应的mysql客户端连接
   */
   isPrimary := bytes.Equal(batch.keys[0], c.primary())
   if isPrimary && sender.rpcError != nil {
      c.setUndeterminedErr(errors.Trace(sender.rpcError))
      return nil
   }

   if _, ok := failpoint.Eval(("mockFailAfterPK")); ok {
      if !isPrimary {
         err = errors.New("commit secondary keys error")
      }
   }

   if err != nil {
      return errors.Trace(err)
   }

   // handle the response and error refer to actionPrewrite.handleSingleBatch
   regionError, err := resp.GetRegionError()
   if err != nil {
      return errors.Trace(err)
   }
   if regionError != nil {
      err = bo.Backoff(BoRegionMiss, errors.New(regionError.String()))
      if err != nil {
         return errors.Trace(err)
      }
      // re-split keys and prewrite again.
      err = c.commitKeys(bo, batch.keys)
      return errors.Trace(err)
   }
   if resp.Resp == nil {
      return errors.Trace(ErrBodyMissing)
   }

   commitResp := resp.Resp.(*pb.CommitResponse)
   keyErr := commitResp.GetError()
   if keyErr != nil {
      err := extractKeyErr(keyErr)
      if err != nil {
         return errors.Trace(err)
      }
   }

   c.mu.Lock()
   defer c.mu.Unlock()
   // Group that contains primary key is always the first.
   // We mark transaction's status committed when we receive the first success response.
   c.mu.committed = true
   return nil
}

// 批处理回滚请求
func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchKeys) error {
   // follow actionPrewrite.handleSingleBatch, build the rollback request

   // build and send the rollback request

   req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &pb.BatchRollbackRequest{
      StartVersion: c.startTS,
      Keys:         batch.keys,
   })
   sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
   resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
   if err != nil {
      return err
   }
   logutil.BgLogger().Debug("actionRollback handleSingleBatch", zap.Bool("nil response", resp == nil))

   // handle the response and error refer to actionPrewrite.handleSingleBatch
   regionError, err := resp.GetRegionError()
   if err != nil {
      return errors.Trace(err)
   }
   if regionError != nil {
      err = bo.Backoff(BoRegionMiss, errors.New(regionError.String()))
      if err != nil {
         return errors.Trace(err)
      }
      // re-split keys and rollback again.
      err = c.cleanupKeys(bo, batch.keys)
      return errors.Trace(err)
   }
   rollbackResp := resp.Resp.(*pb.BatchRollbackResponse)

   keyErr := rollbackResp.GetError()

   if err != nil {
      err := extractKeyErr(keyErr)
      if err != nil {
         return err
      }
   }

   // 回滚成功
   c.mu.Lock()
   defer c.mu.Unlock()
   // Group that contains primary key is always the first.
   // We mark transaction's status committed when we receive the first success response.
   c.mu.committed = true
   return nil
}

3.3 Lock Resolver

在 Prewrite 阶段,对于一个 Key 的操作会写入两条记录。

  • Default CF 中存储了实际的 KV 数据。
  • Lock CF 中存储了锁,包括 Key 和时间戳信息,会在 Commit 成功时清理。
    Lock Resolver 的职责就是当一个事务在提交过程中遇到 Lock 时,需要如何应对。
    当一个事务遇到 Lock 时,可能有几种情况。
  • Lock 所属的事务还未提交这个 Key,Lock 尚未被清理;
  • Lock 所属的事务遇到了不可恢复的错误,正在回滚中,尚未清理 Key;
  • Lock 所属事务的节点发生了意外错误,例如节点 crash,这个 Lock 所属的节点已经不能够更新它。
    在 Percolator 协议下,会通过查询 Lock 所属的 Primary Key 来判断事务的状态,但是当读取到一个未完成的事务(Primary Key 的 Lock 尚未被清理)时,我们所期望的,是等待提交中的事物至完成状态,并且清理如 crash 等异常留下的垃圾数据。此时会借助 ttl 来判断事务是否过期,遇到过期事务时则会主动 Rollback
    在 lock_resolver.go 中完成 getTxnStatus 和 resolveLock 函数,使得向外暴露的 ResolveLocks 函数能够正常运行。
    需要有getTxnStatus获得事务锁状态
// getTxnStatus sends the CheckTxnStatus request to the TiKV server.
// When rollbackIfNotExist is false, the caller should be careful with the txnNotFoundErr error.
func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte, callerStartTS, currentTS uint64, rollbackIfNotExist bool) (TxnStatus, error) {
   if s, ok := lr.getResolved(txnID); ok {
      return s, nil
   }

   // CheckTxnStatus may meet the following cases:
   // 1. LOCK 锁过期了 或者 在ttl内
   // 1.1 Lock expired -- orphan lock, fail to update TTL, crash recovery etc.
   // 1.2 Lock TTL -- active transaction holding the lock.
   // 2. NO LOCK  已经commit或者回滚或者还没预写
   // 2.1 Txn Committed
   // 2.2 Txn Rollbacked -- rollback itself, rollback by others, GC tomb etc.
   // 2.3 No lock -- concurrence prewrite.

   var status TxnStatus
   var req *tikvrpc.Request
   // new一个检查状态请求
   req = tikvrpc.NewRequest(tikvrpc.CmdCheckTxnStatus, &kvrpcpb.CheckTxnStatusRequest{
      PrimaryKey: primary,
      LockTs:     txnID,
      CurrentTs:  currentTS,
   })

   for {
      // 确定pk所在region
      loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
      if err != nil {
         return status, errors.Trace(err)
      }
      // 给pk所在region发送checkTnxStatus
      resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
      if err != nil {
         return status, errors.Trace(err)
      }
      regionErr, err := resp.GetRegionError()
      if err != nil {
         return status, errors.Trace(err)
      }
      if regionErr != nil {
         err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
         if err != nil {
            return status, errors.Trace(err)
         }
         continue
      }
      if resp.Resp == nil {
         return status, errors.Trace(ErrBodyMissing)
      }
      // 获取到pk所在region的状态了
      cmdResp := resp.Resp.(*kvrpcpb.CheckTxnStatusResponse)
      logutil.BgLogger().Debug("cmdResp", zap.Bool("nil", cmdResp == nil))
      // Assign status with response
      // pk所在region正常
      status = TxnStatus{
         ttl:      cmdResp.LockTtl,
         commitTS: cmdResp.CommitVersion,
         action:   cmdResp.Action,
      }

      return status, nil
   }
}
冲突处理的实现
// resolveLock 根据pk返回的状态去处理secondary key
func (lr *LockResolver) resolveLock(bo *Backoffer, l *Lock, status TxnStatus, cleanRegions map[RegionVerID]struct{}) error {
   cleanWholeRegion := l.TxnSize >= bigTxnThreshold
   for {
      loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
      if err != nil {
         return errors.Trace(err)
      }
      if _, ok := cleanRegions[loc.Region]; ok {
         return nil
      }

      var req *tikvrpc.Request

   
      req = tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{
         StartVersion:  l.TxnID,
         CommitVersion: status.commitTS, // pk的commit_ts 以此判断pk是否commit
      })
      // 发送给每个secondary key的region去resolve lock
      resp, err := lr.store.SendReq(bo, req, loc.Region, readTimeoutShort)
      if err != nil {
         return errors.Trace(err)
      }
      regionErr, err := resp.GetRegionError()
      if err != nil {
         return errors.Trace(err)
      }
      if regionErr != nil {
         err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
         if err != nil {
            return errors.Trace(err)
         }
         continue
      }
      if resp.Resp == nil {
         return errors.Trace(ErrBodyMissing)
      }
      cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
      if keyErr := cmdResp.GetError(); keyErr != nil {
         err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
         logutil.BgLogger().Error("resolveLock error", zap.Error(err))
         return err
      }
      if cleanWholeRegion {
         cleanRegions[loc.Region] = struct{}{}
      }
      return nil
   }
}

除了在事务的提交过程中,事务对数据进行读取的时候也可能遇到 Lock,此时也会触发 ResolveLocks 函数,通过 snapshot.go 中的 tikvSnapshot.get 函数,让读请求能够正常运行。

func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
   // Check the cached values first.
   if s.cached != nil {
      if value, ok := s.cached[string(k)]; ok {
         return value, nil
      }
   }

   failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) {
      if bo.ctx.Value("TestSnapshotCache") != nil {
         panic("cache miss")
      }
   })

   cli := clientHelper{
      LockResolver:      s.store.lockResolver,
      RegionCache:       s.store.regionCache,
      minCommitTSPushed: &s.minCommitTSPushed,
      Client:            s.store.client,
   }

   req := tikvrpc.NewRequest(tikvrpc.CmdGet,
      &pb.GetRequest{
         Key:     k,
         Version: s.version.Ver,
      }, pb.Context{})
   for {
      loc, err := s.store.regionCache.LocateKey(bo, k)
      if err != nil {
         return nil, errors.Trace(err)
      }
      resp, _, _, err := cli.SendReqCtx(bo, req, loc.Region, readTimeoutShort, "")
      if err != nil {
         return nil, errors.Trace(err)
      }
      regionErr, err := resp.GetRegionError()
      if err != nil {
         return nil, errors.Trace(err)
      }
      if regionErr != nil {
         err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
         if err != nil {
            return nil, errors.Trace(err)
         }
         continue
      }
      if resp.Resp == nil {
         return nil, errors.Trace(ErrBodyMissing)
      }
      cmdGetResp := resp.Resp.(*pb.GetResponse)
      val := cmdGetResp.GetValue()
      if keyErr := cmdGetResp.GetError(); keyErr != nil {
         // If the key error is a lock, there are 2 possible cases:
         //   1. The transaction is during commit, wait for a while and retry.
         //   2. The transaction is dead with some locks left, resolve it.
         // 从keyErr中获取锁的状态
         lockFromKeyErr, err := extractLockFromKeyErr(keyErr)
         if err != nil {
            return nil, err
         }
         //
         msBeforeExpired, _, err := s.store.lockResolver.ResolveLocks(bo, 0, []*Lock{lockFromKeyErr})
         if err != nil {
            return nil, err
         }
         if msBeforeExpired > 0 {
            err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC get lockedKeys: %d", 1))
            if err != nil {
               return nil, errors.Trace(err)
            }
         }

         continue

      }
      return val, nil
   }
}
 类似资料: