当前位置: 首页 > 工具软件 > btcd > 使用案例 >

btcd交易流程之交易的打包上链(三)

林曦之
2023-12-01

交易打包上链的概述

btcd首先从内存池取得交易,用交易填充区块,并往区块上填入必要的信息。随后区块进行POW计算。当计算符合难度值的区块哈希后,btcd对新区块进行最后一步的验证,将新区块连接到本地的主链上,并广播这个新区块给对等方。

BitCoin RPCs

为了了解交易是怎么被打包到区块,最后发布的,首先从查看相关的RPC调用。

getblocktemplate

getblocktemplate ( "template_request" )

其描述如下:

If the request parameters include a ‘mode’ key, that is used to explicitly select between the default ‘template’ request or a ‘proposal’.
It returns data needed to construct a block to work on.

getblocktemplate RPC将返回构造一个区块所需要的数据。

generateblock

generateblock "output" ["rawtx/txid",...]

其描述如下:

Mine a block with a set of ordered transactions immediately to a specified address or descriptor (before the RPC call returns)

generateblock RPC用给定的排好序的交易挖矿,需要指定受益人的地址。

submitblock

submitblock "hexdata" ( "dummy" )

其描述如下:

Attempts to submit new block to network.

尝试提交一个新的区块到网络中。

setgenerate

Set the server to generate coins (mine) or not.
NOTE: Since btcd does not have the wallet integrated to provide payment addresses, btcd must be configured via the --miningaddr option to provide which payment addresses to pay created blocks to for this RPC to function.

setgenerate设置server是否开启mining。如果开启,服务器将动用CPU mining。需要用–miningaddr指定受益人地址。

mining总体流程分析

有了这三个RPC,我们可以大概确定从打包交易到发布区块的流程。
首先通过getblocktemplate RPC得到组装一个区块需要的信息,然后通过generateblock RPC产生一个区块,最后通过submitblock RPC将区块发布到网络上。
也可以通过setgenerate RPC设置服务器开启mining,在服务器上完成区块的生成、挖矿、提交。
以下选取getblocktemplate RPCsetgenerate RPC进行分析。

getblocktemplate

Client发出RPC开始。该RPC位于rpcclient/mining.go,其代码如下:

// GetBlockTemplate returns a new block template for mining.
func (c *Client) GetBlockTemplate(req *btcjson.TemplateRequest) (*btcjson.GetBlockTemplateResult, error) {
	return c.GetBlockTemplateAsync(req).Receive()
}

它实际调用了异步版本GetBlockTemplateAsync,并调用其Receive方法阻塞等待结果返回:

// GetBlockTemplateAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on the
// returned instance.
//
// See GetBlockTemplate for the blocking version and more details.
func (c *Client) GetBlockTemplateAsync(req *btcjson.TemplateRequest) FutureGetBlockTemplateResponse {
	cmd := btcjson.NewGetBlockTemplateCmd(req)
	return c.SendCmd(cmd)
}

调用了ClientSendCmd方法:

// SendCmd sends the passed command to the associated server and returns a
// response channel on which the reply will be delivered at some point in the
// future.  It handles both websocket and HTTP POST mode depending on the
// configuration of the client.
func (c *Client) SendCmd(cmd interface{}) chan *Response {
	...
	responseChan := make(chan *Response, 1)
	jReq := &jsonRequest{
		id:             id,
		method:         method,
		cmd:            cmd,
		marshalledJSON: marshalledJSON,
		responseChan:   responseChan,
	}

	c.sendRequest(jReq)

	return responseChan
}

该方法声明了一个容量为1的回复用的通道,说明调用者GetBlockTemplateAsync不会阻塞在等待该方法返回结果。
SendCmd进一步调用了ClientsendRequest方法,之后就是用POST方法将请求发到服务器上。

在服务器上,RPC的处理方法位于rpcserver.go,处理方法为handleGetBlockTemplate,其代码如下:

