当前位置: 首页 > 工具软件 > btcd > 使用案例 >

btcd交易流程之交易的验证(二)

马高谊
2023-12-01

交易的验证

btcd.go中的main函数开始,启动btcd调用了btcdMainbtcdMain中有以下代码:

	// Create server and start it.
	server, err := newServer(cfg.Listeners, cfg.AgentBlacklist,
		cfg.AgentWhitelist, db, activeNetParams.Params, interrupt)

分析到底这是什么Server:

// newServer returns a new btcd server configured to listen on addr for the
// bitcoin network type specified by chainParams.  Use start to begin accepting
// connections from peers.
func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
	db database.DB, chainParams *chaincfg.Params,
	interrupt <-chan struct{}) (*server, error) {

由注释可知,这个Server用于监听对等方的连接,btcdMain在得到Server对象后,随后开启该服务器:

	server.Start()

该方法上的注释:

// Start begins accepting connections from peers.
func (s *server) Start() {

说明btcd开始接受对等方的连接。
在该Start方法中又有以下代码:

	// Start the peer handler which in turn starts the address and block
	// managers.
	s.wg.Add(1)
	go s.peerHandler()

创建了一个goroutine作为对等方处理器,该处理器依次开启地址处理器块处理器
继续看peerHandler完成了什么工作:

// peerHandler is used to handle peer operations such as adding and removing
// peers to and from the server, banning peers, and broadcasting messages to
// peers.  It must be run in a goroutine.
func (s *server) peerHandler() {

peerHandler用于处理对等方操作,比如添加、移除、禁止对等方,以及广播消息到对等方。
当一个对等方发布一个交易时,需要靠该服务器将交易广播到对等方,进而让矿工将交易打包到区块中。
peerHandler开启了两个管理器,分别是地址管理器同步管理器

	// Start the address manager and sync manager, both of which are needed
	// by peers.  This is done here since their lifecycle is closely tied
	// to this handler and rather than adding more channels to sychronize
	// things, it's easier and slightly faster to simply start and stop them
	// in this handler.
	s.addrManager.Start()
	s.syncManager.Start()

SyncManagerStart方法如下:

// Start begins the core block handler which processes block and inv messages.
func (sm *SyncManager) Start() {
	// Already started?
	if atomic.AddInt32(&sm.started, 1) != 1 {
		return
	}

	log.Trace("Starting sync manager")
	sm.wg.Add(1)
	go sm.blockHandler()
}

SyncManager开启了一个blockHandler块处理器,其代码如下:

// blockHandler is the main handler for the sync manager.  It must be run as a
// goroutine.  It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
// single thread without needing to lock memory data structures.  This is
// important because the sync manager controls which blocks are needed and how
// the fetching should proceed.
func (sm *SyncManager) blockHandler() {
	stallTicker := time.NewTicker(stallSampleInterval)
	defer stallTicker.Stop()

out:
	for {
		select {
		case m := <-sm.msgChan:
			switch msg := m.(type) {
			case *newPeerMsg:
				sm.handleNewPeerMsg(msg.peer)
			
			...
			case *txMsg:
				sm.handleTxMsg(msg)
				msg.reply <- struct{}{}
			...
			
			default:
				log.Warnf("Invalid message type in block "+
					"handler: %T", msg)
			}

		case <-stallTicker.C:
			sm.handleStallSample()

		case <-sm.quit:
			break out
		}
	}

	sm.wg.Done()
	log.Trace("Block handler done")
}

其中,handleTxMsg是处理交易的入口。那么这个txMsg类型的消息是由谁发过来的呢?
我们先看txMsg的结构体定义:

// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
	tx    *btcutil.Tx
	peer  *peerpkg.Peer
	reply chan struct{}
}

它包含了一个交易、一个对等方、用于回复的通道。
现在先分析是谁往通道中发送txMsg这种结构体:
peer.Config中,有这样一个域:

	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)

由注释可以知道,OnTx这个方法在对等方收到一个交易的消息后就会执行。我们继续看OnTx的实现:

// OnTx is invoked when a peer receives a tx bitcoin message.  It blocks
// until the bitcoin transaction has been fully processed.  Unlock the block
// handler this does not serialize all transactions through a single thread
// transactions don't rely on the previous one in a linear fashion like blocks.
func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
	...

	// Queue the transaction up to be handled by the sync manager and
	// intentionally block further receives until the transaction is fully
	// processed and known good or bad.  This helps prevent a malicious peer
	// from queuing up a bunch of bad transactions before disconnecting (or
	// being disconnected) and wasting memory.
	sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
	<-sp.txProcessed
}

QueueTx这个方法将交易排在队列中,等待同步管理器的处理。而在QueueTx中有这样一段:

sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}

到这里可以知道,当对等方收到一个交易的时候,就会往同步管理器的msgChan发送这个交易。这时同步管理器的blockHandler方法收到这个结构体,就会进一步调用handleTxMsg处理这个交易。

handleTxMsg方法如下:

// handleTxMsg handles transaction messages from all peers.
func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
	peer := tmsg.peer
	state, exists := sm.peerStates[peer]
	if !exists {
		log.Warnf("Received tx message from unknown peer %s", peer)
		return
	}

	// NOTE:  BitcoinJ, and possibly other wallets, don't follow the spec of
	// sending an inventory message and allowing the remote peer to decide
	// whether or not they want to request the transaction via a getdata
	// message.  Unfortunately, the reference implementation permits
	// unrequested data, so it has allowed wallets that don't follow the
	// spec to proliferate.  While this is not ideal, there is no check here
	// to disconnect peers for sending unsolicited transactions to provide
	// interoperability.
	txHash := tmsg.tx.Hash()

	// Ignore transactions that we have already rejected.  Do not
	// send a reject message here because if the transaction was already
	// rejected, the transaction was unsolicited.
	if _, exists = sm.rejectedTxns[*txHash]; exists {
		log.Debugf("Ignoring unsolicited previously rejected "+
			"transaction %v from %s", txHash, peer)
		return
	}

	// Process the transaction to include validation, insertion in the
	// memory pool, orphan handling, etc.
	acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
		true, true, mempool.Tag(peer.ID()))

	// Remove transaction from request maps. Either the mempool/chain
	// already knows about it and as such we shouldn't have any more
	// instances of trying to fetch it, or we failed to insert and thus
	// we'll retry next time we get an inv.
	delete(state.requestedTxns, *txHash)
	delete(sm.requestedTxns, *txHash)

	if err != nil {
		// Do not request this transaction again until a new block
		// has been processed.
		limitAdd(sm.rejectedTxns, *txHash, maxRejectedTxns)

		// When the error is a rule error, it means the transaction was
		// simply rejected as opposed to something actually going wrong,
		// so log it as such.  Otherwise, something really did go wrong,
		// so log it as an actual error.
		if _, ok := err.(mempool.RuleError); ok {
			log.Debugf("Rejected transaction %v from %s: %v",
				txHash, peer, err)
		} else {
			log.Errorf("Failed to process transaction %v: %v",
				txHash, err)
		}

		// Convert the error into an appropriate reject message and
		// send it.
		code, reason := mempool.ErrToRejectErr(err)
		peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
		return
	}

	sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
}

handleTxMsg执行流程大概如下:

  1. 取得交易的哈希;
  2. 忽略已经拒绝过的交易;
  3. 处理交易,包括验证、将它插入memory pool, 处理 orphan 等;
  4. 将该交易从请求映射表中删除。

上文已经看过txMsg的结构,它包含一个对等方、一个交易、一个回复用的通道。再看btcutil.Tx的结构:

// Tx defines a bitcoin transaction that provides easier and more efficient
// manipulation of raw transactions.  It also memoizes the hash for the
// transaction on its first access so subsequent accesses don't have to repeat
// the relatively expensive hashing operations.
type Tx struct {
	msgTx         *wire.MsgTx     // Underlying MsgTx
	txHash        *chainhash.Hash // Cached transaction hash
	txHashWitness *chainhash.Hash // Cached transaction witness hash
	txHasWitness  *bool           // If the transaction has witness data
	txIndex       int             // Position within a block or TxIndexUnknown
}

Tx包括一个底层的交易对象,交易哈希值,交易witness的哈希值,交易是否由witness,交易的下标。
wire.MsgTx的代码如下:

// MsgTx implements the Message interface and represents a bitcoin tx message.
// It is used to deliver transaction information in response to a getdata
// message (MsgGetData) for a given transaction.
//
// Use the AddTxIn and AddTxOut functions to build up the list of transaction
// inputs and outputs.
type MsgTx struct {
	Version  int32
	TxIn     []*TxIn
	TxOut    []*TxOut
	LockTime uint32
}

其中的TxInTxOut就是一个交易的输入和输出,他们的定义如下:

// TxIn defines a bitcoin transaction input.
type TxIn struct {
	PreviousOutPoint OutPoint
	SignatureScript  []byte
	Witness          TxWitness
	Sequence         uint32
}
// TxOut defines a bitcoin transaction output.
type TxOut struct {
	Value    int64
	PkScript []byte
}

查看TxWitness的结构,实际上是一个二维byte数组:

// TxWitness defines the witness for a TxIn. A witness is to be interpreted as
// a slice of byte slices, or a stack with one or many elements.
type TxWitness [][]byte

那么到底witness是什么:
引自 https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki:

This BIP defines a new structure called a “witness” that is committed to blocks separately from the transaction merkle tree. This structure contains data required to check transaction validity but not required to determine transaction effects. In particular, scripts and signatures are moved into this new structure.
The witness is committed in a tree that is nested into the block’s existing merkle root via the coinbase transaction for the purpose of making this BIP soft fork compatible. A future hard fork can place this tree in its own branch.

一个witness包含检查事务有效性所需的数据,但不包含确定事务影响所需的数据。特别地,交易脚本和签名被移到了这个结构中。
了解了这些内容后,继续查看处理交易的mempool/mempool.go下的ProcessTransaction方法:

// ProcessTransaction is the main workhorse for handling insertion of new
// free-standing transactions into the memory pool.  It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
//
// It returns a slice of transactions added to the mempool.  When the
// error is nil, the list will include the passed transaction itself along
// with any additional orphan transaactions that were added as a result of
// the passed one being accepted.
//
// This function is safe for concurrent access.
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
	log.Tracef("Processing transaction %v", tx.Hash())

	// Protect concurrent access.
	mp.mtx.Lock()
	defer mp.mtx.Unlock()

	// Potentially accept the transaction to the memory pool.
	missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
		true)
	if err != nil {
		return nil, err
	}

	if len(missingParents) == 0 {
		// Accept any orphan transactions that depend on this
		// transaction (they may no longer be orphans if all inputs
		// are now available) and repeat for those accepted
		// transactions until there are no more.
		newTxs := mp.processOrphans(tx)
		acceptedTxs := make([]*TxDesc, len(newTxs)+1)

		// Add the parent transaction first so remote nodes
		// do not add orphans.
		acceptedTxs[0] = txD
		copy(acceptedTxs[1:], newTxs)

		return acceptedTxs, nil
	}

	// The transaction is an orphan (has inputs missing).  Reject
	// it if the flag to allow orphans is not set.
	if !allowOrphan {
		// Only use the first missing parent transaction in
		// the error message.
		//
		// NOTE: RejectDuplicate is really not an accurate
		// reject code here, but it matches the reference
		// implementation and there isn't a better choice due
		// to the limited number of reject codes.  Missing
		// inputs is assumed to mean they are already spent
		// which is not really always the case.
		str := fmt.Sprintf("orphan transaction %v references "+
			"outputs of unknown or fully-spent "+
			"transaction %v", tx.Hash(), missingParents[0])
		return nil, txRuleError(wire.RejectDuplicate, str)
	}

	// Potentially add the orphan transaction to the orphan pool.
	err = mp.maybeAddOrphan(tx, tag)
	return nil, err
}

ProcessTransaction负责将交易插入到内存池中。它的功能包括:拒绝重复的交易、确保交易符合规定、孤儿交易处理、插入内存池。它实际调用了maybeAcceptTransaction将交易放到内存池中:

	// Potentially accept the transaction to the memory pool.	missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit, true)

maybeAcceptTransaction是一个非常巨型的方法,这里不再列出,它主要做了以下事情:

1. 检查交易是否已经在内存池中;
2. 对事物执行初步健全性检查;
3. 检查这个独立的交易是否coinbase交易;
4. 如果是非标准交易,检查配置的网络参数是否禁止了这种交易;
5. 在内存池中检查这个交易是否发生了double spending(检查这个交易是否用了已经被使用过的交易输出);
6. 从UTXO拿到该交易的所有输入,同时检查UTXO是否已经存在该交易,如果该交易存在主链上并且已经被使用,拒绝它;
7. 如果引用的交易不存在,这个交易是孤儿交易,孤儿交易由调用者处理;
8. 检查交易的sequence lock是否活跃;
9. 使用规定区块链中允许进入区块的交易的不变规则,对交易输入执行多项检查。
10. 检查交易是否有过多签名;
11. 检查交易是否交易费过低,size不超过一定值的free tx仍被允许;
12. 检查free tx是否有足够的优先级被放到下一个区块,并限制free tx的速率,防止penny-flooding
13. 为交易的每个输入验证签名,如果验证不通过则拒绝;
14. 将交易添加到内存池中。

maybeAcceptTransaction通过调用addTransaction将交易放到内存池中,查看addTransaction代码,它将交易放到内存池中:

mp.pool[*tx.Hash()] = txD

查看TxPool结构体:

type TxPool struct {
	// The following variables must only be used atomically.
	lastUpdated int64 // last time pool was updated

	mtx           sync.RWMutex
	cfg           Config
	pool          map[chainhash.Hash]*TxDesc
	orphans       map[chainhash.Hash]*orphanTx
	orphansByPrev map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx
	outpoints     map[wire.OutPoint]*btcutil.Tx
	pennyTotal    float64 // exponentially decaying total for penny spends.
	lastPennyUnix int64   // unix time of last ``penny spend''

	// nextExpireScan is the time after which the orphan pool will be
	// scanned in order to evict orphans.  This is NOT a hard deadline as
	// the scan will only run when an orphan is added to the pool as opposed
	// to on an unconditional timer.
	nextExpireScan time.Time
}

其中pool以交易的哈希为键,以交易描述符为值,而交易描述符则包含了交易。
TxPool就是内存池,它实现了TxSource接口:

// TxSource represents a source of transactions to consider for inclusion in
// new blocks.
//
// The interface contract requires that all of these methods are safe for
// concurrent access with respect to the source.
type TxSource interface {
	// LastUpdated returns the last time a transaction was added to or
	// removed from the source pool.
	LastUpdated() time.Time

	// MiningDescs returns a slice of mining descriptors for all the
	// transactions in the source pool.
	MiningDescs() []*TxDesc

	// HaveTransaction returns whether or not the passed transaction hash
	// exists in the source pool.
	HaveTransaction(hash *chainhash.Hash) bool
}

由注释可以知道,TxSource代表了一个源,里面存放了可用于创建新区块的交易。
至此,新的交易被放到内存池中,当决定创建一个新区块时,btcd将从内存池中选择合适的交易打包到新区块中。

 类似资料: