当前位置: 首页 > 工具软件 > btcd > 使用案例 >

btcd源码解析——节点P2P连接建立的过程 (2)

端木桐
2023-12-01

4. 与其他peer建立P2P连接

我们接着上一篇博客,继续讲解P2P主动连接的管理被动接受连接的过程。

4.2 主动连接的管理

前面介绍到Start [connmanager.go]函数中调用的connHandler函数,主要用来管理主动连接。此外,在4.1节中我们提及requests管道的接收工作就是在connHandler函数中完成的。本小节我们就来看看connHandler函数的具体细节,先看看其代码:

// Start [connmanager.go] -> connHandler
func (cm *ConnManager) connHandler() {
    ...
    var (
        ...
        pending = make(map[uint64]*ConnReq)                		  // L233
        ...
        conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound)                    // L236
    )
    
out:
    for {       
        select {
        case req := <.cm.requests:                                // L242
            switch msg :=req.(type) {
            
            case registerPending:                                 // L245
                connReq := msg.c
                connReq.updateState(ConnPending)
                pending[msg.c.id] = connReq
                close(msg.done)
                
            case handleConnected:                                 // L251
                connReq := msg.c

                connReq.updateState(ConnEstablished)              // L254
                connReq.conn = msg.conn
                conns[connReq.id] = connReq
                log.Debugf("Connected to %v", connReq)
                connReq.retryCount = 0
                cm.failedAttempts = 0

                delete(pending, connReq.id)                       // L270

                if cm.cfg.OnConnection != nil {                   // L272
                    go cm.cfg.OnConnection(connReq, msg.conn)     // L273
                }

            case handleDisconnected:
                ...
            case handleFailed:
                ...
            }
        case <-cm.quit: 
            break out
        }
    }
    ...
}

L242行代码接收requests管道中发过来的数据,判断数据类型后,交由不同的case处理。下面主要介绍registerPendinghandleConneted两种类型的数据

  • L245行代码处理registerPending类型的信息,其主要将conn变量的状态更新后加入到pending变量中
  • L251行代码处理handleConnected类型的信息,其首先也是对conn变量的状态进行了更新,将conn加入到conns变量中,并从pending中删除;然后利用OnConnection函数进行连接后的处理。OnConnection函数也是在server.go文件的newServer中赋值的,代码如下所示:
// Start [connmanager.go] -> connHandler -> newServer [server.go]
func newServer(...) (*server, error) {
    ...
    cmgr, err := connmgr.New(&connmgr.Config{   		       // L2818
        ... 
        OnConnection:   s.outboundPeerConnected,
        ...
        GetNewAddress:  newAddressFunc,
    })
    ...
    ...
}

outboundPeerConnected函数的定义如下所示:

//  Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected
func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
    sp := newServerPeer(s, c.Permanent)                          // L2024
    p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())          
    ...
    sp.Peer = p
    sp.connReq = c
    sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())          // L2032
    sp.AssociateConnection(conn)                                 // L2033
    go s.peerDoneHandler(sp)                                     // L2034
    s.addrManager.Attempt(sp.NA())                               // L2035
}

L2024-L2032代码创建了一个serverPeer变量,该变量包含了一个Peer变量;L2033行代码将sp变量与conn进行了绑定,并在AssociateConnection函数中启动了Peer变量,代码如下所示:

// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection
func (p *Peer) AssociateConnection(conn net.Conn) {
    ...
    p.conn = conn                   		                    // L2118
    ...
    go func() {       
        if err := p.start(); err != nil {                       // L2137
            ...
        }
    } ()
}

L2118行代码将conn赋值给Peer变量中相应的字段,L2137行代码的start函数用来处理P2P连接中的数据传输,start函数代码如下所示:

// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> start [peer.go]
func (p *Peer) start() error {
    ...
    negotiateErr := make(chan error, 1)                			// L2075
    go func() {                                                                         
        if p.inbound {             
            negotiateErr <- p.negotiateInboundProtocol()		// L2078
        } else {
            negotiateErr <- p.negotiateOutboundProtocol()       // L2080
        }
    }()                                                         // L2082
    ...
    go p.stallHandler()                                         // L2099
    go p.inHandler()
    go p.queueHandler()
    go p.outHandler()
    go p.pingHandler()                       		            // L2103
    ...
}

L2075-L2082行代码主要协商双方的协议版本,判断双方版本是否兼容。L2099-L2103行代码启动了五个协程,用于发送和接收数据 (包括区块的同步、交易的发送等),具体代码细节,我们将在下一篇博客中分析。这里需要特别提一下的是negotiateInboundProtocolnegotiateOutboundProtocol函数,该函数虽然名为“协商版本函数”,但该函数完成的工作却远远不止协商版本这么简单,以下以negotiateOutboundProtocol函数为例进行简单介绍。

4.2.1 negotiateOutboundProtocol函数

negotiateOutboundProtocol函数代码如下所示:

// negotiateOutboundProtocol [peer.go]
func (p *Peer) negotiateOutboundProtocol() error {
    if err := p.writeLocalVersionMsg(); err != nil {             // L2064
        return err
    }
    
    return p.readRemoteVersionMsg()                              // L2068
}

L2064行代码用于给对方peer放松版本信息,L2068接收对方peer发来的版本信息,readRemoteVersionMsg函数代码如下:

// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg
func (p *Peer) readRemoteVersionMsg() error {
    ...
    p.versionKnown = true                               		 // L1911
    ...
    if p.cfg.Listeners.OnVersion != nil {
        rejectMsg := p.cfg.Listeners.OnVersion(p, msg)           // L1949
        ...
    }
    ...
}

L1911行代码对versionKnown进行了赋值,该值将在4.2.2小节使用到,后文再说。L1949行代码调用OnVersion函数,该函数是MessageListeners结构中的变量,其在server.gonewPeerConfig函数中被赋值为sp.OnVersionsp.OnVersion函数的代码如下:

// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go]
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) 
*wire.MsgReject {
    ...
    sp.server.AddPeer(sp)                                        // L501
    ...
}

L501行代码调用了AddPeer函数,该函数向管道newPeers中发送数据,代码如下所示:

// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer
func (s *server) AddPeer(sp *serverPeer) {                        // L2162
    s.newPeers <- sp
}

newPeers管道的另一端连接着server.peerHandler函数,代码如下所示:

// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler
func (s *server) peerHandler() {
    ...
out:       
    for {              
        select {             
        // New peers connected to the server.              
        case p := <-s.newPeers:                     
            s.handleAddPeerMsg(state, p)                           // L2100  
        ...    
        case p := <-s.donePeers:                                   // L2103
            s.handleDonePeerMsg(state, p)
        ...
        case <-s.quit:
            ...
        }
    }
    ...
}

L2100调用handleAddPeerMsg函数,代码如下所示:

// negotiateOutboundProtocol [peer.go] -> readRemoteVersionMsg -> OnVersion[server.go] -> AddPeer -> peerHandler -> handleAddPeerMsg
func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
    ...
    if sp.Inbound() {                                             // L1643
        state.inboundPeers[sp.ID()] = sp
    } else {       
        state.outboundGroups[addrmgr.GroupKey(sp.NA())]++       
        if sp.persistent {              
            state.persistentPeers[sp.ID()] = sp       
        } else {              
            state.outboundPeers[sp.ID()] = sp       
        }
    }                                                             // L1652
    ...
}

该函数在L1643-L1652行代码将peer变量(sp)登记到state状态中,便于后面的进一步管理,如在断开连接时清理peer相关数据。

4.2.2 peerDoneHandler函数

回到outboundPeerConnected函数中,L2034行代码调用了peerDoneHandler函数。该函数主要用于在peer断开连接时利用管道发出一些信号,代码如下所示:

// Start [connmanager.go] -> connHandler -> newServer [server.go] -> outboundPeerConnected -> AssociateConnection -> peerDoneHandler
func (s *server) peerDoneHandler(sp *serverPeer) {
    sp.WaitForDisconnect()                                       // L2041
    s.donePeers <- sp                                            // L2042

    ...
    if sp.VersionKnown() {                                       // L2045
        s.syncManager.DonePeer(sp.Peer)                          // L2046
        ...
    }
    close(sp.quit)                             	                 // L2056
}

L2041行代码等待关闭连接的信号,L2042通过donePeers管道向server发送信号,L2056行通过quit管道向server发送信号,这两个管道的另一端也都连接着server.peerHandler函数,如4.2.1小节所示。
此外,L2045spversionKnown字段进行了判断,该字段在readRemoteVersionMsg函数的L1911行被赋值为true. L2046syncManager发送信号,通知syncManager停止数据的同步。

4.3 被动接受连接

下面我们介绍节点被动接受连接的过程。
我们再将Start函数中被动接收连接相关的代码贴在这儿:

// Start [connmanager.go]
func (cm *ConnManager) Start() {
    ...
    if cm.cfg.OnAccept != nil {                                  // L522
        for _, listener := range cm.cfg.Listeners {
            cm.wg.Add(1)
            go cm.listenerHandler(listener)                      // L525
        }
    }
   ...
}

L522首先判断OnAccept字段是否为nil. OnAccept字段也是在server.go文件的newServer中赋值的,代码如下所示:

// newServer [server.go]
func newServer(...) (*server, error) {
    ...
    cmgr, err := connmgr.New(&connmgr.Config{                    // L2818
        ... 
        OnAccept:       s.inboundPeerConnected,
        ...
        GetNewAddress:  newAddressFunc,
    })
    ...
}

回到Start函数中L525行的代码,继续看listenerHandler函数,其代码如下所示:

// Start [connmanager.go] -> listenHandler
func (cm *ConnManager) listenHandler(listener net.Listener) {       
    ...
    for atomic.LoadInt32(&cm.stop) == 0 {                                    
        conn, err := listener.Accept()                          // L494
        ...
        go cm.cfg.OnAccept(conn)                                // L502
    }       
    ...
}

L494行通过调用golangnet包的Accept方法接收连接请求,并将该请求作为参数传递给OnAccept函数。前面已经介绍过,OnAccept字段被赋值为inboundPeerConnected函数,后者的代码如下所示:

// inboundPeerConnected [server.go]
func (s *server) inboundPeerConnected(conn net.Conn) {      
    sp := newServerPeer(s, false)                              // L2011
    sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())       
    sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))           // L2013
    sp.AssociateConnection(conn)                               // L2014
    go s.peerDoneHandler(sp)        	                       // L2015
}

inboundPeerConnected函数的代码和outboundPeerConnected函数的代码大同小异,此处不再赘述。

5. 总结

至此,我们完成了P2P连接建立的源码解析。
其主要分为两个部分:peer地址的管理和peer连接的建立。
其中peer连接建立又可分为两种:主动发起连接和被动接收连接。
被动接受连接的代码比较简单。只需要listenerHandler一个协程就可以完成;主动发起连接的代码相对复杂,需要NewConnReqconnHandler两个协程来完成。
在此,我们没有介绍P2P连接的断开过程,这部分代码后面有机会再来分析。

 类似资料: