我们接着上一篇博客,继续讲解P2P
主动连接的管理和被动接受连接的过程。
前面介绍到Start [connmanager.go]
函数中调用的connHandler
函数,主要用来管理主动连接。此外,在4.1节中我们提及requests
管道的接收工作就是在connHandler
函数中完成的。本小节我们就来看看connHandler
函数的具体细节,先看看其代码:
// Start [connmanager.go] -> connHandler
func (cm *ConnManager) connHandler() {
...
var (
...
pending = make(map[uint64]*ConnReq) // L233
...
conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // L236
)
out:
for {
select {
case req := <.cm.requests: // L242
switch msg :=req.(type) {
case registerPending: // L245
connReq := msg.c
connReq.updateState(ConnPending)
pending[msg.c.id] = connReq
close(msg.done)
case handleConnected: // L251
connReq := msg.c
connReq.updateState(ConnEstablished) // L254
connReq.conn = msg.conn
conns[connReq.id] = connReq
log.Debugf("Connected to %v", connReq)
connReq.retryCount = 0
cm.failedAttempts = 0
delete(pending, connReq.id) // L270
if cm.cfg.OnConnection != nil { // L272
go cm.cfg.OnConnection(connReq, msg.conn) // L273
}
case handleDisconnected:
...
case handleFailed:
...
}
case <-cm.quit:
break out
}
}
...
}
L242行代码接收requests
管道中发过来的数据,判断数据类型后,交由不同的case
处理。下面主要介绍registerPending
和handleConneted
两种类型的数据
registerPending
类型的信息,其主要将conn
变量的状态更新后加入到pending
变量中handleConnected
类型的信息,其首先也是对conn
变量的状态进行了更新,将conn
加入到conns
变量中,并从pending
中删除;然后利用OnConnection
函数进行连接后的处理。OnConnection
函数也是在server.go
文件的newServer
中赋值的,代码如下所示:// Start [connmanager.go] -> connHandler -> newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
OnConnection: s.outboundPeerConnected,
...
GetNewAddress: newAddressFunc,
})
...
...
}
outboundPeerConnected
函数的定义如下所示:
// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected
func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
sp := newServerPeer(s, c.Permanent) // L2024
p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
...
sp.Peer = p
sp.connReq = c
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) // L2032
sp.AssociateConnection(conn) // L2033
go s.peerDoneHandler(sp) // L2034
s.addrManager.Attempt(sp.NA()) // L2035
}
L2024-L2032代码创建了一个serverPeer
变量,该变量包含了一个Peer
变量;L2033行代码将sp
变量与conn
进行了绑定,并在AssociateConnection
函数中启动了Peer
变量,代码如下所示:
// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection
func (p *Peer) AssociateConnection(conn net.Conn) {
...
p.conn = conn // L2118
...
go func() {
if err := p.start(); err != nil { // L2137
...
}
} ()
}
L2118行代码将conn
赋值给Peer
变量中相应的字段,L2137行代码的start
函数用来处理P2P
连接中的数据传输,start
函数代码如下所示:
// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> start [peer.go]
func (p *Peer) start() error {
...
negotiateErr := make(chan error, 1) // L2075
go func() {
if p.inbound {
negotiateErr <- p.negotiateInboundProtocol() // L2078
} else {
negotiateErr <- p.negotiateOutboundProtocol() // L2080
}
}() // L2082
...
go p.stallHandler() // L2099
go p.inHandler()
go p.queueHandler()
go p.outHandler()
go p.pingHandler() // L2103
...
}
L2075-L2082行代码主要协商双方的协议版本,判断双方版本是否兼容。L2099-L2103行代码启动了五个协程,用于发送和接收数据 (包括区块的同步、交易的发送等),具体代码细节,我们将在下一篇博客中分析。这里需要特别提一下的是negotiateInboundProtocol
和negotiateOutboundProtocol
函数,该函数虽然名为“协商版本函数”,但该函数完成的工作却远远不止协商版本这么简单,以下以negotiateOutboundProtocol
函数为例进行简单介绍。
negotiateOutboundProtocol函数代码如下所示:
// negotiateOutboundProtocol [peer.go]
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil { // L2064
return err
}
return p.readRemoteVersionMsg() // L2068
}
L2064行代码用于给对方peer
放松版本信息,L2068接收对方peer
发来的版本信息,readRemoteVersionMsg
函数代码如下:
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg
func (p *Peer) readRemoteVersionMsg() error {
...
p.versionKnown = true // L1911
...
if p.cfg.Listeners.OnVersion != nil {
rejectMsg := p.cfg.Listeners.OnVersion(p, msg) // L1949
...
}
...
}
L1911行代码对versionKnown
进行了赋值,该值将在4.2.2小节使用到,后文再说。L1949行代码调用OnVersion
函数,该函数是MessageListeners
结构中的变量,其在server.go
的newPeerConfig
函数中被赋值为sp.OnVersion
,sp.OnVersion
函数的代码如下:
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go]
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion)
*wire.MsgReject {
...
sp.server.AddPeer(sp) // L501
...
}
L501行代码调用了AddPeer
函数,该函数向管道newPeers
中发送数据,代码如下所示:
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer
func (s *server) AddPeer(sp *serverPeer) { // L2162
s.newPeers <- sp
}
newPeers
管道的另一端连接着server.peerHandler
函数,代码如下所示:
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler
func (s *server) peerHandler() {
...
out:
for {
select {
// New peers connected to the server.
case p := <-s.newPeers:
s.handleAddPeerMsg(state, p) // L2100
...
case p := <-s.donePeers: // L2103
s.handleDonePeerMsg(state, p)
...
case <-s.quit:
...
}
}
...
}
L2100调用handleAddPeerMsg
函数,代码如下所示:
// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler -> handleAddPeerMsg
func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
...
if sp.Inbound() { // L1643
state.inboundPeers[sp.ID()] = sp
} else {
state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
if sp.persistent {
state.persistentPeers[sp.ID()] = sp
} else {
state.outboundPeers[sp.ID()] = sp
}
} // L1652
...
}
该函数在L1643-L1652行代码将peer
变量(sp
)登记到state
状态中,便于后面的进一步管理,如在断开连接时清理peer
相关数据。
回到outboundPeerConnected
函数中,L2034行代码调用了peerDoneHandler
函数。该函数主要用于在peer
断开连接时利用管道发出一些信号,代码如下所示:
// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> peerDoneHandler
func (s *server) peerDoneHandler(sp *serverPeer) {
sp.WaitForDisconnect() // L2041
s.donePeers <- sp // L2042
...
if sp.VersionKnown() { // L2045
s.syncManager.DonePeer(sp.Peer) // L2046
...
}
close(sp.quit) // L2056
}
L2041行代码等待关闭连接的信号,L2042通过donePeers
管道向server
发送信号,L2056行通过quit
管道向server
发送信号,这两个管道的另一端也都连接着server.peerHandler
函数,如4.2.1小节所示。
此外,L2045对sp
中versionKnown
字段进行了判断,该字段在readRemoteVersionMsg
函数的L1911行被赋值为true
. L2046向syncManager
发送信号,通知syncManager
停止数据的同步。
下面我们介绍节点被动接受连接的过程。
我们再将Start
函数中被动接收连接相关的代码贴在这儿:
// Start [connmanager.go]
func (cm *ConnManager) Start() {
...
if cm.cfg.OnAccept != nil { // L522
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenerHandler(listener) // L525
}
}
...
}
L522首先判断OnAccept
字段是否为nil
. OnAccept
字段也是在server.go
文件的newServer
中赋值的,代码如下所示:
// newServer [server.go]
func newServer(...) (*server, error) {
...
cmgr, err := connmgr.New(&connmgr.Config{ // L2818
...
OnAccept: s.inboundPeerConnected,
...
GetNewAddress: newAddressFunc,
})
...
}
回到Start
函数中L525行的代码,继续看listenerHandler
函数,其代码如下所示:
// Start [connmanager.go] -> listenHandler
func (cm *ConnManager) listenHandler(listener net.Listener) {
...
for atomic.LoadInt32(&cm.stop) == 0 {
conn, err := listener.Accept() // L494
...
go cm.cfg.OnAccept(conn) // L502
}
...
}
L494行通过调用golang
中net
包的Accept
方法接收连接请求,并将该请求作为参数传递给OnAccept
函数。前面已经介绍过,OnAccept
字段被赋值为inboundPeerConnected
函数,后者的代码如下所示:
// inboundPeerConnected [server.go]
func (s *server) inboundPeerConnected(conn net.Conn) {
sp := newServerPeer(s, false) // L2011
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) // L2013
sp.AssociateConnection(conn) // L2014
go s.peerDoneHandler(sp) // L2015
}
inboundPeerConnected
函数的代码和outboundPeerConnected
函数的代码大同小异,此处不再赘述。
至此,我们完成了P2P
连接建立的源码解析。
其主要分为两个部分:peer
地址的管理和peer
连接的建立。
其中peer
连接建立又可分为两种:主动发起连接和被动接收连接。
被动接受连接的代码比较简单。只需要listenerHandler
一个协程就可以完成;主动发起连接的代码相对复杂,需要NewConnReq
和connHandler
两个协程来完成。
在此,我们没有介绍P2P
连接的断开过程,这部分代码后面有机会再来分析。