// handleGetBlockTemplate implements the getblocktemplate command.
func handleGetBlockTemplate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
	c := cmd.(*btcjson.GetBlockTemplateCmd)
	request := c.Request

	// Set the default mode and override it if supplied.
	mode := "template"
	if request != nil && request.Mode != "" {
		mode = request.Mode
	}

	switch mode {
	case "template":
		return handleGetBlockTemplateRequest(s, request, closeChan)
	case "proposal":
		return handleGetBlockTemplateProposal(s, request)
	}

	return nil, &btcjson.RPCError{
		Code:    btcjson.ErrRPCInvalidParameter,
		Message: "Invalid mode",
	}
}

该方法先判断请求的模式mode是什么,默认为template。以template为例,它调用了handleGetBlockTemplateRequest方法,其代码如下:

func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateRequest, closeChan <-chan struct{}) (interface{}, error) {
	...

	// Get and return a block template.  A new block template will be
	// generated when the current best block has changed or the transactions
	// in the memory pool have been updated and it has been at least five
	// seconds since the last template was generated.  Otherwise, the
	// timestamp for the existing block template is updated (and possibly
	// the difficulty on testnet per the consesus rules).
	if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil {
		return nil, err
	}
	return state.blockTemplateResult(useCoinbaseValue, nil)
}

该方法调用了blockTemplateResult方法。查看该方法可知blockTemplateResult返回了目前和state关联的区块模板。
首先看state是什么,其结构体定义如下:

// gbtWorkState houses state that is used in between multiple RPC invocations to
// getblocktemplate.
type gbtWorkState struct {
	sync.Mutex
	lastTxUpdate  time.Time
	lastGenerated time.Time
	prevHash      *chainhash.Hash
	minTimestamp  time.Time
	template      *mining.BlockTemplate
	notifyMap     map[chainhash.Hash]map[int64]chan struct{}
	timeSource    blockchain.MedianTimeSource
}

gbtWorkState保存了多个getblocktemplate RPC调用请求间的状态。其中BlockTemplate域保存了我们需要的区块模板。那么这个template *mining.BlockTemplate是什么时候设置的呢?其实在handleGetBlockTemplateRequest中,调用的updateBlockTemplate方法设置了这个template

func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bool) error {
		
		blkTemplate, err := generator.NewBlockTemplate(payAddr)
		...
		template = blkTemplate
		...
		state.template = template
		...
}

NewBlockTemplate方法则真正创建了一个新的区块模板。该区块模板使用来自内存池的交易创建区块。其中,传入的payToAddress用于创建coinbase交易。

NewBlockTemplate选择交易的策略

选择和包含的交易根据几个因素进行优先级排序:

  1. 每个事务都有一个基于其值、输入时间和大小计算的优先级。由较大金额、较旧输入和较小规模组成的事务具有最高优先级;
  2. 计算每笔交易的每千字节费用。首选每千字节费用较高的交易。
  3. 将所有与块生成相关的策略设置都考虑在内。

仅使用块链中已有的其他交易的输出的交易将立即添加到优先级队列中,该队列根据优先级(然后是每千字节的费用)或每千字节的费用(然后是优先级)进行优先级排序取决于BlockPrioritySize策略设置是否为高优先级交易分配空间。使用源池中其他交易的输出的交易将添加到依赖关系映射中,以便在包含它们所依赖的交易后,可以将它们添加到优先级队列中。一旦高优先级区域(如果已配置)已填满交易,或者优先级低于高优先级,优先级队列将更新为按每千字节的费用(然后是优先级)排列优先级。当每千字节的费用低于TxMinFreeFee策略设置时,将跳过交易,除非BlockMinSize策略设置为非零,在这种情况下,块将填充低费用/免费交易,直到块大小达到该最小大小。跳过任何会导致块超过BlockMaxSize策略设置、超过每个块允许的最大签名操作数或以其他方式导致块无效的交易。

至此,NewBlockTemplate方法返回了一个新的区块模板:

	return &BlockTemplate{
		Block:             &msgBlock,
		Fees:              txFees,
		SigOpCosts:        txSigOpCosts,
		Height:            nextBlockHeight,
		ValidPayAddress:   payToAddress != nil,
		WitnessCommitment: witnessCommitment,
	}, nil

setgenerate

前面说到,setgenerate设置server是否开启挖矿。如果开启,服务器将动用CPU挖矿。需要用–miningaddr指定受益人地址。
首先查看处理setgennerate的方法handleSetGenerate

// handleSetGenerate implements the setgenerate command.
func handleSetGenerate(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
		...
		// It's safe to call start even if it's already started.
		s.cfg.CPUMiner.SetNumWorkers(int32(genProcLimit))
		s.cfg.CPUMiner.Start()
		...
}

handleSetGenerate先设置了挖矿的worker数量,然后调用CPUMinerStart方法开始挖矿。继续查看Start方法:

// Start begins the CPU mining process as well as the speed monitor used to
// track hashing metrics.  Calling this function when the CPU miner has
// already been started will have no effect.
//
// This function is safe for concurrent access.
func (m *CPUMiner) Start() {
	m.Lock()
	defer m.Unlock()

	// Nothing to do if the miner is already running or if running in
	// discrete mode (using GenerateNBlocks).
	if m.started || m.discreteMining {
		return
	}

	m.quit = make(chan struct{})
	m.speedMonitorQuit = make(chan struct{})
	m.wg.Add(2)
	go m.speedMonitor()
	go m.miningWorkerController()

	m.started = true
	log.Infof("CPU miner started")
}

Start方法开启了一个速度监控speedMonitor协程和一个工作线程控制器miningWorkerController协程。
这里主要看miningWorkerController的实现:

// miningWorkerController launches the worker goroutines that are used to
// generate block templates and solve them.  It also provides the ability to
// dynamically adjust the number of running worker goroutines.
func (m *CPUMiner) miningWorkerController() {
	// launchWorkers groups common code to launch a specified number of
	// workers for generating blocks.
	var runningWorkers []chan struct{}
	launchWorkers := func(numWorkers uint32) {
		for i := uint32(0); i < numWorkers; i++ {
			quit := make(chan struct{})
			runningWorkers = append(runningWorkers, quit)

			m.workerWg.Add(1)
			go m.generateBlocks(quit)
		}
	}

	// Launch the current number of workers by default.
	runningWorkers = make([]chan struct{}, 0, m.numWorkers)
	launchWorkers(m.numWorkers)

out:
	for {
		select {
		// Update the number of running workers.
		case <-m.updateNumWorkers:
			// No change.
			numRunning := uint32(len(runningWorkers))
			if m.numWorkers == numRunning {
				continue
			}

			// Add new workers.
			if m.numWorkers > numRunning {
				launchWorkers(m.numWorkers - numRunning)
				continue
			}

			// Signal the most recently created goroutines to exit.
			for i := numRunning - 1; i >= m.numWorkers; i-- {
				close(runningWorkers[i])
				runningWorkers[i] = nil
				runningWorkers = runningWorkers[:i]
			}

		case <-m.quit:
			for _, quit := range runningWorkers {
				close(quit)
			}
			break out
		}
	}

	// Wait until all workers shut down to stop the speed monitor since
	// they rely on being able to send updates to it.
	m.workerWg.Wait()
	close(m.speedMonitorQuit)
	m.wg.Done()
}


for循环中,通过go m.generateBlocks(quit)开启了各个worker协程。随后miningWorkerController监控挖矿worker数有没有变化,对挖矿worker数进行调整,并监听退出消息,进而退出挖矿。

generateBlocks就是所谓的worker,继续查看generateBlocks方法是怎么挖矿的:

// generateBlocks is a worker that is controlled by the miningWorkerController.
// It is self contained in that it creates block templates and attempts to solve
// them while detecting when it is performing stale work and reacting
// accordingly by generating a new block template.  When a block is solved, it
// is submitted.
//
// It must be run as a goroutine.
func (m *CPUMiner) generateBlocks(quit chan struct{}) {
	log.Tracef("Starting generate blocks worker")

	// Start a ticker which is used to signal checks for stale work and
	// updates to the speed monitor.
	ticker := time.NewTicker(time.Second * hashUpdateSecs)
	defer ticker.Stop()
out:
	for {
		// Quit when the miner is stopped.
		select {
		case <-quit:
			break out
		default:
			// Non-blocking select to fall through
		}

		// Wait until there is a connection to at least one other peer
		// since there is no way to relay a found block or receive
		// transactions to work on when there are no connected peers.
		if m.cfg.ConnectedCount() == 0 {
			time.Sleep(time.Second)
			continue
		}

		// No point in searching for a solution before the chain is
		// synced.  Also, grab the same lock as used for block
		// submission, since the current block will be changing and
		// this would otherwise end up building a new block template on
		// a block that is in the process of becoming stale.
		m.submitBlockLock.Lock()
		curHeight := m.g.BestSnapshot().Height
		if curHeight != 0 && !m.cfg.IsCurrent() {
			m.submitBlockLock.Unlock()
			time.Sleep(time.Second)
			continue
		}

		// Choose a payment address at random.
		rand.Seed(time.Now().UnixNano())
		payToAddr := m.cfg.MiningAddrs[rand.Intn(len(m.cfg.MiningAddrs))]

		// Create a new block template using the available transactions
		// in the memory pool as a source of transactions to potentially
		// include in the block.
		template, err := m.g.NewBlockTemplate(payToAddr)
		m.submitBlockLock.Unlock()
		if err != nil {
			errStr := fmt.Sprintf("Failed to create new block "+
				"template: %v", err)
			log.Errorf(errStr)
			continue
		}

		// Attempt to solve the block.  The function will exit early
		// with false when conditions that trigger a stale block, so
		// a new block template can be generated.  When the return is
		// true a solution was found, so submit the solved block.
		if m.solveBlock(template.Block, curHeight+1, ticker, quit) {
			block := btcutil.NewBlock(template.Block)
			m.submitBlock(block)
		}
	}

	m.workerWg.Done()
	log.Tracef("Generate blocks worker done")
}

generateBlocks做了以下这些事情:

  1. 监听quit通道,如果该通道被关闭,则worker退出;
  2. 如果没有连接的对等方,则阻塞一秒钟;
  3. 检查主链是否已经同步,否则阻塞一秒等待主链同步完成;
  4. 从设定的受益人地址中任选一个;
  5. 调用前述NewBlockTemplate获取一个区块模板;
  6. 调用solveBlock来求解符合难度的nonce值;
  7. 成功找到符合要求的nonce后,调用NewBlock组装区块,调用submitBlock提交区块。

solveBlock是真正在做POW的地方,其代码如下:

// solveBlock attempts to find some combination of a nonce, extra nonce, and
// current timestamp which makes the passed block hash to a value less than the
// target difficulty.  The timestamp is updated periodically and the passed
// block is modified with all tweaks during this process.  This means that
// when the function returns true, the block is ready for submission.
//
// This function will return early with false when conditions that trigger a
// stale block such as a new block showing up or periodically when there are
// new transactions and enough time has elapsed without finding a solution.
func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, blockHeight int32,
	ticker *time.Ticker, quit chan struct{}) bool {

	// Choose a random extra nonce offset for this block template and
	// worker.
	enOffset, err := wire.RandomUint64()
	if err != nil {
		log.Errorf("Unexpected error while generating random "+
			"extra nonce offset: %v", err)
		enOffset = 0
	}

	// Create some convenience variables.
	header := &msgBlock.Header
	targetDifficulty := blockchain.CompactToBig(header.Bits)

	// Initial state.
	lastGenerated := time.Now()
	lastTxUpdate := m.g.TxSource().LastUpdated()
	hashesCompleted := uint64(0)

	// Note that the entire extra nonce range is iterated and the offset is
	// added relying on the fact that overflow will wrap around 0 as
	// provided by the Go spec.
	for extraNonce := uint64(0); extraNonce < maxExtraNonce; extraNonce++ {
		// Update the extra nonce in the block template with the
		// new value by regenerating the coinbase script and
		// setting the merkle root to the new value.
		m.g.UpdateExtraNonce(msgBlock, blockHeight, extraNonce+enOffset)

		// Search through the entire nonce range for a solution while
		// periodically checking for early quit and stale block
		// conditions along with updates to the speed monitor.
		for i := uint32(0); i <= maxNonce; i++ {
			select {
			case <-quit:
				return false

			case <-ticker.C:
				m.updateHashes <- hashesCompleted
				hashesCompleted = 0

				// The current block is stale if the best block
				// has changed.
				best := m.g.BestSnapshot()
				if !header.PrevBlock.IsEqual(&best.Hash) {
					return false
				}

				// The current block is stale if the memory pool
				// has been updated since the block template was
				// generated and it has been at least one
				// minute.
				if lastTxUpdate != m.g.TxSource().LastUpdated() &&
					time.Now().After(lastGenerated.Add(time.Minute)) {

					return false
				}

				m.g.UpdateBlockTime(msgBlock)

			default:
				// Non-blocking select to fall through
			}

			// Update the nonce and hash the block header.  Each
			// hash is actually a double sha256 (two hashes), so
			// increment the number of hashes completed for each
			// attempt accordingly.
			header.Nonce = i
			hash := header.BlockHash()
			hashesCompleted += 2

			// The block is solved when the new block hash is less
			// than the target difficulty.  Yay!
			if blockchain.HashToBig(&hash).Cmp(targetDifficulty) <= 0 {
				m.updateHashes <- hashesCompleted
				return true
			}
		}
	}

	return false
}

