这块内容主要是一个拜占庭的过程。本文掐头去尾,只讲一下在 quorum
中是如何实现拜占庭的,也就是共识接口 Seal()
下向拜占庭发送了一个区块的请求事件开始。请求事件如下:
// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
})
在说正文之前,先讲四个变量:sequence round state Code
,因为这四个变量覆盖了 BFT
的整个流程,可以说 BFT
玩的就是这几个变量。
// 视图包括round和sequence两个数字
//sequence是我们即将提交的区块号
//每个round都有一个数字,而且包含三步:preprepare, prepare and commit
// 如果给定一个区块没有被验证者接收,round将会改变,并且验证者将使用 round+1 这个新的round开始新的验证周期
type View struct {
Round *big.Int
Sequence *big.Int
}
state
表述了每个 round
周期中,进行到了哪一步,是每个 round
周期的状态。
type State uint64
const (
StateAcceptRequest State = iota
StatePreprepared
StatePrepared
StateCommitted
)
标明发送消息的类型
const (
msgPreprepare uint64 = iota
msgPrepare
msgCommit
msgRoundChange
msgAll // 未使用
)
type message struct {
Code uint64 // 消息的类型
Msg []byte // 消息的内容
Address common.Address // 发送消息的人
Signature []byte // 消息的签名
CommittedSeal []byte // 提交消息到区块额外字段时用到
}
从这里开始正式对 quorum
的 istanbul
共识的 BFT
流程进行说明,开始的文件是 consensus/istanbul/core/handler.go
。
在启动挖矿的时候会启动 BFT
,此时开始一个新的 BFT
轮次,然后订阅事件并启动对事件的监听及处理。
func (c *core) Start() error {
// Start a new round from last sequence + 1
// 开始一个新的轮次
c.startNewRound(common.Big0)
// Tests will handle events itself, so we have to make subscribeEvents()
// be able to call in test.
// 订阅事件,事件包括:
// |-- 普通拜占庭事件
// |-- 区块提案请求事件
// |-- 消息事件,又分为四种:
// |--preprepare
// |--prepare
// |--commit
// |--roundChange
// |-- 积压消息事件
// |-- round周期超时事件
// |--区块头提交事件
c.subscribeEvents()
// 启动一个协程来处理不同的事件
go c.handleEvents()
return nil
}
要处理的事件可分为三大类:普通拜占庭事件、round周期超时事件、区块头提交事件
。普通拜占庭事件
监听的是 BFT
一个轮次的流程。round周期超时事件
则是监听每个 BFT
轮次的时间,只要超时就会结束本次 BFT
流程,然后开始一个新的 BFT
流程。区块头提交事件
则是只要矿工收到区块头,就结束本次 BFT
,然后开始一个新块的 BFT
。
func (c *core) handleEvents() {
// Clear state
defer func() {
c.current = nil
c.handlerWg.Done()
}()
c.handlerWg.Add(1)
for {
select {
case event, ok := <-c.events.Chan():
if !ok {
return
}
// A real event arrived, process interesting content
// 一个真正人事件到达,处理有趣的内容
switch ev := event.Data.(type) {
case istanbul.RequestEvent: // 普通拜占庭事件
r := &istanbul.Request{ // 区块提案请求事件
Proposal: ev.Proposal,
}
// 处理区块提案请求
err := c.handleRequest(r)
// 如果是未来的请求则保存
if err == errFutureMessage {
c.storeRequestMsg(r)
}
case istanbul.MessageEvent: // 各自消息事件
if err := c.handleMsg(ev.Payload); err == nil {
// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)
c.backend.Gossip(c.valSet, ev.Payload)
}
case backlogEvent: // 积压消息事件
// No need to check signature for internal messages
// 没必要检查内部消息的签名,因为消息事件中已检查过了
if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
p, err := ev.msg.Payload()
if err != nil {
c.logger.Warn("Get message payload failed", "err", err)
continue
}
c.backend.Gossip(c.valSet, p)
}
}
case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件
if !ok {
return
}
c.handleTimeoutMsg()
case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件
if !ok {
return
}
switch event.Data.(type) {
case istanbul.FinalCommittedEvent:
c.handleFinalCommitted()
}
}
}
}
// round 超时后处理方式
func (c *core) handleTimeoutMsg() {
// If we're not waiting for round change yet, we can try to catch up
// the max round with F+1 round change message. We only need to catch up
// if the max round is larger than current round.
// 如果不需要等待round值修改
if !c.waitingForRoundChange {
//消息数量大于等于 f+1 的round中, 取round值最大的那个round值,
maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1)
if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 {
//修改 round 轮次
c.sendRoundChange(maxRound)
return
}
}
// 拿到最后一次申请的提案
lastProposal, _ := c.backend.LastProposal()
// 如果提案不是空的,而且提案不小于当前的区块号,则开始生产新块的轮次
if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())
// 开始一个全新round轮次
c.startNewRound(common.Big0)
} else {
// 否则,进入下一个round轮次,也就是round+1
c.sendNextRoundChange()
}
}
区块头提交事件
func (c *core) handleFinalCommitted() error {
logger := c.logger.New("state", c.state)
logger.Trace("Received a final committed proposal")
// 开始一个新的轮次
c.startNewRound(common.Big0)
return nil
}
然后开始重头戏,BFT的正常流程。
——开始一个新的 round
轮次,round=0
,sequence=当前区块链高度+1
,state状态置为StateAcceptRequest
,启动 round
的计时器。
——区块提案申请人提交提案。
——收到请求后判断,如果是未来消息,则保存该请求到待处理请求的优先队列中。否则发送 preprepare
消息。
——把当前 View(round sequence)
和请求提案(区块)包装成 preprepare
消息,广播出去,广播前会对消息进行签名处理。
——收到preprepare消息后处理消息,如果验证区块提案时发现这是一个未来区块,则把它放到积压消息的优先队列中,到期处理。如果 区块Hash
被锁定且一致,则发送一个 commit消息
。如果是正常流程,则切换 state状态
为 StatePreparepared
并发送 prepare消息
。
——把 View(round sequence)
和 Hash
包装成 prepare消息
,广播出去,广播前会对消息进行签名处理。
——收到 prepare消息
后处理消息,检查 round、sequence
来确保消息的有效性,验证消息签名的正确性,当收到 2f+1
条消息后,锁定 Hash
、切换 state
状态为 StatePrepared
并发送 commit消息
。
——把 View(round sequence)
和 Hash
包装成 commit消息
,广播出去,广播前会对消息进行签名处理。
——收到 commit消息
后处理消息,检查 round、sequence
来确保消息的有效性,验证消息签名的正确性,当收到 2f+1
条消息后,锁定 Hash
并提交。
——切换 state
状态为 StateCommitted
,然后整理共识数据并插入区块的 额外字段extra
中,如果插入失败,启动当前区块的下一轮 round
共识。
接下来开始详细介绍,处理普通拜占庭事件时,又会监听三种事件:区块提案请求事件、各种消息事件、积压消息事件
在说这三种事件之前,先介绍 startNewRound(round *big.Int)
方法,这个方法是启动一个新的 BFT
轮次,如果传入的 round
值是 0
,代表要生产一个新的区块。
首先从区块链中获取最高的区块,然后和 sequence
做比较来判断共识的区块是否正确,如果 >sequence
,证明当前共识的区块提案已过时,需要共识后面的区块,如果 =sequence-1
,证明要共识的区块提案没问题了,然后用 传入 round
和 当前round
比较来判断,如果 传入round=0
,则证明当前轮次已开始,不必重复开始当前轮次,直接结束,如果 传入round
大于 当前round
,证明需要修改 round
值,则直接修改 round
值,此时 sequence
值不变(如果不需要修改 round
值时,sequence
值为 最高区块号+1
,round
值为 0
,通过最高区块获取验证器)。接下来做一些本次 round
轮次的 准备工作,清空当前轮次无效的信息集全,准备存放收到的消息,更新 current
,计算新的申请人。
计算新的申请人的方法,
首先得到一个种子seed,
然后种子seed对验证者数量取模pick,
pick做为验证者集合的偏移量,得到验证者。
计算种子的方法有两种,roundRobinProposer和stickyProposer,选择方式是在创世区块配置文件中配置的。
roundRobinProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round+1
stickyProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round
如果 round
修改了,而且当前节点是申请人,则发送一个 preprepare消息
,如果 Hash
被锁定,则发送当前区块提案,如果当前待处理请求队列中有内容,则发送待处理请求队列中的内容。最后启动一个 roundChange计时器
。
func (c *core) handleEvents() {
// Clear state
defer func() {
c.current = nil
c.handlerWg.Done()
}()
c.handlerWg.Add(1)
for {
select {
case event, ok := <-c.events.Chan():
if !ok {
return
}
// A real event arrived, process interesting content
// 一个真正人事件到达,处理有趣的内容
switch ev := event.Data.(type) {
case istanbul.RequestEvent: // 普通拜占庭事件
r := &istanbul.Request{ // 区块提案请求事件
Proposal: ev.Proposal,
}
// 处理区块提案请求
err := c.handleRequest(r)
// 如果是未来的请求则保存
if err == errFutureMessage {
c.storeRequestMsg(r)
}
case istanbul.MessageEvent: // 各自消息事件
if err := c.handleMsg(ev.Payload); err == nil {
// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)
c.backend.Gossip(c.valSet, ev.Payload)
}
case backlogEvent: // 积压消息事件
// No need to check signature for internal messages
// 没必要检查内部消息的签名,因为消息事件中已检查过了
if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
p, err := ev.msg.Payload()
if err != nil {
c.logger.Warn("Get message payload failed", "err", err)
continue
}
c.backend.Gossip(c.valSet, p)
}
}
case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件
if !ok {
return
}
c.handleTimeoutMsg()
case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件
if !ok {
return
}
switch event.Data.(type) {
case istanbul.FinalCommittedEvent:
c.handleFinalCommitted()
}
}
}
}
当 istanbul共识
需要调用拜占庭达成共识时,会是 Seal()
接口中发送区块提案请求给拜占庭。
// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
})
然后拜占庭就收到了 istanbul.RequestEvent
事件,然后处理该请求。
func (c *core) handleRequest(request *istanbul.Request) error {
logger := c.logger.New("state", c.state, "seq", c.current.sequence)
// 检查消息是无效的?过时的?未来的?
if err := c.checkRequestMsg(request); err != nil {
if err == errInvalidMessage {
logger.Warn("invalid request")
return err
}
logger.Warn("unexpected request", "err", err, "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
return err
}
logger.Trace("handleRequest", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
// 如果检查没问题,则发送一条preprepare消息
c.current.pendingRequest = request
if c.state == StateAcceptRequest {
c.sendPreprepare(request)
}
return nil
}
处理请求的第一步是检查消息。
// 检查request状态
func (c *core) checkRequestMsg(request *istanbul.Request) error {
if request == nil || request.Proposal == nil {
// 如果消息无效
return errInvalidMessage
}
// 检查到哪一步骤了
if c := c.current.sequence.Cmp(request.Proposal.Number()); c > 0 {
// 如果消息过时
return errOldMessage
} else if c < 0 {
// 如果消息是未来的,这里会把消息保存起来,下文有介绍
return errFutureMessage
} else {
return nil
}
}
如果是由于收到未来消息而报错,则需要保存请求的消息到待处理请求的优先队列中。
// 保存请求消息到待处理队列
func (c *core) storeRequestMsg(request *istanbul.Request) {
logger := c.logger.New("state", c.state)
logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
c.pendingRequestsMu.Lock()
defer c.pendingRequestsMu.Unlock()
// 未来的消息保存到优先队列
c.pendingRequests.Push(request, float32(-request.Proposal.Number().Int64()))
}
那么什么时候处理这些请求呢?在修改状态 state
的时候,而且是当把 state
修改为 StateAcceptRequest
时。修改 state
时也会处理积压的消息。那如何处理待处理请求的优先队列中的请求呢?
// 处理待处理的请求
func (c *core) processPendingRequests() {
c.pendingRequestsMu.Lock()
defer c.pendingRequestsMu.Unlock()
for !(c.pendingRequests.Empty()) {
// 从优先队列中取出请求
m, prio := c.pendingRequests.Pop()
r, ok := m.(*istanbul.Request)
if !ok {
c.logger.Warn("Malformed request, skip", "msg", m)
continue
}
// Push back if it's a future message
// 查检请求的状态
err := c.checkRequestMsg(r)
if err != nil {
// 如果是未来的请求,还放回待处理队列,然后找下一条
if err == errFutureMessage {
c.logger.Trace("Stop processing request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())
c.pendingRequests.Push(m, prio)
break
}
c.logger.Trace("Skip the pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash(), "err", err)
continue
}
c.logger.Trace("Post pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())
// 如果检查没问题,则发送请求事件来处理该请求
go c.sendEvent(istanbul.RequestEvent{
Proposal: r.Proposal,
})
}
}
上文说到发送 preprepare
消息,这个消息就是在这里监听处理的。首先检查消息的签名,然后检查消息的地址,如果都没问题,则分别处理不同的消息。
func (c *core) handleMsg(payload []byte) error {
logger := c.logger.New()
// Decode message and check its signature
// 解码消息并检查它的签名
msg := new(message)
if err := msg.FromPayload(payload, c.validateFn); err != nil {
logger.Error("Failed to decode message from payload", "err", err)
return err
}
// Only accept message if the address is valid
// 只接收地址是有效的消息
_, src := c.valSet.GetByAddress(msg.Address)
if src == nil {
logger.Error("Invalid address in message", "msg", msg)
return istanbul.ErrUnauthorizedAddress
}
return c.handleCheckedMsg(msg, src)
}
检查消息签名的方法,需要的参数是消息笔验证的回调函数。
func (m *message) FromPayload(b []byte, validateFn func([]byte, []byte) (common.Address, error)) error {
// Decode message
// 解码消息
err := rlp.DecodeBytes(b, &m)
if err != nil {
return err
}
// Validate message (on a message without Signature)
if validateFn != nil {
var payload []byte
// 拿到没有签名的消息对象,即把签名的字段置空
payload, err = m.PayloadNoSig()
if err != nil {
return err
}
// 调用回调函数来检查消息的签名是否正确
_, err = validateFn(payload, m.Signature)
}
// Still return the message even the err is not nil
// 返回的消息不是err就是nil
return err
}
这里再介绍一下验证消息的回调函数
func CheckValidatorSignature(valSet ValidatorSet, data []byte, sig []byte) (common.Address, error) {
// 1. Get signature address
// 拿到签名地址
signer, err := GetSignatureAddress(data, sig)
if err != nil {
log.Error("Failed to get signer address", "err", err)
return common.Address{}, err
}
// 2. Check validator
// 检查签名者地址是不是申请人的
if _, val := valSet.GetByAddress(signer); val != nil {
return val.Address(), nil
}
return common.Address{}, ErrUnauthorizedAddress
}
然后分别处理四种不同的消息,如果处理的消息是未来的消息,则把未来的消息保存到积压消息的优先队列中。
func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
logger := c.logger.New("address", c.address, "from", src)
// Store the message if it's a future message
// 如果是未来的消息,就保存起来
testBacklog := func(err error) error {
if err == errFutureMessage {
c.storeBacklog(msg, src)
}
return err
}
switch msg.Code {
case msgPreprepare:
return testBacklog(c.handlePreprepare(msg, src))
case msgPrepare:
return testBacklog(c.handlePrepare(msg, src))
case msgCommit:
return testBacklog(c.handleCommit(msg, src))
case msgRoundChange:
return testBacklog(c.handleRoundChange(msg, src))
default:
logger.Error("Invalid message", "msg", msg)
}
return errInvalidMessage
}
由于保存未来消息是个单独的模块,这里先讲保存未来的消息,然后依次介绍如何处理其他四种消息。
(1)保存未来的消息
保存未来的消息,简单来说,就是搞一个优先队列,然后分别把不同的消息放进队列中即可。
func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
logger := c.logger.New("from", src, "state", c.state)
// 不保存自己的消息
if src.Address() == c.Address() {
logger.Warn("Backlog from self")
return
}
logger.Trace("Store future message")
c.backlogsMu.Lock()
defer c.backlogsMu.Unlock()
// 搞一个优先队列
logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))
backlog := c.backlogs[src.Address()]
// 如果通过验证者地址没找到对应的优先队列,则创建一个新的
if backlog == nil {
backlog = prque.New()
}
// 消息和消息的优先级保存起来
switch msg.Code {
case msgPreprepare: // 处理preprepare消息
// 解码
var p *istanbul.Preprepare
err := msg.Decode(&p)
if err == nil {
// 放进队列保存
backlog.Push(msg, toPriority(msg.Code, p.View))
}
// for msgRoundChange, msgPrepare and msgCommit cases
default: // 处理其它三种消息,因为保存消息的数据结构不一样,所以分别处理
var p *istanbul.Subject
err := msg.Decode(&p)
if err == nil {
backlog.Push(msg, toPriority(msg.Code, p.View))
}
}
c.backlogs[src.Address()] = backlog
}
保存消息到优先级队列中,那优先级是怎么定义的呢?
如果是msgRoundChange消息,则-(sequence * 1000)
如果是其它消息,则 -(sequence x 1000 + round x 10 + 消息的优先级)
消息的优先级:msgPreprepare > msgCommit > msgPrepare,分别是1,2,3
保存后的消息是什么时候处理的呢,就是上文提到的,修改 state
状态的时候,由于保存的消息涵盖了消息的所有类型(四种),所以 state
的任何一次修改,都要处理一下积压的未来消息,那是如何处理的呢?
// 处理队列中积压的消息
func (c *core) processBacklog() {
c.backlogsMu.Lock()
defer c.backlogsMu.Unlock()
// 遍历保存的所有的优先队列
for srcAddress, backlog := range c.backlogs {
//如果得到的队列是空,慢找下一个队列
if backlog == nil {
continue
}
// 如果地址为空,则证明该队列中消息取完了,则删除该队列
_, src := c.valSet.GetByAddress(srcAddress)
if src == nil {
// validator is not available
delete(c.backlogs, srcAddress)
continue
}
logger := c.logger.New("from", src, "state", c.state)
isFuture := false // 搞一个未来消息的标识
// We stop processing if
// 1. backlog is empty
// 2. The first message in queue is a future message
// 依次取出队列中的消息,但是不处理空的队列和未来消息
for !(backlog.Empty() || isFuture) {
m, prio := backlog.Pop() // 取出消息和优先级
msg := m.(*message)
var view *istanbul.View
switch msg.Code { // 根据不同的消息取出不同的视图view,作用是判断消息的有效性
case msgPreprepare:
var m *istanbul.Preprepare
err := msg.Decode(&m)
if err == nil {
view = m.View
}
// for msgRoundChange, msgPrepare and msgCommit cases
default:
var sub *istanbul.Subject
err := msg.Decode(&sub)
if err == nil {
view = sub.View
}
}
if view == nil {
logger.Debug("Nil view", "msg", msg)
continue
}
// Push back if it's a future message
// 检查消息的有效性,判定消息是过去?未来的?
// 如果是未来的消息,还放回原处
err := c.checkMessage(msg.Code, view)
// 如果有错误,取下一条消息
if err != nil {
if err == errFutureMessage {
logger.Trace("Stop processing backlog", "msg", msg)
backlog.Push(msg, prio)
isFuture = true
break
}
logger.Trace("Skip the backlog event", "msg", msg, "err", err)
continue
}
logger.Trace("Post backlog event", "msg", msg)
// 如果没问题,则广播积压的消息事件
go c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
}
}
}
(2)处理msgPreprepare消息
这里要处理 preprepare消息
,上文的区块提案请求中提到发送 preprepare消息
,那是怎么发的呢?这是先说发,后说处理。发送消息说起来也简单,如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致,则把 request
中的区块提案包装成 preprepare消息
,然后广播出去。
func (c *core) sendPreprepare(request *istanbul.Request) {
logger := c.logger.New("state", c.state)
// If I'm the proposer and I have the same sequence with the proposal
// 如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致
if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
curView := c.currentView()
// 把request对象包装成preprepare消息对象
preprepare, err := Encode(&istanbul.Preprepare{
View: curView,
Proposal: request.Proposal,
})
if err != nil {
logger.Error("Failed to encode", "view", curView)
return
}
// 把preprepare消息对象广播出去
c.broadcast(&message{
Code: msgPreprepare,
Msg: preprepare,
})
}
}
在广播消息的时候,会对消息进行签名、rlp编码
生成最终消息,处理完成后,才会调用广播的方式把消息广播出去,生成最终消息的处理方式如下:
func (c *core) finalizeMessage(msg *message) ([]byte, error) {
var err error
// Add sender address 消息的地址是当前节点的地址
msg.Address = c.Address()
// Add proof of consensus 在广播commit消息的时候,填充这个字段,最终会放在区块的额外字段中
msg.CommittedSeal = []byte{}
// Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
if msg.Code == msgCommit && c.current.Proposal() != nil {
seal := PrepareCommittedSeal(c.current.Proposal().Hash())
msg.CommittedSeal, err = c.backend.Sign(seal)
if err != nil {
return nil, err
}
}
// Sign message 拿到未签名的消息数据
data, err := msg.PayloadNoSig()
if err != nil {
return nil, err
}
// 消息签名
msg.Signature, err = c.backend.Sign(data)
if err != nil {
return nil, err
}
// Convert to payload 对签名后的消息进行rlp编码
payload, err := msg.Payload()
if err != nil {
return nil, err
}
return payload, nil
}
在这里开始真正地处理 preprepare消息
。首先检查消息的有效性,如果消息是旧消息,而且区块提案存在且申请人匹配,则广播一个旧区块的 comit消息
。检查消息的申请人是否正确,验证区块是否有问题,如果这是一个未来的 区块,则到期广播该消息,如果是其它错误,则进入下一个共识轮次。如果前面检查没问题,则进入真正的 preprepare消息
处理流程中,如果 当前Hash
没有被锁定,则修改 state
状态并发送 prepare消息
,否则判断 锁定的Hash
是否是 当前提案的Hash
,如果是,则修改 state状态
并发送 commit消息
,如果不是,则进入下一个共识轮次。
func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
logger := c.logger.New("from", src, "state", c.state)
// Decode PRE-PREPARE 把preprepare消息解码为preprepare对象
var preprepare *istanbul.Preprepare
err := msg.Decode(&preprepare)
if err != nil {
return errFailedDecodePreprepare
}
// Ensure we have the same view with the PRE-PREPARE message
// If it is old message, see if we need to broadcast COMMIT
// 检查消息的有效性,判定消息是过去?未来的?
if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
// 如果收到的是旧新消息
if err == errOldMessage {
// Get validator set for the given proposal
// 获取给定区块的验证器
valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
// 获取前一个提案的申请人
previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
// 计算区块提案的申请人
valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
// Broadcast COMMIT if it is an existing block
// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
// 2. The given block must exist
// 如果这个区块已存在,广播一个commit消息
// 1.区块提案的申请人必须匹配
// 2.这个区块提案必须存在
if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
return nil
}
}
return err
}
// Check if the message comes from current proposer
// 检查消息是否来自当前申请人
if !c.valSet.IsProposer(src.Address()) {
logger.Warn("Ignore preprepare messages from non-proposer")
return errNotFromProposer
}
// Verify the proposal we received
// 验证我们收到的区块
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
// 区块验证失败
logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
// if it's a future block, we will handle it again after the duration
// 如果这是一个未来的区块,我们将到期处理它,因为消息提前了duration时间
if err == consensus.ErrFutureBlock {
c.stopFuturePreprepareTimer()
//duration 时间后广播该事件
c.futurePreprepareTimer = time.AfterFunc(duration, func() {
c.sendEvent(backlogEvent{
src: src,
msg: msg,
})
})
} else {
// 如果是其它错误,进入下一个轮次的共识
c.sendNextRoundChange()
}
return err
}
// Here is about to accept the PRE-PREPARE
// 这里真正处理preprepare消息
if c.state == StateAcceptRequest {
// Send ROUND CHANGE if the locked proposal and the received proposal are different
// 如果区块被锁定而且收到的区块是不同的,发送一个改变round的事件进入下一个轮次
if c.current.IsHashLocked() {
// 如果当前提案正好是锁定的Hash,直接发一个commit消息
if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
// Broadcast COMMIT and enters Prepared state directly
// 设置preprepare消息,状态变成 prepare,广播 commit 消息
c.acceptPreprepare(preprepare)
c.setState(StatePrepared)
c.sendCommit()
} else {
// Send round change
// 进入下一个轮次
c.sendNextRoundChange()
}
} else {
// Either
// 1. the locked proposal and the received proposal match
// 2. we have no locked proposal
// 设置preprepare消息,状态变成 preprepare,广播 prepare 消息
c.acceptPreprepare(preprepare)
c.setState(StatePreprepared)
c.sendPrepare()
}
}
return nil
}
上文说到发送 prepare消息
,这是介绍一下如何发送。首先,把消息编码成 prepare消息
,然后广播出去,广播方式上文介绍过了,这里不再赘述。
然后开始处理 prepare消息
,首先把消息解码成 prepare消息
,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 prepare消息
和当前轮次 区块的Hash
是一致的,之后保存消息到当前轮次。如果 当前Hash
被锁定且等消息中带的 区块Hash
,或者收到了 2f及以上
的投票且状态在 StatePrepared
之前,则锁定 Hash
,修改状态为 StatePrepared
,并发送 commit消息
。
func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
// Decode PREPARE message
// 解码 prepare 消息
var prepare *istanbul.Subject
err := msg.Decode(&prepare)
if err != nil {
return errFailedDecodePrepare
}
// 检查 prepare 消息,确保视图相同,轮次和顺序
if err := c.checkMessage(msgPrepare, prepare.View); err != nil {
return err
}
// If it is locked, it can only process on the locked block.
// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state.
// 验证 prepare 消息
if err := c.verifyPrepare(prepare, src); err != nil {
return err
}
c.acceptPrepare(msg, src)
// Change to Prepared state if we've received enough PREPARE messages or it is locked
// and we are in earlier state before Prepared state.
//如果当前Hash被锁定且等消息中带的区块Hash,或者收到了2f及以上的投票且状态在StatePrepared之前
// 如果收到了 2f 的确认票, 且未广播commit消息,修改状态为 prepare,然后广播 commit 消息
if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&
c.state.Cmp(StatePrepared) < 0 {
c.current.LockHash()
c.setState(StatePrepared)
c.sendCommit()
}
return nil
}
发送 commit消息
就不细说了,但是在广播 commit消息
的时候,需要把 消息的Hash
和 消息的代码Code
组合起来被私钥签名,保存到消息的 CommittedSeal
字段中,以备后续保存到区块的额外字段中。
这里着重介绍如何处理 commit消息
。首先把消息解码成 commit消息
,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 commit消息
和当前轮次区块的 Hash
是一致的,之后保存消息到当前轮次。如果收到了 2f及以上
的投票,且状态在 StateCommitted
之前,则锁定 Hash
,并提交。
提交的时候,首先修改状态 state
为 StateCommitted
,然后整理共识数据,即把 commit
池中的消息取出来,最终会把这些消息放入区块头和额外字段中,然后更新区块的区块头,通过管道通知矿工共识完成,可以把区块加入区块链并广播区块了。
(5)处理msgRoundChange消息
先说一下什么情况下会发送 roundChange消息
,有三种情况:1.round周期超时
,2.需要开始下一个round轮次
,3.处理roundChange消息的时候
。
round周期超时
后,会选择 round轮次
中收到的消息数据不小于 f+1
的 round
中,round
值最大的那个 round
值。需要开始下一个 round
轮次则是 前一个round+1
的 round
值。至于第三种情况,下文详细介绍,这里先介绍如何发送 roundChange消息
。
首先就是修改 round
,而 sequence
不变,然后把该消息编码并广播出去。
// 发送一个给定的 round 来修改 round
func (c *core) sendRoundChange(round *big.Int) {
logger := c.logger.New("state", c.state)
cv := c.currentView()
// 保证新修改的round不得小于原来的round
if cv.Round.Cmp(round) >= 0 {
logger.Error("Cannot send out the round change", "current round", cv.Round, "target round", round)
return
}
// 修改round
c.catchUpRound(&istanbul.View{
// The round number we'd like to transfer to.
Round: new(big.Int).Set(round),
Sequence: new(big.Int).Set(cv.Sequence),
})
// Now we have the new round number and sequence number
// 现在我们拥有新的 round 和 sequence
cv = c.currentView()
rc := &istanbul.Subject{
View: cv,
Digest: common.Hash{},
}
// 修改round的消息编码
payload, err := Encode(rc)
if err != nil {
logger.Error("Failed to encode ROUND CHANGE", "rc", rc, "err", err)
return
}
// 把修改round的消息广播出去
c.broadcast(&message{
Code: msgRoundChange,
Msg: payload,
})
}
这个过程中更新 round
的方法需要重点介绍一下。首先打开等待 roundChange
修改的开关,然后修改 round
,清空修改后的 round
中的内容,启动一个新的 roundChange计时器
。
func (c *core) catchUpRound(view *istanbul.View) {
logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())
if view.Round.Cmp(c.current.Round()) > 0 {
c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
}
c.waitingForRoundChange = true
// Need to keep block locked for round catching up
// 修改round
c.updateRoundState(view, c.valSet, true)
c.roundChangeSet.Clear(view.Round) // 清空当前round中的内容
c.newRoundChangeTimer() // 搞一个新的计时器
logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}
修改round的本质就是创建一个新的 roundState
放在 current属性
中。
从这里开始处理 roundChange消息
,解码和检查消息的有效性不说了,然后 把消息添加到 roundChange消息
集中并返回这个区块的这个轮次中共有多少消息,如果数量等于 f+1
,继续发 roundChange消息
,如果 数量等于 2f+1
,开始一个新的round
轮次。
func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
logger := c.logger.New("state", c.state, "from", src.Address().Hex())
// Decode ROUND CHANGE message
// 解码 修改round 消息
var rc *istanbul.Subject
if err := msg.Decode(&rc); err != nil {
logger.Error("Failed to decode ROUND CHANGE", "err", err)
return errInvalidMessage
}
if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
return err
}
cv := c.currentView()
roundView := rc.View
// Add the ROUND CHANGE message to its message set and return how many
// messages we've got with the same round number and sequence number.
// 把消息添加到roundChange消息集中并返回这个区块的这个轮次中共有多少消息
num, err := c.roundChangeSet.Add(roundView.Round, msg)
if err != nil {
logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
return err
}
// Once we received f+1 ROUND CHANGE messages, those messages form a weak certificate.
// If our round number is smaller than the certificate's round number, we would
// try to catch up the round number.
// 一旦收到 f+1 条 修改消息,这些消息组成一个弱证书
// 如果我们的 round 号小于 证书的 round 号, 我们将试着加到round号
// 如果需要等待roundChange修改,且收到的消息数量等于 f+1
if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
// 如果当前轮次的round小于收到的消息的round
if cv.Round.Cmp(roundView.Round) < 0 {
// 继续发送roundChange消息
c.sendRoundChange(roundView.Round)
}
return nil
} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
// 收到 2f+1 条修改消息,立即开始一个新的round
// 如果收到的消息数量等于 2f+1, 且 需要等待roundChange修改或者当前轮次的round小于收到的消息的round,则开始一个新的round轮次
c.startNewRound(roundView.Round)
return nil
} else if cv.Round.Cmp(roundView.Round) < 0 {
// Only gossip the message with current round to other validators.
return errIgnored
}
return nil
}
这块内容是处理积压的消息,由于这些消息来得早了,先保存起来,到期触发本事件执行,本文不详细介绍这块内容了,因为它和上文 istanbul.MessageEvent 各自消息事件
的处理方式基本一样,唯一的差别就是,没有检查这些消息的签名,因为在保存消息前已经检查过了,所以消息的签名是一定没问题的。