从这一篇博客开始,我们将介绍btcd
节点之间P2P
连接建立的过程。考虑到内容太长,分为上下两篇博客来讲解。这两篇博客之后,我们将继续介绍节点之间数据的同步过程。
源码解析是基于btcd仓库c26ffa870fd817666a857af1bf6498fabba1ffe3
的commit id
版本。
btcd
节点的启动主要由btcd.go
文件中的btcdMain
函数完成,其中P2P
的连接过程又是由 server.Start()
代码完成,如下所示:
// btcdMain [btcd.go]
func btcdMain(serverChan chan<- *server) error { // L43
...
server, err := newServer(...) // L149
...
server.Start() // L162
...
}
Start
函数中和P2P
连接相关的部分在go s.peerHandler()
完成,如下代码所示:
// btcdMain [btcd.go] -> Start [server.go]
func (s *server) Start() { // L2291
...
go s.peerHandler() // L2305
...
}
在比特币P2P
连接的语境中,一个节点就是一个peer
. peerHandler
函数包含三个最关键的启动工作:
addrManager
,进行peer
地址的管理syncManager
,进行peer
之间数据的同步connManager
,进行peer
之间连接的管理代码如下所示:
// btcdMain [btcd.go] -> Start [server.go] -> peerHandler
func (s *server) peerHandler() { // L2062
...
s.addrManager.Start() // L2068
s.syncManager.Start() // L2069
...
go s.connManager.Start() // L2093
...
}
其中跟P2P
连接相关的主要是addrManager
和connManager
的启动。具体而言:
addrManager
负责对其他peer
地址的管理,主要是一些本地的工作,不涉及直接的网络连接或传输;connManager
则主要负责与其他peer
建立P2P
连接,建立连接是需要对方peer
的地址,这便依赖于addrManager
中管理的地址。P2P
连接中peer
地址的管理主要由addrManager
完成,addrManager
变量中包含了各种用于地址管理的信息,其数据结构如下所示:
// AddrManager [addrmanager.go]
type AddrManager struct { // L32
...
peersFile string // L34
...
addrIndex map[string]*KnownAddress // L38
addrNew [newBucketCount]map[string]*KnownAddress
addrTried [newBucketCount]map[string]*KnownAddress
...
}
其中最重要的四个字段是peersFile
, addrIndex
, addrNew
, 和addrTried
:
peersFile
对应于一个文件名,该文件主要保存序列化后的addrManager
,用于节点重启时能快速建立连接。该文件路径名默认为$data-dir/data/mainnet/peers.json
addrIndex
缓存所有KnownAddress
的map
addrNew
缓存所有新地址的map slice
addrTried
缓存所有已经尝试连接过的地址的list slice
准确来说,peersFile
中保存的并不是直接序列化后的addrManager
,因为addrManager
中的一些信息是运行时信息,并不需要保存下来。因此源码中构造了专门用于序列化addrManager
的数据结构,如下所示:
// serializedAddrManager [addrmanager.go]
type serializedAddrManager struct { // L64
Version int
Key [32]byte
Addresses []*serializedKnownAddress
NewBuckets [newBucketCount][]string // string is NetAddressKey
TriedBuckets [triedBucketCount][]string
}
addrManager
的Start
函数中的loadPeers
函数用来从peersFile
中反序列化出peers
的信息,并填充到addrManager
中.
// Start [addrmanager.go]
func (a *AddrManager) Start() { // L567
...
a.loadPeers()
...
go a.addressHandler() // L580
}
loadPeers
函数中主要做事的是deserializePeers
函数,如下所示:
// Start [addrmanager.go] -> loadPeers
func (a *AddrManager) loadPeers() { // L423
...
err := a.deserializePeers(a.peersFile)
...
}
deserializePeers
函数中的工作主要包括两个部分:
peersFile
文件中的数据反序列化成serializedAddrManager
变量 (sam
)sam
中的Addresses
,NewBuckets
和TriedBuckets
字段处理赋值给AddrManager
变量 (a
) 的addrIndex
, addrNew
和addrTried
字段deserializePeers
函数代码如下所示, 其中:
peersFile
文件中的数据反序列化成sam
变量,sam
中的Addresses
字段处理赋值给的addrIndex
,sam
中的NewBuckets
字段处理赋值给的addrNew
,sam
中的TriedBuckets
字段处理赋值给的addrTried
// Start [addrmanager.go] -> loadPeers -> deserializePeers
func (a *AddrManager) deserializePeers(filePath string) error { // L442
...
r, err := os.Open(filePath) // L444
...
var sam serializedAddrManager
dec := json.NewDecoder(r)
err = dec.Decode(&sam) // L456
...
for _, v := range sam.Addresses { // L471
ka := new(KnownAddress)
...
ka.na, err = a.DeserializeNetAddress(v.Addr, v.Services)
...
ka.srcAddr, err = a.DeserializeNetAddress(v.Src, v.SrcServices)
...
a.addrIndex[NetAddressKey(ka.na)] = ka
} // L502
...
for i := range sam.NewBuckets { // L504
for _, val := range sam.NewBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrNew[i][val] = ka
}
} // L518
for i := range sam.TriedBuckets { // L519
for _, val := range sam.TriedBuckets[i] {
ka, ok := a.addrIndex[val]
...
a.addrTried[i].PushBack(ka)
}
} // L531
...
}
3.2小节中Start()
函数的L580行启动了一个addressHandler
函数的协程,该函数每隔10分钟调用一次savePeers
函数,将peers
信息 (sam
变量) 序列化并保存到peersFile
文件中。addressHandler
函数和savePeers
函数的代码分别如下所示:
// Start [addrmanager.go] -> addressHandler
func (a *AddrManager) addressHandler() { // L567
...
for {
select {
case <- dumpAddressTicker.C:
a.savePeers()
...
}
}
...
}
// Start [addrmanager.go] -> addressHandler -> savePeers
func (a *AddrManager) savePeers() { // L361
...
w, err := os.Create(a.peersFile) // L408
...
enc := json.NewEncoder(w) // L413
...
enc.Encode(&sam) // L415
...
}
第2小节的peerHandler
函数中的L2093启动了一个新协程,该协程运行s.connManager.Start()
代码启动了ConnManager
管理器,代码如下所示:
// Start [connmanager.go]
func (cm *ConnManager) Start() {
...
go cm.connHandler() // L518
...
if cm.cfg.OnAccept != nil {
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenerHandler(listener) // L525
}
}
for i := atomic.LoadUnit64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
go cm.NewConnReq() // L530
}
}
Start
函数中最重要的代码包括三部分:
peer
节点的连接peer
节点的连接需要进一步说明的是,1主要是完成主动连接的建立过程,2主要完成主动连接建立之后的管理。1每建立一次连接后,都需要由2完成后续的管理工作,如登记到conns
变量中。
节点主动发起连接的行为由NewConnReq
函数完成,代码如下所示:
// Start [connmanager.go] -> NewConnReq [server.go]
func (cm *ConnManager) NewConnReq() {
...
done := make(chan struct{}) // L376
select {
case cm.requests <- registerPending{c, done}:
case <-cm.quit:
return
}
...
select {
case <-done:
case <-cm.quit:
return
} // L398
addr, err := cm.cfg.GetNewAddress() // L400
...
c.Addr = addr
cm.Connect(c) // L402
}
其中需要解释的代码包括三个部分:
pending
变量中,方便对该连接进行后续管理。登记过程是通过requests
管道完成的,管道的另一端连接connHandler
函数,后续在讲解connHandler
函数的再做详细介绍;由上可知,获取将要连接的peer
地址是由GetNewAddress
函数实现的,该函数是connmanager.go
文件中config
数据结构的字段. 在初始化cmg
r变量时,该字段被赋值为newAddressFunc
,代码如下所示:
// newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
GetNewAddress: newAddressFunc,
})
...
...
}
进一步查看newAddressFunc
函数的定义,其也定义在newServer
函数中,代码如下所示:
// newServer [server.go]
func newServer(...) (*server, error) {
...
var newAddressFunc func() (net.Addr, error) // L2773
if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {
newAddressFunc = func() (net.Addr, error) {
for tries := 0; tries < 100; tries++ {
addr := s.addrManager.GetAddress() // L2777
...
addrString := addrmgr.NetAddressKey(addr.NetAddress())
return addrStringToNetAddr(addrString)
}
...
}
}
...
}
其中最重要的代码在L2777行,利用addrManager
的GetAddress
函数获取可用的连接地址。GetAddress
函数主要在addrTried
和addrNew
两个列表中随机地挑选可用的地址,这就与第3节的内容联系起来了。
实际的连接过程是由cm.Connect(c)
完成的,其代码如下所示:
// Start [connmanager.go] -> NewConnReq [server.go] -> Connect [connmanager.go]
func (cm *ConnManager) Connect(c *ConnReq) {
...
conn, err := cm.cfg.Dial(c.Addr) // L444
if err != nil {
select {
case cm.requests <- handleFailed{c, err}: // L447
case <-cm.quit:
}
return
}
select {
case cm.requests <- handleConnected{c, conn}: // L454
case <-cm.quit:
}
}
L444行通过调用Dial
函数进行连接,该函数主要对golang
中net
包的Dial
方法进行了一些包装。
L447和L454行分别用来处理连接失败和成功的情况,具体处理过程也是通过向requests
通道传递数据来完成。