当前位置: 首页 > 工具软件 > btcd > 使用案例 >

btcd源码解析——peer节点之间的区块数据同步 (1)

严斌
2023-12-01

1. 写在前面

从这一篇博客开始,我们将介绍btcd节点之间的数据同步。考虑到内容太长,分为三篇博客来讲解。

  • 第一篇 (本篇) 介绍节点是如何发起数据请求的
  • 第二篇介绍headersFirstMode模式下的数据同步
  • 第三篇介绍非headersFirstMode模式下的数据同步

源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3commit id 版本。

我们假设peer B是先启动的节点,peer A后启动。peer A在启动后需要从peer B同步数据,那么peer A应该首先向peer B发送数据请求。

2. 从peer A发起数据请求开始

我们先来看一个问题: peer A是如何发起数据请求的
回顾我们在博客btcd源码解析——节点P2P连接建立的过程 (2)中提到的OnVersion函数,我们将其代码再次贴在下面:

// OnVersion [server.go]
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) 
*wire.MsgReject {
    ... 
    sp.server.syncManager.NewPeer(sp.Peer)                  		// L494
    ...
    sp.server.AddPeer(sp)                                           // L501 
    ...
}

其中L501行代码我们在btcd源码解析——节点P2P连接建立的过程 (2)中介绍了其功能,主要用于将peer变量 (sp) 登记到state状态中。
这里我们重点来关注L494行代码,该行代码调用了SyncManager类的NewPeer函数。NewPeer函数代码如下所示:

// OnVersion [server.go] -> NewPeer [manager.go]
func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
    ...
    sm.msgChan <- &newPeerMsg{peer: peer}               			// L1443
}

L1443行代码向管道msgChan中发送数据,通知SyncManager连接到了一个新peer。管道的另外一端连接着blockHandler函数,如下所示:

// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
    ...
out:
    for {
        select {
        case m := <-sm.msgChan:      
            switch msg := m.(type) {      
            case *newPeerMsg:                                       // L1281           
                sm.handleNewPeerMsg(msg.peer)                    	// L1282
            ...
            }
        ...
        }
    }
    ...
}

L1282行将对新连接的peer进行一些后续处理,代码如下所示:

// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg
func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
    ...
    isSyncCandidate := sm.isSyncCandidate(peer)                     // L406
    sm.peerStates[peer] = &peerSyncState{                      		// L407
        syncCandidate:   isSyncCandidate,      
        requestedTxns:   make(map[chainhash.Hash]struct{}),      
        requestedBlocks: make(map[chainhash.Hash]struct{}),
    }                                                               // L411
    
    // Start syncing by choosing the best candidate if needed.
    if isSyncCandidate && sm.syncPeer == nil {                     // L414
        sm.startSync()                                             // L415
    }
}

L406行检查该peer是否是合格的”同步候选者“,具体如何检查的代码,我们这里省略。
L407-L411行将该peer和其相应的”同步状态“加入到一个map变量(peerStates)中。这意味着可能会有多个合格的”同步候选者“,必要时需要从中选出最佳候选者进行数据的同步。
L414行判断如果该peer为合格的”同步候选者“,并且当前并没有启动同步工作,就将通过L415行启动同步工作。startSync函数代码如下:

// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg -> startSync
func (sm *SyncManager) startSync() {
    ...
    best := sm.chain.BestSnapshot()                       			// L248
    var higherPeers, equalPeers []*peerpkg.Peer
    for peer, state := range sm.peerStates {
        if !state.syncCandidate {
            continue
        }
        
        if segwitActive && !peer.IsWitnessEnabled() {
            log.Debugf("peer %v not witness enabled, skipping", peer)
            continue
        }
        ...
        if peer.LastBlock() < best.Height {
            state.syncCandidate = false
            continue
        }
        
        ...
        if peer.LastBlock() == best.Height {
            equalPeers = append(equalPeers, peer)
            continue
        }
        ...
        higherPeers = append(higherPeers, peer)
	}                         			                            // L284
    ...
    var bestPeer *peerpkg.Peer
    switch {
    case len(higherPeers) > 0:      
        bestPeer = higherPeers[rand.Intn(len(higherPeers))]
        
    case len(equalPeers) > 0:      
        bestPeer = equalPeers[rand.Intn(len(equalPeers))]
    }                           			                       // L298
    
    if bestPeer != nil {                 	                       // L301
        ...
        locator, err := sm.chain.LatestBlockLocator()    	       // L307
        ...
        if sm.nextCheckpoint != nil &&     
            best.Height < sm.nextCheckpoint.Height &&      
            sm.chainParams != &chaincfg.RegressionNetParams {           // L336
            
            bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) // L338
            sm.headersFirstMode = true                  		      	// L339
            ...
        } else {
            bestPeer.PushGetBlocksMsg(locator, &zeroHash)               // L344
        }
        sm.syncPeer = bestPeer                            		        // L346
    } else {
        ...
    }
}

L248-L298行实现了从多个合格的”同步候选者“中选出最佳候选者(bestPeer)的工作。
bestPeer存在时,则基于bestPeer进行数据同步。
L307行定义了一个locatorslice变量。简单来说locator代表了一条链的某个特定区块。之所以它是一个slice变量,是因为区块链可能分叉,必须记录一条从该区块到创世区块的路径,该路径即由一个slice来存储(更详细的解释可参考博客Btcd区块链协议消息解析)。这里LatestBlockLocator函数返回的是当前链最新区块对应的locator

考虑两种可能的同步数据的模式:

  1. checkpoint存在,且当前已同步区块的高度低于checkpoint的高度,且不是Regression Test网络时 (L336行),采用headersFirstMode同步模式。在该模式下,将优先获取区块头,然后再获取区块体。通过在L338行通过调用PushGetHeadersMsg函数发送”获取区块头“的请求,并在L339行将headersFirstMode值设置为true
  2. 否则,采用正常的区块数据下载模式,通过在L344行调用PushGetBlocksMsg函数发送”获取区块“的请求。

L346行将选出的最佳候选者(bestPeer)赋值给smsyncPeer

3. 总结

至此,我们介绍了一个新的节点 (peer A) 是如何向其他节点发起数据请求的。
下一篇博客,我们将介绍headersFirstMode模式下数据同步的细节。

 类似资料: