当前位置: 首页 > 工具软件 > btcd > 使用案例 >

btcd源码解析——节点P2P连接建立的过程 (1)

淳于思淼
2023-12-01

1. 写在前面

从这一篇博客开始,我们将介绍btcd节点之间P2P连接建立的过程。考虑到内容太长,分为上下两篇博客来讲解。这两篇博客之后,我们将继续介绍节点之间数据的同步过程。
源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3commit id 版本。

2. 从节点的“启动”说起

btcd节点的启动主要由btcd.go文件中的btcdMain函数完成,其中P2P的连接过程又是由 server.Start()代码完成,如下所示:

// btcdMain [btcd.go]
func btcdMain(serverChan chan<- *server) error {    				// L43
    ...
    server, err := newServer(...)                                   // L149
    ...
    server.Start()                                                  // L162
    ...
}

Start函数中和P2P连接相关的部分在go s.peerHandler()完成,如下代码所示:

// btcdMain [btcd.go] -> Start [server.go]
func (s *server) Start() {                                      // L2291
    ...
    go s.peerHandler()                                          // L2305
    ...
}

在比特币P2P连接的语境中,一个节点就是一个peer. peerHandler函数包含三个最关键的启动工作:

  • 启动addrManager,进行peer地址的管理
  • 启动 syncManager,进行peer之间数据的同步
  • 启动 connManager,进行peer之间连接的管理

代码如下所示:

// btcdMain [btcd.go] -> Start [server.go] -> peerHandler
func (s *server) peerHandler() {                             // L2062
    ...
    s.addrManager.Start()                                    // L2068
    s.syncManager.Start()                                    // L2069
    ...
    go s.connManager.Start()                           		 // L2093
    ...
}

其中跟P2P连接相关的主要是addrManagerconnManager的启动。具体而言:

  • addrManager负责对其他peer地址的管理,主要是一些本地的工作,不涉及直接的网络连接或传输;
  • connManager则主要负责与其他peer建立P2P连接,建立连接是需要对方peer的地址,这便依赖于addrManager中管理的地址。

3. P2P连接中peer地址的管理

3.1 AddrManager数据结构相关

P2P连接中peer地址的管理主要由addrManager完成,addrManager变量中包含了各种用于地址管理的信息,其数据结构如下所示:

// AddrManager [addrmanager.go]
type AddrManager struct {                                           // L32
    ...
    peersFile      string                                           // L34
    ...
    addrIndex    map[string]*KnownAddress                           // L38
    addrNew     [newBucketCount]map[string]*KnownAddress
    addrTried    [newBucketCount]map[string]*KnownAddress
    ...
}

其中最重要的四个字段是peersFile, addrIndex, addrNew, 和addrTried:

  • peersFile 对应于一个文件名,该文件主要保存序列化后的addrManager,用于节点重启时能快速建立连接。该文件路径名默认为$data-dir/data/mainnet/peers.json
  • addrIndex 缓存所有KnownAddressmap
  • addrNew 缓存所有新地址的map slice
  • addrTried 缓存所有已经尝试连接过的地址的list slice

准确来说,peersFile中保存的并不是直接序列化后的addrManager,因为addrManager中的一些信息是运行时信息,并不需要保存下来。因此源码中构造了专门用于序列化addrManager的数据结构,如下所示:

// serializedAddrManager [addrmanager.go]
type serializedAddrManager struct {                                               // L64
    Version      int
    Key           [32]byte
    Addresses    []*serializedKnownAddress
    NewBuckets   [newBucketCount][]string // string is NetAddressKey
    TriedBuckets [triedBucketCount][]string
}

3.2 从peersFile中反序列化填充AddrManager变量

addrManagerStart函数中的loadPeers函数用来从peersFile中反序列化出peers的信息,并填充到addrManager中.

// Start [addrmanager.go]
func (a *AddrManager) Start() {                                          // L567
    ...
    a.loadPeers()
    ...
    go a.addressHandler()                                               // L580
}

loadPeers函数中主要做事的是deserializePeers函数,如下所示:

// Start [addrmanager.go] -> loadPeers
func (a *AddrManager) loadPeers() {                                            // L423
    ...
    err := a.deserializePeers(a.peersFile)
    ...
}

deserializePeers函数中的工作主要包括两个部分:

  1. peersFile文件中的数据反序列化成serializedAddrManager变量 (sam)
  2. sam中的AddressesNewBucketsTriedBuckets字段处理赋值给AddrManager变量 (a) 的addrIndex, addrNewaddrTried字段

deserializePeers函数代码如下所示, 其中:

  1. L444 - L456行代码将peersFile文件中的数据反序列化成sam变量,
  2. L471 - L502行代码将sam中的Addresses字段处理赋值给的addrIndex,
  3. L504 - L518将sam中的NewBuckets字段处理赋值给的addrNew,
  4. L519 - L531将sam中的TriedBuckets字段处理赋值给的addrTried
// Start [addrmanager.go] -> loadPeers -> deserializePeers
func (a *AddrManager) deserializePeers(filePath string) error {         // L442
    ...
    r, err := os.Open(filePath)                                                    // L444
    ...
    var sam serializedAddrManager
    dec := json.NewDecoder(r)
    err = dec.Decode(&sam)                                                      // L456
    ...
    for _, v := range sam.Addresses {                                           // L471
        ka := new(KnownAddress)
        ...
        ka.na, err = a.DeserializeNetAddress(v.Addr, v.Services)
        ...
        ka.srcAddr, err = a.DeserializeNetAddress(v.Src, v.SrcServices)
        ...
        a.addrIndex[NetAddressKey(ka.na)] = ka
    }                                                                                   // L502
    ...
    for i := range sam.NewBuckets {                                     // L504
        for _, val := range sam.NewBuckets[i] {
            ka, ok := a.addrIndex[val]
            ...
            a.addrNew[i][val] = ka
        }
    }                                                                                   // L518
    for i := range sam.TriedBuckets {                                   // L519
        for _, val := range sam.TriedBuckets[i] {
            ka, ok := a.addrIndex[val]
            ...
            a.addrTried[i].PushBack(ka)
        }
    }                                                                                   // L531
    ...
}

3.3 将AddrManager变量序列化存储到peersFile中

3.2小节中Start()函数的L580行启动了一个addressHandler函数的协程,该函数每隔10分钟调用一次savePeers函数,将peers信息 (sam变量) 序列化并保存到peersFile文件中。addressHandler函数和savePeers函数的代码分别如下所示:

// Start [addrmanager.go] -> addressHandler 
func (a *AddrManager) addressHandler() {                                 // L567
    ...
    for {
        select {
        case <- dumpAddressTicker.C:
            a.savePeers()
        ...
        }
    }
    ...
}
// Start [addrmanager.go] -> addressHandler -> savePeers
func (a *AddrManager) savePeers() {                                 // L361
    ...
    w, err := os.Create(a.peersFile)                                // L408
    ...
    enc := json.NewEncoder(w)                                       // L413
    ...
    enc.Encode(&sam)                                                // L415
    ...
}

4. 与其他peer建立P2P连接

第2小节的peerHandler函数中的L2093启动了一个新协程,该协程运行s.connManager.Start()代码启动了ConnManager管理器,代码如下所示:

// Start [connmanager.go]
func (cm *ConnManager) Start() {
    ...
    go cm.connHandler()                                                             // L518
    ...
    if cm.cfg.OnAccept != nil {
        for _, listener := range cm.cfg.Listeners {
            cm.wg.Add(1)
            go cm.listenerHandler(listener)                                        // L525
        }
    }
    
    for i := atomic.LoadUnit64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {   
        go cm.NewConnReq()                                                       // L530
    }
}

Start函数中最重要的代码包括三部分:

  1. L530行主动发起与其他peer节点的连接
  2. L518行对所有的主动连接进行管理
  3. L525行被动接受其他peer节点的连接

需要进一步说明的是,1主要是完成主动连接的建立过程,2主要完成主动连接建立之后的管理。1每建立一次连接后,都需要由2完成后续的管理工作,如登记到conns变量中。

4.1 主动发起连接

节点主动发起连接的行为由NewConnReq函数完成,代码如下所示:

// Start [connmanager.go] -> NewConnReq [server.go]
func (cm *ConnManager) NewConnReq() {
    ...
    done := make(chan struct{})                                                         // L376
    select {
    case cm.requests <- registerPending{c, done}:
    case <-cm.quit:       
        return
    }
    ...
    select {
    case <-done:
    case <-cm.quit:       
    return
    }                                                                                           // L398
    
    addr, err := cm.cfg.GetNewAddress()                                             // L400
    ...
    c.Addr = addr
    
    cm.Connect(c)                                                                           // L402
}

其中需要解释的代码包括三个部分:

  1. L376-L398: 将当前连接请求登记到pending变量中,方便对该连接进行后续管理。登记过程是通过requests管道完成的,管道的另一端连接connHandler函数,后续在讲解connHandler函数的再做详细介绍;
  2. L400: 获取将要连接的peer地址,将在4.1.1节讲述
  3. L402: 完成实际的连接过程,将在4.1.2节讲述

4.1.1 获取将要连接的peer地址

由上可知,获取将要连接的peer地址是由GetNewAddress函数实现的,该函数是connmanager.go文件中config数据结构的字段. 在初始化cmgr变量时,该字段被赋值为newAddressFunc,代码如下所示:

// newServer [server.go]
func newServer(...) (*server, error) {
    ...
    cmgr, err := connmgr.New(&connmgr.Config{                                         // L2818
        ...     
        GetNewAddress:  newAddressFunc,
    })
    ...
    ...
}

进一步查看newAddressFunc函数的定义,其也定义在newServer函数中,代码如下所示:

// newServer [server.go]
func newServer(...) (*server, error) {
    ...
var newAddressFunc func() (net.Addr, error)                                             // L2773
    if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {       
        newAddressFunc = func() (net.Addr, error) {              
            for tries := 0; tries < 100; tries++ {
                addr := s.addrManager.GetAddress()                                      // L2777
                ...
                addrString := addrmgr.NetAddressKey(addr.NetAddress())
                return addrStringToNetAddr(addrString)
            }
            ...
        }
    }
    ...
}

其中最重要的代码在L2777行,利用addrManagerGetAddress函数获取可用的连接地址。GetAddress函数主要在addrTriedaddrNew两个列表中随机地挑选可用的地址,这就与第3节的内容联系起来了。

4.1.2 完成实际的连接过程

实际的连接过程是由cm.Connect(c)完成的,其代码如下所示:

// Start [connmanager.go] -> NewConnReq [server.go] -> Connect [connmanager.go]
func (cm *ConnManager) Connect(c *ConnReq) {
    ...
    conn, err := cm.cfg.Dial(c.Addr)                                                    // L444
    if err != nil {       
        select {       
        case cm.requests <- handleFailed{c, err}:                                   // L447
        case <-cm.quit:      
        }       
        return
    }
    select {
    case cm.requests <- handleConnected{c, conn}:                               // L454
    case <-cm.quit:
    }
}

L444行通过调用Dial函数进行连接,该函数主要对golangnet包的Dial方法进行了一些包装。
L447L454行分别用来处理连接失败和成功的情况,具体处理过程也是通过向requests通道传递数据来完成。

 类似资料: