btcd首先从内存池取得交易,用交易填充区块,并往区块上填入必要的信息。随后区块进行POW计算。当计算符合难度值的区块哈希后,btcd对新区块进行最后一步的验证,将新区块连接到本地的主链上,并广播这个新区块给对等方。
为了了解交易是怎么被打包到区块,最后发布的,首先从查看相关的RPC调用。
getblocktemplate ( "template_request" )
其描述如下:
If the request parameters include a ‘mode’ key, that is used to explicitly select between the default ‘template’ request or a ‘proposal’.
It returns data needed to construct a block to work on.
getblocktemplate RPC
将返回构造一个区块所需要的数据。
generateblock "output" ["rawtx/txid",...]
其描述如下:
Mine a block with a set of ordered transactions immediately to a specified address or descriptor (before the RPC call returns)
generateblock RPC
用给定的排好序的交易挖矿,需要指定受益人的地址。
submitblock "hexdata" ( "dummy" )
其描述如下:
Attempts to submit new block to network.
尝试提交一个新的区块到网络中。
Set the server to generate coins (mine) or not.
NOTE: Since btcd does not have the wallet integrated to provide payment addresses, btcd must be configured via the --miningaddr option to provide which payment addresses to pay created blocks to for this RPC to function.
setgenerate设置server是否开启mining。如果开启,服务器将动用CPU mining。需要用–miningaddr指定受益人地址。
有了这三个RPC,我们可以大概确定从打包交易到发布区块的流程。
首先通过getblocktemplate RPC
得到组装一个区块需要的信息,然后通过generateblock RPC
产生一个区块,最后通过submitblock RPC
将区块发布到网络上。
也可以通过setgenerate RPC
设置服务器开启mining,在服务器上完成区块的生成、挖矿、提交。
以下选取getblocktemplate RPC
和setgenerate RPC
进行分析。
从Client
发出RPC开始。该RPC位于rpcclient/mining.go
,其代码如下:
// GetBlockTemplate returns a new block template for mining.
func (c *Client) GetBlockTemplate(req *btcjson.TemplateRequest) (*btcjson.GetBlockTemplateResult, error) {
return c.GetBlockTemplateAsync(req).Receive()
}
它实际调用了异步版本GetBlockTemplateAsync
,并调用其Receive
方法阻塞等待结果返回:
// GetBlockTemplateAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on the
// returned instance.
//
// See GetBlockTemplate for the blocking version and more details.
func (c *Client) GetBlockTemplateAsync(req *btcjson.TemplateRequest) FutureGetBlockTemplateResponse {
cmd := btcjson.NewGetBlockTemplateCmd(req)
return c.SendCmd(cmd)
}
调用了Client
的SendCmd
方法:
// SendCmd sends the passed command to the associated server and returns a
// response channel on which the reply will be delivered at some point in the
// future. It handles both websocket and HTTP POST mode depending on the
// configuration of the client.
func (c *Client) SendCmd(cmd interface{}) chan *Response {
...
responseChan := make(chan *Response, 1)
jReq := &jsonRequest{
id: id,
method: method,
cmd: cmd,
marshalledJSON: marshalledJSON,
responseChan: responseChan,
}
c.sendRequest(jReq)
return responseChan
}
该方法声明了一个容量为1的回复用的通道,说明调用者GetBlockTemplateAsync
不会阻塞在等待该方法返回结果。
SendCmd
进一步调用了Client
的sendRequest
方法,之后就是用POST方法将请求发到服务器上。
在服务器上,RPC的处理方法位于rpcserver.go
,处理方法为handleGetBlockTemplate
,其代码如下:
// handleGetBlockTemplate implements the getblocktemplate command.
func handleGetBlockTemplate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetBlockTemplateCmd)
request := c.Request
// Set the default mode and override it if supplied.
mode := "template"
if request != nil && request.Mode != "" {
mode = request.Mode
}
switch mode {
case "template":
return handleGetBlockTemplateRequest(s, request, closeChan)
case "proposal":
return handleGetBlockTemplateProposal(s, request)
}
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Invalid mode",
}
}
该方法先判断请求的模式mode
是什么,默认为template
。以template
为例,它调用了handleGetBlockTemplateRequest
方法,其代码如下:
func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateRequest, closeChan <-chan struct{}) (interface{}, error) {
...
// Get and return a block template. A new block template will be
// generated when the current best block has changed or the transactions
// in the memory pool have been updated and it has been at least five
// seconds since the last template was generated. Otherwise, the
// timestamp for the existing block template is updated (and possibly
// the difficulty on testnet per the consesus rules).
if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil {
return nil, err
}
return state.blockTemplateResult(useCoinbaseValue, nil)
}
该方法调用了blockTemplateResult
方法。查看该方法可知blockTemplateResult
返回了目前和state
关联的区块模板。
首先看state
是什么,其结构体定义如下:
// gbtWorkState houses state that is used in between multiple RPC invocations to
// getblocktemplate.
type gbtWorkState struct {
sync.Mutex
lastTxUpdate time.Time
lastGenerated time.Time
prevHash *chainhash.Hash
minTimestamp time.Time
template *mining.BlockTemplate
notifyMap map[chainhash.Hash]map[int64]chan struct{}
timeSource blockchain.MedianTimeSource
}
gbtWorkState
保存了多个getblocktemplate
RPC调用请求间的状态。其中BlockTemplate
域保存了我们需要的区块模板。那么这个template *mining.BlockTemplate
是什么时候设置的呢?其实在handleGetBlockTemplateRequest
中,调用的updateBlockTemplate
方法设置了这个template
:
func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bool) error {
blkTemplate, err := generator.NewBlockTemplate(payAddr)
...
template = blkTemplate
...
state.template = template
...
}
NewBlockTemplate
方法则真正创建了一个新的区块模板。该区块模板使用来自内存池的交易创建区块。其中,传入的payToAddress
用于创建coinbase交易。
选择和包含的交易根据几个因素进行优先级排序:
仅使用块链中已有的其他交易的输出的交易将立即添加到优先级队列中,该队列根据优先级(然后是每千字节的费用)或每千字节的费用(然后是优先级)进行优先级排序取决于BlockPrioritySize
策略设置是否为高优先级交易分配空间。使用源池中其他交易的输出的交易将添加到依赖关系映射中,以便在包含它们所依赖的交易后,可以将它们添加到优先级队列中。一旦高优先级区域(如果已配置)已填满交易,或者优先级低于高优先级,优先级队列将更新为按每千字节的费用(然后是优先级)排列优先级。当每千字节的费用低于TxMinFreeFee
策略设置时,将跳过交易,除非BlockMinSize
策略设置为非零,在这种情况下,块将填充低费用/免费交易,直到块大小达到该最小大小。跳过任何会导致块超过BlockMaxSize
策略设置、超过每个块允许的最大签名操作数或以其他方式导致块无效的交易。
至此,NewBlockTemplate
方法返回了一个新的区块模板:
return &BlockTemplate{
Block: &msgBlock,
Fees: txFees,
SigOpCosts: txSigOpCosts,
Height: nextBlockHeight,
ValidPayAddress: payToAddress != nil,
WitnessCommitment: witnessCommitment,
}, nil
前面说到,setgenerate设置server是否开启挖矿。如果开启,服务器将动用CPU挖矿。需要用–miningaddr指定受益人地址。
首先查看处理setgennerate的方法handleSetGenerate
:
// handleSetGenerate implements the setgenerate command.
func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
...
// It's safe to call start even if it's already started.
s.cfg.CPUMiner.SetNumWorkers(int32(genProcLimit))
s.cfg.CPUMiner.Start()
...
}
handleSetGenerate
先设置了挖矿的worker数量,然后调用CPUMiner
的Start
方法开始挖矿。继续查看Start
方法:
// Start begins the CPU mining process as well as the speed monitor used to
// track hashing metrics. Calling this function when the CPU miner has
// already been started will have no effect.
//
// This function is safe for concurrent access.
func (m *CPUMiner) Start() {
m.Lock()
defer m.Unlock()
// Nothing to do if the miner is already running or if running in
// discrete mode (using GenerateNBlocks).
if m.started || m.discreteMining {
return
}
m.quit = make(chan struct{})
m.speedMonitorQuit = make(chan struct{})
m.wg.Add(2)
go m.speedMonitor()
go m.miningWorkerController()
m.started = true
log.Infof("CPU miner started")
}
Start
方法开启了一个速度监控speedMonitor
协程和一个工作线程控制器miningWorkerController
协程。
这里主要看miningWorkerController
的实现:
// miningWorkerController launches the worker goroutines that are used to
// generate block templates and solve them. It also provides the ability to
// dynamically adjust the number of running worker goroutines.
func (m *CPUMiner) miningWorkerController() {
// launchWorkers groups common code to launch a specified number of
// workers for generating blocks.
var runningWorkers []chan struct{}
launchWorkers := func(numWorkers uint32) {
for i := uint32(0); i < numWorkers; i++ {
quit := make(chan struct{})
runningWorkers = append(runningWorkers, quit)
m.workerWg.Add(1)
go m.generateBlocks(quit)
}
}
// Launch the current number of workers by default.
runningWorkers = make([]chan struct{}, 0, m.numWorkers)
launchWorkers(m.numWorkers)
out:
for {
select {
// Update the number of running workers.
case <-m.updateNumWorkers:
// No change.
numRunning := uint32(len(runningWorkers))
if m.numWorkers == numRunning {
continue
}
// Add new workers.
if m.numWorkers > numRunning {
launchWorkers(m.numWorkers - numRunning)
continue
}
// Signal the most recently created goroutines to exit.
for i := numRunning - 1; i >= m.numWorkers; i-- {
close(runningWorkers[i])
runningWorkers[i] = nil
runningWorkers = runningWorkers[:i]
}
case <-m.quit:
for _, quit := range runningWorkers {
close(quit)
}
break out
}
}
// Wait until all workers shut down to stop the speed monitor since
// they rely on being able to send updates to it.
m.workerWg.Wait()
close(m.speedMonitorQuit)
m.wg.Done()
}
在for
循环中,通过go m.generateBlocks(quit)
开启了各个worker协程。随后miningWorkerController
监控挖矿worker数有没有变化,对挖矿worker数进行调整,并监听退出消息,进而退出挖矿。
generateBlocks
就是所谓的worker,继续查看generateBlocks
方法是怎么挖矿的:
// generateBlocks is a worker that is controlled by the miningWorkerController.
// It is self contained in that it creates block templates and attempts to solve
// them while detecting when it is performing stale work and reacting
// accordingly by generating a new block template. When a block is solved, it
// is submitted.
//
// It must be run as a goroutine.
func (m *CPUMiner) generateBlocks(quit chan struct{}) {
log.Tracef("Starting generate blocks worker")
// Start a ticker which is used to signal checks for stale work and
// updates to the speed monitor.
ticker := time.NewTicker(time.Second * hashUpdateSecs)
defer ticker.Stop()
out:
for {
// Quit when the miner is stopped.
select {
case <-quit:
break out
default:
// Non-blocking select to fall through
}
// Wait until there is a connection to at least one other peer
// since there is no way to relay a found block or receive
// transactions to work on when there are no connected peers.
if m.cfg.ConnectedCount() == 0 {
time.Sleep(time.Second)
continue
}
// No point in searching for a solution before the chain is
// synced. Also, grab the same lock as used for block
// submission, since the current block will be changing and
// this would otherwise end up building a new block template on
// a block that is in the process of becoming stale.
m.submitBlockLock.Lock()
curHeight := m.g.BestSnapshot().Height
if curHeight != 0 && !m.cfg.IsCurrent() {
m.submitBlockLock.Unlock()
time.Sleep(time.Second)
continue
}
// Choose a payment address at random.
rand.Seed(time.Now().UnixNano())
payToAddr := m.cfg.MiningAddrs[rand.Intn(len(m.cfg.MiningAddrs))]
// Create a new block template using the available transactions
// in the memory pool as a source of transactions to potentially
// include in the block.
template, err := m.g.NewBlockTemplate(payToAddr)
m.submitBlockLock.Unlock()
if err != nil {
errStr := fmt.Sprintf("Failed to create new block "+
"template: %v", err)
log.Errorf(errStr)
continue
}
// Attempt to solve the block. The function will exit early
// with false when conditions that trigger a stale block, so
// a new block template can be generated. When the return is
// true a solution was found, so submit the solved block.
if m.solveBlock(template.Block, curHeight+1, ticker, quit) {
block := btcutil.NewBlock(template.Block)
m.submitBlock(block)
}
}
m.workerWg.Done()
log.Tracef("Generate blocks worker done")
}
generateBlocks
做了以下这些事情:
quit
通道,如果该通道被关闭,则worker退出;NewBlockTemplate
获取一个区块模板;solveBlock
来求解符合难度的nonce
值;nonce
后,调用NewBlock
组装区块,调用submitBlock
提交区块。solveBlock
是真正在做POW的地方,其代码如下:
// solveBlock attempts to find some combination of a nonce, extra nonce, and
// current timestamp which makes the passed block hash to a value less than the
// target difficulty. The timestamp is updated periodically and the passed
// block is modified with all tweaks during this process. This means that
// when the function returns true, the block is ready for submission.
//
// This function will return early with false when conditions that trigger a
// stale block such as a new block showing up or periodically when there are
// new transactions and enough time has elapsed without finding a solution.
func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, blockHeight int32,
ticker *time.Ticker, quit chan struct{}) bool {
// Choose a random extra nonce offset for this block template and
// worker.
enOffset, err := wire.RandomUint64()
if err != nil {
log.Errorf("Unexpected error while generating random "+
"extra nonce offset: %v", err)
enOffset = 0
}
// Create some convenience variables.
header := &msgBlock.Header
targetDifficulty := blockchain.CompactToBig(header.Bits)
// Initial state.
lastGenerated := time.Now()
lastTxUpdate := m.g.TxSource().LastUpdated()
hashesCompleted := uint64(0)
// Note that the entire extra nonce range is iterated and the offset is
// added relying on the fact that overflow will wrap around 0 as
// provided by the Go spec.
for extraNonce := uint64(0); extraNonce < maxExtraNonce; extraNonce++ {
// Update the extra nonce in the block template with the
// new value by regenerating the coinbase script and
// setting the merkle root to the new value.
m.g.UpdateExtraNonce(msgBlock, blockHeight, extraNonce+enOffset)
// Search through the entire nonce range for a solution while
// periodically checking for early quit and stale block
// conditions along with updates to the speed monitor.
for i := uint32(0); i <= maxNonce; i++ {
select {
case <-quit:
return false
case <-ticker.C:
m.updateHashes <- hashesCompleted
hashesCompleted = 0
// The current block is stale if the best block
// has changed.
best := m.g.BestSnapshot()
if !header.PrevBlock.IsEqual(&best.Hash) {
return false
}
// The current block is stale if the memory pool
// has been updated since the block template was
// generated and it has been at least one
// minute.
if lastTxUpdate != m.g.TxSource().LastUpdated() &&
time.Now().After(lastGenerated.Add(time.Minute)) {
return false
}
m.g.UpdateBlockTime(msgBlock)
default:
// Non-blocking select to fall through
}
// Update the nonce and hash the block header. Each
// hash is actually a double sha256 (two hashes), so
// increment the number of hashes completed for each
// attempt accordingly.
header.Nonce = i
hash := header.BlockHash()
hashesCompleted += 2
// The block is solved when the new block hash is less
// than the target difficulty. Yay!
if blockchain.HashToBig(&hash).Cmp(targetDifficulty) <= 0 {
m.updateHashes <- hashesCompleted
return true
}
}
}
return false
}
solveBlock
试图通过设置nonce
,extra nonce
和timestamp
来使得给定的区块的哈希小于目标难度值。当一个新区块出现时、或者当出现了新的交易并且经过了一定的时间,这个方法会提早返回。
solveBlock
首先在外层循环设置extraNonce
,每个循环对extraNonce
增一;在内层循环设置nonce
,每个循环对nonce
增一;在循环内部,solveBlock
能够接收退出消息,并且会周期性地检查是否出现了新的区块;如果新的交易出现,并且时间已经过去一分钟,则退出;
hash := header.BlockHash()
对区块头进行了哈希。如果哈希满足了要求,则将hashesCompleted发到m.updateHashes
,并返回true
。(将hashesCompleted
发到m.updateHashes
是为了让速度监视器监控每秒钟的哈希数)
回到generateBlocks
,成功找到符合要求的nonce
, extra nonce
, timestamp
组合后,solveBlock
返回true
,调用者generateBlocks
继续调用NewBlock
对template.Block
对区块进行简单封装后调用submitBlock
对区块进行提交。
submitBlock
代码如下:
// submitBlock submits the passed block to network after ensuring it passes all
// of the consensus validation rules.
func (m *CPUMiner) submitBlock(block *btcutil.Block) bool {
m.submitBlockLock.Lock()
defer m.submitBlockLock.Unlock()
// Ensure the block is not stale since a new block could have shown up
// while the solution was being found. Typically that condition is
// detected and all work on the stale block is halted to start work on
// a new block, but the check only happens periodically, so it is
// possible a block was found and submitted in between.
msgBlock := block.MsgBlock()
if !msgBlock.Header.PrevBlock.IsEqual(&m.g.BestSnapshot().Hash) {
log.Debugf("Block submitted via CPU miner with previous "+
"block %s is stale", msgBlock.Header.PrevBlock)
return false
}
// Process this block using the same rules as blocks coming from other
// nodes. This will in turn relay it to the network like normal.
isOrphan, err := m.cfg.ProcessBlock(block, blockchain.BFNone)
if err != nil {
// Anything other than a rule violation is an unexpected error,
// so log that error as an internal error.
if _, ok := err.(blockchain.RuleError); !ok {
log.Errorf("Unexpected error while processing "+
"block submitted via CPU miner: %v", err)
return false
}
log.Debugf("Block submitted via CPU miner rejected: %v", err)
return false
}
if isOrphan {
log.Debugf("Block submitted via CPU miner is an orphan")
return false
}
// The block was accepted.
coinbaseTx := block.MsgBlock().Transactions[0].TxOut[0]
log.Infof("Block submitted via CPU miner accepted (hash %s, "+
"amount %v)", block.Hash(), btcutil.Amount(coinbaseTx.Value))
return true
}
submitBlock
最后检查一次新挖出的区块是否已经过期(通过对比新挖出的区块的上一区块哈希和目前主链最新区块哈希)。
通过ProcessBlock
方法,将区块广播到对等方,这个ProcessBlock
方法由cpuminer
中的Config
结构体作为方法变量持有,实际上这个方法变量被赋值为SyncManager
的ProcessBlock
方法,其代码如下:
// ProcessBlock makes use of ProcessBlock on an internal instance of a block
// chain.
func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
reply := make(chan processBlockResponse, 1)
sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
response := <-reply
return response.isOrphan, response.err
}
它将processBlockMsg
发送到msgChan
通道,随后试图从processBlockMsg
附带的回复通道读取回复消息,并将其返回给调用者。这时再次回到了熟悉的blockHandler
方法(在提交交易的部分曾提到),接收processBlockMsg
的代码如下:
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
...
case processBlockMsg:
_, isOrphan, err := sm.chain.ProcessBlock(
msg.block, msg.flags)
if err != nil {
msg.reply <- processBlockResponse{
isOrphan: false,
err: err,
}
}
msg.reply <- processBlockResponse{
isOrphan: isOrphan,
err: nil,
}
...
...
}
}
当从msgChan
收到的消息为processBlockMsg
类型时,调用ProcessBlock
方法,并返回响应。
BlockChain
对象的ProcessBlock
方法就是区块上链的最后一步。它包括了blockExists
拒绝重复块、checkBlockSanity
确保区块遵守所有规则、addOrphanBlock
孤儿处理、maybeAcceptBlock
将区块插入到区块链中。
maybeAcceptBlock
基本上将一个区块放到区块链上,它会返回一个区块最终是否在主链上。
blockNode
对象,并对其进行索引;connectBestChain
将区块连接到主链上。connectBestChain
将区块连接到主链上。典型情况下,只需将新区块连接到主链上。然而它可能延长了一个侧链,它既可能变成主链,也可能不成为主链。
connectBestChain
将区块索引更新的数据库;
再次执行几项检查验证区块能够连接到主链上;
主要注意的是何时更新UTXO:
if fastAdd {
err := view.fetchInputUtxos(b.db, block)
if err != nil {
return false, err
}
err = view.connectTransactions(block, &stxos)
if err != nil {
return false, err
}
}
将区块中的交易中inputs包含的UTXOs拿来,并将区块中的交易中的outputs作为新的UTXOs。
调用connectBlock
将区块连接到主链上。在connectBlock
中有以下代码:
err := b.indexManager.ConnectBlock(dbTx, block, stxos)
ConnectBlock
执行了一系列更新,最终将新的区块连接到主链上。
虽然新区块连接到了主链上,但这只是本地的一条区块链,网络中的其他节点并未得知已经产生了一个新的区块。一般来说,一个节点连接了一个新的区块,应该尽快通知尽可能多的节点,否则可能有其它节点也找到了符合要求的nonce
,也连接了一个新的区块,那么当前节点连接的区块就有可能成为侧链。我们重新回到maybeAcceptBlock
方法:
当顺利将新区块连接到主链上后,后续有以下代码:
// Notify the caller that the new block was accepted into the block
// chain. The caller would typically want to react by relaying the
// inventory to other peers.
b.chainLock.Unlock()
b.sendNotification(NTBlockAccepted, block)
b.chainLock.Lock()
maybeAcceptBlock
通过调用sendNotification
通知调用者,新的区块已经被接受。其代码如下:
// sendNotification sends a notification with the passed type and data if the
// caller requested notifications by providing a callback function in the call
// to New.
func (b *BlockChain) sendNotification(typ NotificationType, data interface{}) {
// Generate and send the notification.
n := Notification{Type: typ, Data: data}
b.notificationsLock.RLock()
for _, callback := range b.notifications {
callback(&n)
}
b.notificationsLock.RUnlock()
}
该方法先构造了一个Notification
,然后遍历BlockChain
对象的notifications
,用构造的Notification
作为参数回调notifications
中的每一个函数,其中,Subscribe
方法能够订阅该通知:
// Subscribe to block chain notifications. Registers a callback to be executed
// when various events take place. See the documentation on Notification and
// NotificationType for details on the types and contents of notifications.
func (b *BlockChain) Subscribe(callback NotificationCallback) {
b.notificationsLock.Lock()
b.notifications = append(b.notifications, callback)
b.notificationsLock.Unlock()
}
查看该方法的Usage
可知,RPCServer
和SyncManager
均订阅了该消息,并传入各自的handleBlockchainNotification
方法作为回调函数。
先查看rpcServer
的handleBlockchainNotification
对应的分支:
case blockchain.NTBlockAccepted:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
rpcsLog.Warnf("Chain accepted notification is not a block.")
break
}
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
s.gbtWorkState.NotifyBlockConnected(block.Hash())
当新的区块让他们的旧区块模板过期,则通过NotifyBlockConnected
方法对“通过getblocktemplate RPC
执行long polling
的客户端”进行通知。
在查看SyncManager
的handleBlockchainNotification
对应的分支:
// A block has been accepted into the block chain. Relay it to other
// peers.
case blockchain.NTBlockAccepted:
// Don't relay if we are not current. Other peers that are
// current should already know about it.
if !sm.current() {
return
}
block, ok := notification.Data.(*btcutil.Block)
if !ok {
log.Warnf("Chain accepted notification is not a block.")
break
}
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
这里将新区块传递到其他对等方。生成一个库存向量并传递它。
// RelayInventory relays the passed inventory vector to all connected peers
// that are not already known to have it.
func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
s.relayInv <- relayMsg{invVect: invVect, data: data}
}
RelayInventory
将库存向量传给了server
的relayInv
。
这个通道被server的peerHandler
读取:
// New inventory to potentially be relayed to other peers.
case invMsg := <-s.relayInv:
s.handleRelayInvMsg(state, invMsg)
handleRelayInvMsg
执行了一个闭包:
state.forAllPeers(func(sp *serverPeer) {
if !sp.Connected() {
return
}
// If the inventory is a block and the peer prefers headers,
// generate and send a headers message instead of an inventory
// message.
if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
blockHeader, ok := msg.data.(wire.BlockHeader)
if !ok {
peerLog.Warnf("Underlying data for headers" +
" is not a block header")
return
}
msgHeaders := wire.NewMsgHeaders()
if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
peerLog.Errorf("Failed to add block"+
" header: %v", err)
return
}
sp.QueueMessage(msgHeaders, nil)
return
}
...
})
最后将封装的信息放到了对等方发送队列中。QueueMessage
代码如下:
// QueueMessage adds the passed bitcoin message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}
最后经过一步一步传递,交由infrastruture
包中的代码完成将新的区块发送到所有已知对等方。
至此,完成了从区块创建到将区块传递给对等方的源码分析。对等方在接收到新区块后,将对其进行验证,并决定是否将其连接到主链上。该区块最终能否上链,取决于网络中的大多数结点是否最终接受了它。如果网络中同时存在其它结点新挖出的区块,上链的胜负将继续交由新区块的下一区块产生的先后决定。