solveBlock试图通过设置nonceextra noncetimestamp来使得给定的区块的哈希小于目标难度值。当一个新区块出现时、或者当出现了新的交易并且经过了一定的时间,这个方法会提早返回。
solveBlock首先在外层循环设置extraNonce,每个循环对extraNonce增一;在内层循环设置nonce,每个循环对nonce增一;在循环内部,solveBlock能够接收退出消息,并且会周期性地检查是否出现了新的区块;如果新的交易出现,并且时间已经过去一分钟,则退出;
hash := header.BlockHash()对区块头进行了哈希。如果哈希满足了要求,则将hashesCompleted发到m.updateHashes,并返回true。(将hashesCompleted发到m.updateHashes是为了让速度监视器监控每秒钟的哈希数)

回到generateBlocks,成功找到符合要求的nonce, extra nonce, timestamp组合后,solveBlock返回true,调用者generateBlocks继续调用NewBlocktemplate.Block对区块进行简单封装后调用submitBlock对区块进行提交。

submitBlock代码如下:

// submitBlock submits the passed block to network after ensuring it passes all
// of the consensus validation rules.
func (m *CPUMiner) submitBlock(block *btcutil.Block) bool {
	m.submitBlockLock.Lock()
	defer m.submitBlockLock.Unlock()

	// Ensure the block is not stale since a new block could have shown up
	// while the solution was being found.  Typically that condition is
	// detected and all work on the stale block is halted to start work on
	// a new block, but the check only happens periodically, so it is
	// possible a block was found and submitted in between.
	msgBlock := block.MsgBlock()
	if !msgBlock.Header.PrevBlock.IsEqual(&m.g.BestSnapshot().Hash) {
		log.Debugf("Block submitted via CPU miner with previous "+
			"block %s is stale", msgBlock.Header.PrevBlock)
		return false
	}

	// Process this block using the same rules as blocks coming from other
	// nodes.  This will in turn relay it to the network like normal.
	isOrphan, err := m.cfg.ProcessBlock(block, blockchain.BFNone)
	if err != nil {
		// Anything other than a rule violation is an unexpected error,
		// so log that error as an internal error.
		if _, ok := err.(blockchain.RuleError); !ok {
			log.Errorf("Unexpected error while processing "+
				"block submitted via CPU miner: %v", err)
			return false
		}

		log.Debugf("Block submitted via CPU miner rejected: %v", err)
		return false
	}
	if isOrphan {
		log.Debugf("Block submitted via CPU miner is an orphan")
		return false
	}

	// The block was accepted.
	coinbaseTx := block.MsgBlock().Transactions[0].TxOut[0]
	log.Infof("Block submitted via CPU miner accepted (hash %s, "+
		"amount %v)", block.Hash(), btcutil.Amount(coinbaseTx.Value))
	return true
}

submitBlock最后检查一次新挖出的区块是否已经过期(通过对比新挖出的区块的上一区块哈希和目前主链最新区块哈希)。
通过ProcessBlock方法,将区块广播到对等方,这个ProcessBlock方法由cpuminer中的Config结构体作为方法变量持有,实际上这个方法变量被赋值为SyncManagerProcessBlock方法,其代码如下:

// ProcessBlock makes use of ProcessBlock on an internal instance of a block
// chain.
func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
	reply := make(chan processBlockResponse, 1)
	sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
	response := <-reply
	return response.isOrphan, response.err
}

它将processBlockMsg发送到msgChan通道,随后试图从processBlockMsg附带的回复通道读取回复消息,并将其返回给调用者。这时再次回到了熟悉的blockHandler方法(在提交交易的部分曾提到),接收processBlockMsg的代码如下:

out:
	for {
		select {
		case m := <-sm.msgChan:
			switch msg := m.(type) {
			...
			case processBlockMsg:
				_, isOrphan, err := sm.chain.ProcessBlock(
					msg.block, msg.flags)
				if err != nil {
					msg.reply <- processBlockResponse{
						isOrphan: false,
						err:      err,
					}
				}

				msg.reply <- processBlockResponse{
					isOrphan: isOrphan,
					err:      nil,
				}
			...
		...
		}
	}


当从msgChan收到的消息为processBlockMsg类型时,调用ProcessBlock方法,并返回响应。
BlockChain对象的ProcessBlock方法就是区块上链的最后一步。它包括了blockExists拒绝重复块、checkBlockSanity确保区块遵守所有规则、addOrphanBlock孤儿处理、maybeAcceptBlock将区块插入到区块链中。

maybeAcceptBlock基本上将一个区块放到区块链上,它会返回一个区块最终是否在主链上。

  1. 它将执行几个取决于其位置的验证检查(新区块的高度是引用的前一区块高度加一,难度是否符合要求等…),才试图将其添加到主链上;
  2. 将区块保存到数据库中;
  3. 创建一个blockNode对象,并对其进行索引;
  4. connectBestChain将区块连接到主链上。

connectBestChain将区块连接到主链上。典型情况下,只需将新区块连接到主链上。然而它可能延长了一个侧链,它既可能变成主链,也可能不成为主链。
connectBestChain将区块索引更新的数据库;
再次执行几项检查验证区块能够连接到主链上;
主要注意的是何时更新UTXO:

		if fastAdd {
			err := view.fetchInputUtxos(b.db, block)
			if err != nil {
				return false, err
			}
			err = view.connectTransactions(block, &stxos)
			if err != nil {
				return false, err
			}
		}

将区块中的交易中inputs包含的UTXOs拿来,并将区块中的交易中的outputs作为新的UTXOs。
调用connectBlock将区块连接到主链上。在connectBlock中有以下代码:

err := b.indexManager.ConnectBlock(dbTx, block, stxos)

ConnectBlock执行了一系列更新,最终将新的区块连接到主链上。
虽然新区块连接到了主链上,但这只是本地的一条区块链,网络中的其他节点并未得知已经产生了一个新的区块。一般来说,一个节点连接了一个新的区块,应该尽快通知尽可能多的节点,否则可能有其它节点也找到了符合要求的nonce,也连接了一个新的区块,那么当前节点连接的区块就有可能成为侧链。我们重新回到maybeAcceptBlock方法:

当顺利将新区块连接到主链上后,后续有以下代码:

	// Notify the caller that the new block was accepted into the block
	// chain.  The caller would typically want to react by relaying the
	// inventory to other peers.
	b.chainLock.Unlock()
	b.sendNotification(NTBlockAccepted, block)
	b.chainLock.Lock()

maybeAcceptBlock通过调用sendNotification通知调用者,新的区块已经被接受。其代码如下:

// sendNotification sends a notification with the passed type and data if the
// caller requested notifications by providing a callback function in the call
// to New.
func (b *BlockChain) sendNotification(typ NotificationType, data interface{}) {
	// Generate and send the notification.
	n := Notification{Type: typ, Data: data}
	b.notificationsLock.RLock()
	for _, callback := range b.notifications {
		callback(&n)
	}
	b.notificationsLock.RUnlock()
}


该方法先构造了一个Notification,然后遍历BlockChain对象的notifications,用构造的Notification作为参数回调notifications中的每一个函数,其中,Subscribe方法能够订阅该通知:

// Subscribe to block chain notifications. Registers a callback to be executed
// when various events take place. See the documentation on Notification and
// NotificationType for details on the types and contents of notifications.
func (b *BlockChain) Subscribe(callback NotificationCallback) {
	b.notificationsLock.Lock()
	b.notifications = append(b.notifications, callback)
	b.notificationsLock.Unlock()
}

查看该方法的Usage可知,RPCServerSyncManager均订阅了该消息,并传入各自的handleBlockchainNotification方法作为回调函数。
先查看rpcServerhandleBlockchainNotification对应的分支:

case blockchain.NTBlockAccepted:
		block, ok := notification.Data.(*btcutil.Block)
		if !ok {
			rpcsLog.Warnf("Chain accepted notification is not a block.")
			break
		}

		// Allow any clients performing long polling via the
		// getblocktemplate RPC to be notified when the new block causes
		// their old block template to become stale.
		s.gbtWorkState.NotifyBlockConnected(block.Hash())

当新的区块让他们的旧区块模板过期,则通过NotifyBlockConnected方法对“通过getblocktemplate RPC执行long polling的客户端”进行通知。

在查看SyncManagerhandleBlockchainNotification对应的分支:

	// A block has been accepted into the block chain.  Relay it to other
	// peers.
	case blockchain.NTBlockAccepted:
		// Don't relay if we are not current. Other peers that are
		// current should already know about it.
		if !sm.current() {
			return
		}

		block, ok := notification.Data.(*btcutil.Block)
		if !ok {
			log.Warnf("Chain accepted notification is not a block.")
			break
		}

		// Generate the inventory vector and relay it.
		iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
		sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)

