从btcd.go
中的main
函数开始,启动btcd调用了btcdMain
,btcdMain
中有以下代码:
// Create server and start it.
server, err := newServer(cfg.Listeners, cfg.AgentBlacklist,
cfg.AgentWhitelist, db, activeNetParams.Params, interrupt)
分析到底这是什么Server:
// newServer returns a new btcd server configured to listen on addr for the
// bitcoin network type specified by chainParams. Use start to begin accepting
// connections from peers.
func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
db database.DB, chainParams *chaincfg.Params,
interrupt <-chan struct{}) (*server, error) {
由注释可知,这个Server用于监听对等方的连接,btcdMain
在得到Server对象后,随后开启该服务器:
server.Start()
该方法上的注释:
// Start begins accepting connections from peers.
func (s *server) Start() {
说明btcd开始接受对等方的连接。
在该Start
方法中又有以下代码:
// Start the peer handler which in turn starts the address and block
// managers.
s.wg.Add(1)
go s.peerHandler()
创建了一个goroutine作为对等方处理器,该处理器依次开启地址处理器和块处理器。
继续看peerHandler
完成了什么工作:
// peerHandler is used to handle peer operations such as adding and removing
// peers to and from the server, banning peers, and broadcasting messages to
// peers. It must be run in a goroutine.
func (s *server) peerHandler() {
peerHandler
用于处理对等方操作,比如添加、移除、禁止对等方,以及广播消息到对等方。
当一个对等方发布一个交易时,需要靠该服务器将交易广播到对等方,进而让矿工将交易打包到区块中。
peerHandler
开启了两个管理器,分别是地址管理器和同步管理器:
// Start the address manager and sync manager, both of which are needed
// by peers. This is done here since their lifecycle is closely tied
// to this handler and rather than adding more channels to sychronize
// things, it's easier and slightly faster to simply start and stop them
// in this handler.
s.addrManager.Start()
s.syncManager.Start()
SyncManager
的Start
方法如下:
// Start begins the core block handler which processes block and inv messages.
func (sm *SyncManager) Start() {
// Already started?
if atomic.AddInt32(&sm.started, 1) != 1 {
return
}
log.Trace("Starting sync manager")
sm.wg.Add(1)
go sm.blockHandler()
}
SyncManager
开启了一个blockHandler
块处理器,其代码如下:
// blockHandler is the main handler for the sync manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) messages are handled by a
// single thread without needing to lock memory data structures. This is
// important because the sync manager controls which blocks are needed and how
// the fetching should proceed.
func (sm *SyncManager) blockHandler() {
stallTicker := time.NewTicker(stallSampleInterval)
defer stallTicker.Stop()
out:
for {
select {
case m := <-sm.msgChan:
switch msg := m.(type) {
case *newPeerMsg:
sm.handleNewPeerMsg(msg.peer)
...
case *txMsg:
sm.handleTxMsg(msg)
msg.reply <- struct{}{}
...
default:
log.Warnf("Invalid message type in block "+
"handler: %T", msg)
}
case <-stallTicker.C:
sm.handleStallSample()
case <-sm.quit:
break out
}
}
sm.wg.Done()
log.Trace("Block handler done")
}
其中,handleTxMsg
是处理交易的入口。那么这个txMsg
类型的消息是由谁发过来的呢?
我们先看txMsg
的结构体定义:
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
tx *btcutil.Tx
peer *peerpkg.Peer
reply chan struct{}
}
它包含了一个交易、一个对等方、用于回复的通道。
现在先分析是谁往通道中发送txMsg
这种结构体:
在peer.Config
中,有这样一个域:
// OnTx is invoked when a peer receives a tx bitcoin message.
OnTx func(p *Peer, msg *wire.MsgTx)
由注释可以知道,OnTx
这个方法在对等方收到一个交易的消息后就会执行。我们继续看OnTx
的实现:
// OnTx is invoked when a peer receives a tx bitcoin message. It blocks
// until the bitcoin transaction has been fully processed. Unlock the block
// handler this does not serialize all transactions through a single thread
// transactions don't rely on the previous one in a linear fashion like blocks.
func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
...
// Queue the transaction up to be handled by the sync manager and
// intentionally block further receives until the transaction is fully
// processed and known good or bad. This helps prevent a malicious peer
// from queuing up a bunch of bad transactions before disconnecting (or
// being disconnected) and wasting memory.
sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
<-sp.txProcessed
}
QueueTx
这个方法将交易排在队列中,等待同步管理器的处理。而在QueueTx
中有这样一段:
sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
到这里可以知道,当对等方收到一个交易的时候,就会往同步管理器的msgChan
发送这个交易。这时同步管理器的blockHandler
方法收到这个结构体,就会进一步调用handleTxMsg
处理这个交易。
其handleTxMsg
方法如下:
// handleTxMsg handles transaction messages from all peers.
func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
peer := tmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received tx message from unknown peer %s", peer)
return
}
// NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of
// sending an inventory message and allowing the remote peer to decide
// whether or not they want to request the transaction via a getdata
// message. Unfortunately, the reference implementation permits
// unrequested data, so it has allowed wallets that don't follow the
// spec to proliferate. While this is not ideal, there is no check here
// to disconnect peers for sending unsolicited transactions to provide
// interoperability.
txHash := tmsg.tx.Hash()
// Ignore transactions that we have already rejected. Do not
// send a reject message here because if the transaction was already
// rejected, the transaction was unsolicited.
if _, exists = sm.rejectedTxns[*txHash]; exists {
log.Debugf("Ignoring unsolicited previously rejected "+
"transaction %v from %s", txHash, peer)
return
}
// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
true, true, mempool.Tag(peer.ID()))
// Remove transaction from request maps. Either the mempool/chain
// already knows about it and as such we shouldn't have any more
// instances of trying to fetch it, or we failed to insert and thus
// we'll retry next time we get an inv.
delete(state.requestedTxns, *txHash)
delete(sm.requestedTxns, *txHash)
if err != nil {
// Do not request this transaction again until a new block
// has been processed.
limitAdd(sm.rejectedTxns, *txHash, maxRejectedTxns)
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
// so log it as such. Otherwise, something really did go wrong,
// so log it as an actual error.
if _, ok := err.(mempool.RuleError); ok {
log.Debugf("Rejected transaction %v from %s: %v",
txHash, peer, err)
} else {
log.Errorf("Failed to process transaction %v: %v",
txHash, err)
}
// Convert the error into an appropriate reject message and
// send it.
code, reason := mempool.ErrToRejectErr(err)
peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
return
}
sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
}
handleTxMsg
执行流程大概如下:
上文已经看过txMsg
的结构,它包含一个对等方、一个交易、一个回复用的通道。再看btcutil.Tx
的结构:
// Tx defines a bitcoin transaction that provides easier and more efficient
// manipulation of raw transactions. It also memoizes the hash for the
// transaction on its first access so subsequent accesses don't have to repeat
// the relatively expensive hashing operations.
type Tx struct {
msgTx *wire.MsgTx // Underlying MsgTx
txHash *chainhash.Hash // Cached transaction hash
txHashWitness *chainhash.Hash // Cached transaction witness hash
txHasWitness *bool // If the transaction has witness data
txIndex int // Position within a block or TxIndexUnknown
}
Tx
包括一个底层的交易对象,交易哈希值,交易witness
的哈希值,交易是否由witness
,交易的下标。
wire.MsgTx
的代码如下:
// MsgTx implements the Message interface and represents a bitcoin tx message.
// It is used to deliver transaction information in response to a getdata
// message (MsgGetData) for a given transaction.
//
// Use the AddTxIn and AddTxOut functions to build up the list of transaction
// inputs and outputs.
type MsgTx struct {
Version int32
TxIn []*TxIn
TxOut []*TxOut
LockTime uint32
}
其中的TxIn
和TxOut
就是一个交易的输入和输出,他们的定义如下:
// TxIn defines a bitcoin transaction input.
type TxIn struct {
PreviousOutPoint OutPoint
SignatureScript []byte
Witness TxWitness
Sequence uint32
}
// TxOut defines a bitcoin transaction output.
type TxOut struct {
Value int64
PkScript []byte
}
查看TxWitness
的结构,实际上是一个二维byte
数组:
// TxWitness defines the witness for a TxIn. A witness is to be interpreted as
// a slice of byte slices, or a stack with one or many elements.
type TxWitness [][]byte
那么到底witness
是什么:
引自 https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki:
This BIP defines a new structure called a “witness” that is committed to blocks separately from the transaction merkle tree. This structure contains data required to check transaction validity but not required to determine transaction effects. In particular, scripts and signatures are moved into this new structure.
The witness is committed in a tree that is nested into the block’s existing merkle root via the coinbase transaction for the purpose of making this BIP soft fork compatible. A future hard fork can place this tree in its own branch.
一个witness
包含检查事务有效性所需的数据,但不包含确定事务影响所需的数据。特别地,交易脚本和签名被移到了这个结构中。
了解了这些内容后,继续查看处理交易的mempool/mempool.go
下的ProcessTransaction
方法:
// ProcessTransaction is the main workhorse for handling insertion of new
// free-standing transactions into the memory pool. It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
//
// It returns a slice of transactions added to the mempool. When the
// error is nil, the list will include the passed transaction itself along
// with any additional orphan transaactions that were added as a result of
// the passed one being accepted.
//
// This function is safe for concurrent access.
func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) {
log.Tracef("Processing transaction %v", tx.Hash())
// Protect concurrent access.
mp.mtx.Lock()
defer mp.mtx.Unlock()
// Potentially accept the transaction to the memory pool.
missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit,
true)
if err != nil {
return nil, err
}
if len(missingParents) == 0 {
// Accept any orphan transactions that depend on this
// transaction (they may no longer be orphans if all inputs
// are now available) and repeat for those accepted
// transactions until there are no more.
newTxs := mp.processOrphans(tx)
acceptedTxs := make([]*TxDesc, len(newTxs)+1)
// Add the parent transaction first so remote nodes
// do not add orphans.
acceptedTxs[0] = txD
copy(acceptedTxs[1:], newTxs)
return acceptedTxs, nil
}
// The transaction is an orphan (has inputs missing). Reject
// it if the flag to allow orphans is not set.
if !allowOrphan {
// Only use the first missing parent transaction in
// the error message.
//
// NOTE: RejectDuplicate is really not an accurate
// reject code here, but it matches the reference
// implementation and there isn't a better choice due
// to the limited number of reject codes. Missing
// inputs is assumed to mean they are already spent
// which is not really always the case.
str := fmt.Sprintf("orphan transaction %v references "+
"outputs of unknown or fully-spent "+
"transaction %v", tx.Hash(), missingParents[0])
return nil, txRuleError(wire.RejectDuplicate, str)
}
// Potentially add the orphan transaction to the orphan pool.
err = mp.maybeAddOrphan(tx, tag)
return nil, err
}
ProcessTransaction
负责将交易插入到内存池中。它的功能包括:拒绝重复的交易、确保交易符合规定、孤儿交易处理、插入内存池。它实际调用了maybeAcceptTransaction
将交易放到内存池中:
// Potentially accept the transaction to the memory pool. missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit, true)
maybeAcceptTransaction
是一个非常巨型的方法,这里不再列出,它主要做了以下事情:
1. 检查交易是否已经在内存池中;
2. 对事物执行初步健全性检查;
3. 检查这个独立的交易是否coinbase交易;
4. 如果是非标准交易,检查配置的网络参数是否禁止了这种交易;
5. 在内存池中检查这个交易是否发生了double spending(检查这个交易是否用了已经被使用过的交易输出);
6. 从UTXO拿到该交易的所有输入,同时检查UTXO是否已经存在该交易,如果该交易存在主链上并且已经被使用,拒绝它;
7. 如果引用的交易不存在,这个交易是孤儿交易,孤儿交易由调用者处理;
8. 检查交易的sequence lock是否活跃;
9. 使用规定区块链中允许进入区块的交易的不变规则,对交易输入执行多项检查。
10. 检查交易是否有过多签名;
11. 检查交易是否交易费过低,size不超过一定值的free tx仍被允许;
12. 检查free tx是否有足够的优先级被放到下一个区块,并限制free tx的速率,防止penny-flooding
13. 为交易的每个输入验证签名,如果验证不通过则拒绝;
14. 将交易添加到内存池中。
maybeAcceptTransaction
通过调用addTransaction
将交易放到内存池中,查看addTransaction
代码,它将交易放到内存池中:
mp.pool[*tx.Hash()] = txD
查看TxPool
结构体:
type TxPool struct {
// The following variables must only be used atomically.
lastUpdated int64 // last time pool was updated
mtx sync.RWMutex
cfg Config
pool map[chainhash.Hash]*TxDesc
orphans map[chainhash.Hash]*orphanTx
orphansByPrev map[wire.OutPoint]map[chainhash.Hash]*btcutil.Tx
outpoints map[wire.OutPoint]*btcutil.Tx
pennyTotal float64 // exponentially decaying total for penny spends.
lastPennyUnix int64 // unix time of last ``penny spend''
// nextExpireScan is the time after which the orphan pool will be
// scanned in order to evict orphans. This is NOT a hard deadline as
// the scan will only run when an orphan is added to the pool as opposed
// to on an unconditional timer.
nextExpireScan time.Time
}
其中pool
以交易的哈希为键,以交易描述符为值,而交易描述符则包含了交易。
TxPool
就是内存池,它实现了TxSource
接口:
// TxSource represents a source of transactions to consider for inclusion in
// new blocks.
//
// The interface contract requires that all of these methods are safe for
// concurrent access with respect to the source.
type TxSource interface {
// LastUpdated returns the last time a transaction was added to or
// removed from the source pool.
LastUpdated() time.Time
// MiningDescs returns a slice of mining descriptors for all the
// transactions in the source pool.
MiningDescs() []*TxDesc
// HaveTransaction returns whether or not the passed transaction hash
// exists in the source pool.
HaveTransaction(hash *chainhash.Hash) bool
}
由注释可以知道,TxSource
代表了一个源,里面存放了可用于创建新区块的交易。
至此,新的交易被放到内存池中,当决定创建一个新区块时,btcd将从内存池中选择合适的交易打包到新区块中。