从这一篇博客开始,我们将介绍btcd
节点之间的数据同步。考虑到内容太长,分为三篇博客来讲解。
headersFirstMode
模式下的数据同步非headersFirstMode
模式下的数据同步源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3
的commit id
版本。
我们假设peer B
是先启动的节点,peer A
后启动。peer A
在启动后需要从peer B
同步数据,那么peer A
应该首先向peer B
发送数据请求。
我们先来看一个问题: peer A是如何发起数据请求的?
回顾我们在博客btcd源码解析——节点P2P连接建立的过程 (2)中提到的OnVersion
函数,我们将其代码再次贴在下面:
// OnVersion [server.go]
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion)
*wire.MsgReject {
...
sp.server.syncManager.NewPeer(sp.Peer) // L494
...
sp.server.AddPeer(sp) // L501
...
}
其中L501行代码我们在btcd源码解析——节点P2P连接建立的过程 (2)中介绍了其功能,主要用于将peer
变量 (sp
) 登记到state
状态中。
这里我们重点来关注L494行代码,该行代码调用了SyncManager
类的NewPeer
函数。NewPeer
函数代码如下所示:
// OnVersion [server.go] -> NewPeer [manager.go]
func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
...
sm.msgChan <- &newPeerMsg{peer: peer} // L1443
}
L1443行代码向管道msgChan
中发送数据,通知SyncManager
连接到了一个新peer
。管道的另外一端连接着blockHandler
函数,如下所示:
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler
func (sm *SyncManager) blockHandler() {
...
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
case *newPeerMsg: // L1281
sm.handleNewPeerMsg(msg.peer) // L1282
...
}
...
}
}
...
}
L1282行将对新连接的peer
进行一些后续处理,代码如下所示:
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg
func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
...
isSyncCandidate := sm.isSyncCandidate(peer) // L406
sm.peerStates[peer] = &peerSyncState{ // L407
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
} // L411
// Start syncing by choosing the best candidate if needed.
if isSyncCandidate && sm.syncPeer == nil { // L414
sm.startSync() // L415
}
}
L406行检查该peer
是否是合格的”同步候选者“,具体如何检查的代码,我们这里省略。
L407-L411行将该peer
和其相应的”同步状态“加入到一个map
变量(peerStates
)中。这意味着可能会有多个合格的”同步候选者“,必要时需要从中选出最佳候选者进行数据的同步。
L414行判断如果该peer
为合格的”同步候选者“,并且当前并没有启动同步工作,就将通过L415行启动同步工作。startSync
函数代码如下:
// OnVersion [server.go] -> NewPeer [manager.go] -> blockHandler -> handleNewPeerMsg -> startSync
func (sm *SyncManager) startSync() {
...
best := sm.chain.BestSnapshot() // L248
var higherPeers, equalPeers []*peerpkg.Peer
for peer, state := range sm.peerStates {
if !state.syncCandidate {
continue
}
if segwitActive && !peer.IsWitnessEnabled() {
log.Debugf("peer %v not witness enabled, skipping", peer)
continue
}
...
if peer.LastBlock() < best.Height {
state.syncCandidate = false
continue
}
...
if peer.LastBlock() == best.Height {
equalPeers = append(equalPeers, peer)
continue
}
...
higherPeers = append(higherPeers, peer)
} // L284
...
var bestPeer *peerpkg.Peer
switch {
case len(higherPeers) > 0:
bestPeer = higherPeers[rand.Intn(len(higherPeers))]
case len(equalPeers) > 0:
bestPeer = equalPeers[rand.Intn(len(equalPeers))]
} // L298
if bestPeer != nil { // L301
...
locator, err := sm.chain.LatestBlockLocator() // L307
...
if sm.nextCheckpoint != nil &&
best.Height < sm.nextCheckpoint.Height &&
sm.chainParams != &chaincfg.RegressionNetParams { // L336
bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) // L338
sm.headersFirstMode = true // L339
...
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash) // L344
}
sm.syncPeer = bestPeer // L346
} else {
...
}
}
L248-L298行实现了从多个合格的”同步候选者“中选出最佳候选者(bestPeer
)的工作。
当bestPeer
存在时,则基于bestPeer
进行数据同步。
L307行定义了一个locator
的slice
变量。简单来说locator
代表了一条链的某个特定区块。之所以它是一个slice
变量,是因为区块链可能分叉,必须记录一条从该区块到创世区块的路径,该路径即由一个slice
来存储(更详细的解释可参考博客Btcd区块链协议消息解析)。这里LatestBlockLocator
函数返回的是当前链最新区块对应的locator
。
考虑两种可能的同步数据的模式:
checkpoint
存在,且当前已同步区块的高度低于checkpoint
的高度,且不是Regression Test网络
时 (L336行),采用headersFirstMode
同步模式。在该模式下,将优先获取区块头,然后再获取区块体。通过在L338行通过调用PushGetHeadersMsg
函数发送”获取区块头“的请求,并在L339行将headersFirstMode
值设置为true
PushGetBlocksMsg
函数发送”获取区块“的请求。L346行将选出的最佳候选者(bestPeer
)赋值给sm
的syncPeer
。
至此,我们介绍了一个新的节点 (peer A
) 是如何向其他节点发起数据请求的。
下一篇博客,我们将介绍headersFirstMode
模式下数据同步的细节。