这里将新区块传递到其他对等方。生成一个库存向量并传递它。

// RelayInventory relays the passed inventory vector to all connected peers
// that are not already known to have it.
func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
	s.relayInv <- relayMsg{invVect: invVect, data: data}
}

RelayInventory将库存向量传给了serverrelayInv
这个通道被server的peerHandler读取:

		// New inventory to potentially be relayed to other peers.
		case invMsg := <-s.relayInv:
			s.handleRelayInvMsg(state, invMsg)

handleRelayInvMsg执行了一个闭包:

state.forAllPeers(func(sp *serverPeer) {
		if !sp.Connected() {
			return
		}

		// If the inventory is a block and the peer prefers headers,
		// generate and send a headers message instead of an inventory
		// message.
		if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
			blockHeader, ok := msg.data.(wire.BlockHeader)
			if !ok {
				peerLog.Warnf("Underlying data for headers" +
					" is not a block header")
				return
			}
			msgHeaders := wire.NewMsgHeaders()
			if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
				peerLog.Errorf("Failed to add block"+
					" header: %v", err)
				return
			}
			sp.QueueMessage(msgHeaders, nil)
			return
		}
		...
	})

最后将封装的信息放到了对等方发送队列中。QueueMessage代码如下:

// QueueMessage adds the passed bitcoin message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
	p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}

最后经过一步一步传递,交由infrastruture包中的代码完成将新的区块发送到所有已知对等方。

至此,完成了从区块创建到将区块传递给对等方的源码分析。对等方在接收到新区块后,将对其进行验证,并决定是否将其连接到主链上。该区块最终能否上链,取决于网络中的大多数结点是否最终接受了它。如果网络中同时存在其它结点新挖出的区块,上链的胜负将继续交由新区块的下一区块产生的先后决定。

 类似资料: