当前位置: 首页 > 工具软件 > quorum > 使用案例 >

quorum中的BFT

楚望
2023-12-01

这块内容主要是一个拜占庭的过程。本文掐头去尾,只讲一下在 quorum 中是如何实现拜占庭的,也就是共识接口 Seal() 下向拜占庭发送了一个区块的请求事件开始。请求事件如下:

// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{
	Proposal: block,
})

前言

在说正文之前,先讲四个变量:sequence round state Code,因为这四个变量覆盖了 BFT 的整个流程,可以说 BFT 玩的就是这几个变量。

// 视图包括round和sequence两个数字
//sequence是我们即将提交的区块号
//每个round都有一个数字,而且包含三步:preprepare, prepare and commit

// 如果给定一个区块没有被验证者接收,round将会改变,并且验证者将使用 round+1 这个新的round开始新的验证周期
type View struct {
	Round    *big.Int
	Sequence *big.Int
}

state 表述了每个 round 周期中,进行到了哪一步,是每个 round 周期的状态。

type State uint64

const (
	StateAcceptRequest State = iota
	StatePreprepared
	StatePrepared
	StateCommitted
)

标明发送消息的类型

const (
	msgPreprepare uint64 = iota
	msgPrepare
	msgCommit
	msgRoundChange
	msgAll   // 未使用
)

type message struct {
	Code          uint64  // 消息的类型
	Msg           []byte  // 消息的内容
	Address       common.Address // 发送消息的人
	Signature     []byte  // 消息的签名
	CommittedSeal []byte  // 提交消息到区块额外字段时用到
}

BFT正文

从这里开始正式对 quorumistanbul 共识的 BFT 流程进行说明,开始的文件是 consensus/istanbul/core/handler.go

启动BFT

在启动挖矿的时候会启动 BFT,此时开始一个新的 BFT 轮次,然后订阅事件并启动对事件的监听及处理。

func (c *core) Start() error {
	// Start a new round from last sequence + 1
	// 开始一个新的轮次
	c.startNewRound(common.Big0)

	// Tests will handle events itself, so we have to make subscribeEvents()
	// be able to call in test.
	// 订阅事件,事件包括:
	// |-- 普通拜占庭事件
	// 	 |-- 区块提案请求事件
	// 	 |-- 消息事件,又分为四种:
	// 	 	|--preprepare
	// 	 	|--prepare
	// 	 	|--commit
	// 	 	|--roundChange
	//	 |-- 积压消息事件
	// |-- round周期超时事件
	// |--区块头提交事件
	c.subscribeEvents()
	// 启动一个协程来处理不同的事件
	go c.handleEvents()

	return nil
}

处理不同的事件

要处理的事件可分为三大类:普通拜占庭事件、round周期超时事件、区块头提交事件普通拜占庭事件监听的是 BFT 一个轮次的流程。round周期超时事件 则是监听每个 BFT 轮次的时间,只要超时就会结束本次 BFT 流程,然后开始一个新的 BFT 流程。区块头提交事件 则是只要矿工收到区块头,就结束本次 BFT,然后开始一个新块的 BFT

func (c *core) handleEvents() {
	// Clear state
	defer func() {
		c.current = nil
		c.handlerWg.Done()
	}()

	c.handlerWg.Add(1)

	for {
		select {
		case event, ok := <-c.events.Chan():
			if !ok {
				return
			}
			// A real event arrived, process interesting content
			// 一个真正人事件到达,处理有趣的内容
			switch ev := event.Data.(type) {
			case istanbul.RequestEvent: // 普通拜占庭事件
				r := &istanbul.Request{ // 区块提案请求事件
					Proposal: ev.Proposal,
				}
				// 处理区块提案请求
				err := c.handleRequest(r)
				// 如果是未来的请求则保存
				if err == errFutureMessage {
					c.storeRequestMsg(r)
				}
			case istanbul.MessageEvent: // 各自消息事件
				if err := c.handleMsg(ev.Payload); err == nil {
					// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)
					c.backend.Gossip(c.valSet, ev.Payload)
				}
			case backlogEvent: // 积压消息事件
				// No need to check signature for internal messages
				// 没必要检查内部消息的签名,因为消息事件中已检查过了
				if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
					p, err := ev.msg.Payload()
					if err != nil {
						c.logger.Warn("Get message payload failed", "err", err)
						continue
					}
					c.backend.Gossip(c.valSet, p)
				}
			}
		case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件
			if !ok {
				return
			}
			c.handleTimeoutMsg()
		case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件
			if !ok {
				return
			}
			switch event.Data.(type) {
			case istanbul.FinalCommittedEvent:
				c.handleFinalCommitted()
			}
		}
	}
}

后两个比较简单,这里贴一下代码和注释。
round周期超时事件
// round 超时后处理方式
func (c *core) handleTimeoutMsg() {
	// If we're not waiting for round change yet, we can try to catch up
	// the max round with F+1 round change message. We only need to catch up
	// if the max round is larger than current round.
	// 如果不需要等待round值修改
	if !c.waitingForRoundChange {
		//消息数量大于等于 f+1 的round中, 取round值最大的那个round值,
		maxRound := c.roundChangeSet.MaxRound(c.valSet.F() + 1)
		if maxRound != nil && maxRound.Cmp(c.current.Round()) > 0 {
			//修改 round 轮次
			c.sendRoundChange(maxRound)
			return
		}
	}

	// 拿到最后一次申请的提案
	lastProposal, _ := c.backend.LastProposal()
	// 如果提案不是空的,而且提案不小于当前的区块号,则开始生产新块的轮次
	if lastProposal != nil && lastProposal.Number().Cmp(c.current.Sequence()) >= 0 {
		c.logger.Trace("round change timeout, catch up latest sequence", "number", lastProposal.Number().Uint64())
		// 开始一个全新round轮次
		c.startNewRound(common.Big0)
	} else {
		// 否则,进入下一个round轮次,也就是round+1
		c.sendNextRoundChange()
	}
}

区块头提交事件

func (c *core) handleFinalCommitted() error {
	logger := c.logger.New("state", c.state)
	logger.Trace("Received a final committed proposal")
	// 开始一个新的轮次
	c.startNewRound(common.Big0)
	return nil
}

然后开始重头戏,BFT的正常流程。

先简单介绍一下BFT的正常流程:

——开始一个新的 round 轮次,round=0sequence=当前区块链高度+1state状态置为StateAcceptRequest,启动 round 的计时器。

——区块提案申请人提交提案。

——收到请求后判断,如果是未来消息,则保存该请求到待处理请求的优先队列中。否则发送 preprepare 消息。

——把当前 View(round sequence) 和请求提案(区块)包装成 preprepare 消息,广播出去,广播前会对消息进行签名处理。

——收到preprepare消息后处理消息,如果验证区块提案时发现这是一个未来区块,则把它放到积压消息的优先队列中,到期处理。如果 区块Hash 被锁定且一致,则发送一个 commit消息。如果是正常流程,则切换 state状态StatePreparepared 并发送 prepare消息

——把 View(round sequence)Hash 包装成 prepare消息,广播出去,广播前会对消息进行签名处理。

——收到 prepare消息 后处理消息,检查 round、sequence 来确保消息的有效性,验证消息签名的正确性,当收到 2f+1 条消息后,锁定 Hash、切换 state 状态为 StatePrepared 并发送 commit消息

——把 View(round sequence)Hash 包装成 commit消息,广播出去,广播前会对消息进行签名处理。

——收到 commit消息 后处理消息,检查 round、sequence 来确保消息的有效性,验证消息签名的正确性,当收到 2f+1 条消息后,锁定 Hash 并提交。

——切换 state 状态为 StateCommitted,然后整理共识数据并插入区块的 额外字段extra 中,如果插入失败,启动当前区块的下一轮 round 共识。


接下来开始详细介绍,处理普通拜占庭事件时,又会监听三种事件:区块提案请求事件、各种消息事件、积压消息事件

启动新的BFT轮次

在说这三种事件之前,先介绍 startNewRound(round *big.Int) 方法,这个方法是启动一个新的 BFT 轮次,如果传入的 round 值是 0,代表要生产一个新的区块。
首先从区块链中获取最高的区块,然后和 sequence 做比较来判断共识的区块是否正确,如果 >sequence,证明当前共识的区块提案已过时,需要共识后面的区块,如果 =sequence-1,证明要共识的区块提案没问题了,然后用 传入 round当前round 比较来判断,如果 传入round=0,则证明当前轮次已开始,不必重复开始当前轮次,直接结束,如果 传入round 大于 当前round,证明需要修改 round 值,则直接修改 round 值,此时 sequence 值不变(如果不需要修改 round 值时,sequence 值为 最高区块号+1round 值为 0,通过最高区块获取验证器)。接下来做一些本次 round 轮次的 准备工作,清空当前轮次无效的信息集全,准备存放收到的消息,更新 current,计算新的申请人。

计算新的申请人的方法,
首先得到一个种子seed,
然后种子seed对验证者数量取模pick,
pick做为验证者集合的偏移量,得到验证者。

计算种子的方法有两种,roundRobinProposer和stickyProposer,选择方式是在创世区块配置文件中配置的。
roundRobinProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round+1
stickyProposer:
如果上一个区块的生产者是空,种子seed取round。
否则上一个区块的生产者在集合中的偏移量+round

如果 round 修改了,而且当前节点是申请人,则发送一个 preprepare消息,如果 Hash 被锁定,则发送当前区块提案,如果当前待处理请求队列中有内容,则发送待处理请求队列中的内容。最后启动一个 roundChange计时器

下面来处理拜占庭监听


func (c *core) handleEvents() {
	// Clear state
	defer func() {
		c.current = nil
		c.handlerWg.Done()
	}()

	c.handlerWg.Add(1)

	for {
		select {
		case event, ok := <-c.events.Chan():
			if !ok {
				return
			}
			// A real event arrived, process interesting content
			// 一个真正人事件到达,处理有趣的内容
			switch ev := event.Data.(type) {
			case istanbul.RequestEvent: // 普通拜占庭事件
				r := &istanbul.Request{ // 区块提案请求事件
					Proposal: ev.Proposal,
				}
				// 处理区块提案请求
				err := c.handleRequest(r)
				// 如果是未来的请求则保存
				if err == errFutureMessage {
					c.storeRequestMsg(r)
				}
			case istanbul.MessageEvent: // 各自消息事件
				if err := c.handleMsg(ev.Payload); err == nil {
					// 如果没错误,通过gossip协议发送给其它验证器(不包括自己)
					c.backend.Gossip(c.valSet, ev.Payload)
				}
			case backlogEvent: // 积压消息事件
				// No need to check signature for internal messages
				// 没必要检查内部消息的签名,因为消息事件中已检查过了
				if err := c.handleCheckedMsg(ev.msg, ev.src); err == nil {
					p, err := ev.msg.Payload()
					if err != nil {
						c.logger.Warn("Get message payload failed", "err", err)
						continue
					}
					c.backend.Gossip(c.valSet, p)
				}
			}
		case _, ok := <-c.timeoutSub.Chan():// 拜占庭round周期超时事件
			if !ok {
				return
			}
			c.handleTimeoutMsg()
		case event, ok := <-c.finalCommittedSub.Chan(): //区块头提交事件
			if !ok {
				return
			}
			switch event.Data.(type) {
			case istanbul.FinalCommittedEvent:
				c.handleFinalCommitted()
			}
		}
	}
}

istanbul.RequestEvent 普通拜占庭事件

istanbul共识 需要调用拜占庭达成共识时,会是 Seal() 接口中发送区块提案请求给拜占庭。

// post block into Istanbul engine
	go sb.EventMux().Post(istanbul.RequestEvent{
		Proposal: block,
	})

然后拜占庭就收到了 istanbul.RequestEvent 事件,然后处理该请求。

func (c *core) handleRequest(request *istanbul.Request) error {
	logger := c.logger.New("state", c.state, "seq", c.current.sequence)

	// 检查消息是无效的?过时的?未来的?
	if err := c.checkRequestMsg(request); err != nil {
		if err == errInvalidMessage { 
			logger.Warn("invalid request")
			return err
		}
		logger.Warn("unexpected request", "err", err, "number", request.Proposal.Number(), "hash", request.Proposal.Hash())
		return err
	}

	logger.Trace("handleRequest", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())

	// 如果检查没问题,则发送一条preprepare消息
	c.current.pendingRequest = request
	if c.state == StateAcceptRequest {
		c.sendPreprepare(request)
	}
	return nil
}

处理请求的第一步是检查消息。

// 检查request状态
func (c *core) checkRequestMsg(request *istanbul.Request) error {
	if request == nil || request.Proposal == nil {
		// 如果消息无效
		return errInvalidMessage
	}

	// 检查到哪一步骤了
	if c := c.current.sequence.Cmp(request.Proposal.Number()); c > 0 {
		// 如果消息过时
		return errOldMessage
	} else if c < 0 {
		// 如果消息是未来的,这里会把消息保存起来,下文有介绍
		return errFutureMessage
	} else {
		return nil
	}
}

如果是由于收到未来消息而报错,则需要保存请求的消息到待处理请求的优先队列中。

// 保存请求消息到待处理队列
func (c *core) storeRequestMsg(request *istanbul.Request) {
	logger := c.logger.New("state", c.state)

	logger.Trace("Store future request", "number", request.Proposal.Number(), "hash", request.Proposal.Hash())

	c.pendingRequestsMu.Lock()
	defer c.pendingRequestsMu.Unlock()

	// 未来的消息保存到优先队列
	c.pendingRequests.Push(request, float32(-request.Proposal.Number().Int64()))
}

那么什么时候处理这些请求呢?在修改状态 state 的时候,而且是当把 state 修改为 StateAcceptRequest 时。修改 state 时也会处理积压的消息。那如何处理待处理请求的优先队列中的请求呢?

// 处理待处理的请求
func (c *core) processPendingRequests() {
	c.pendingRequestsMu.Lock()
	defer c.pendingRequestsMu.Unlock()

	for !(c.pendingRequests.Empty()) {
		// 从优先队列中取出请求
		m, prio := c.pendingRequests.Pop()
		r, ok := m.(*istanbul.Request)
		if !ok {
			c.logger.Warn("Malformed request, skip", "msg", m)
			continue
		}
		// Push back if it's a future message
		// 查检请求的状态
		err := c.checkRequestMsg(r)
		if err != nil {
			// 如果是未来的请求,还放回待处理队列,然后找下一条
			if err == errFutureMessage {
				c.logger.Trace("Stop processing request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())
				c.pendingRequests.Push(m, prio)
				break
			}
			c.logger.Trace("Skip the pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash(), "err", err)
			continue
		}
		c.logger.Trace("Post pending request", "number", r.Proposal.Number(), "hash", r.Proposal.Hash())

		// 如果检查没问题,则发送请求事件来处理该请求
		go c.sendEvent(istanbul.RequestEvent{
			Proposal: r.Proposal,
		})
	}
}

istanbul.MessageEvent 各自消息事件

上文说到发送 preprepare 消息,这个消息就是在这里监听处理的。首先检查消息的签名,然后检查消息的地址,如果都没问题,则分别处理不同的消息。

func (c *core) handleMsg(payload []byte) error {
	logger := c.logger.New()

	// Decode message and check its signature
	// 解码消息并检查它的签名
	msg := new(message)
	if err := msg.FromPayload(payload, c.validateFn); err != nil {
		logger.Error("Failed to decode message from payload", "err", err)
		return err
	}

	// Only accept message if the address is valid
	// 只接收地址是有效的消息
	_, src := c.valSet.GetByAddress(msg.Address)
	if src == nil {
		logger.Error("Invalid address in message", "msg", msg)
		return istanbul.ErrUnauthorizedAddress
	}

	return c.handleCheckedMsg(msg, src)
}

检查消息签名的方法,需要的参数是消息笔验证的回调函数。

func (m *message) FromPayload(b []byte, validateFn func([]byte, []byte) (common.Address, error)) error {
	// Decode message
	// 解码消息
	err := rlp.DecodeBytes(b, &m)
	if err != nil {
		return err
	}

	// Validate message (on a message without Signature)
	if validateFn != nil {
		var payload []byte
		// 拿到没有签名的消息对象,即把签名的字段置空
		payload, err = m.PayloadNoSig()
		if err != nil {
			return err
		}
		
		// 调用回调函数来检查消息的签名是否正确
		_, err = validateFn(payload, m.Signature)
	}
	// Still return the message even the err is not nil
	// 返回的消息不是err就是nil
	return err
}

这里再介绍一下验证消息的回调函数

func CheckValidatorSignature(valSet ValidatorSet, data []byte, sig []byte) (common.Address, error) {
	// 1. Get signature address
	// 拿到签名地址
	signer, err := GetSignatureAddress(data, sig)
	if err != nil {
		log.Error("Failed to get signer address", "err", err)
		return common.Address{}, err
	}

	// 2. Check validator
	// 检查签名者地址是不是申请人的
	if _, val := valSet.GetByAddress(signer); val != nil {
		return val.Address(), nil
	}

	return common.Address{}, ErrUnauthorizedAddress
}

然后分别处理四种不同的消息,如果处理的消息是未来的消息,则把未来的消息保存到积压消息的优先队列中。

func (c *core) handleCheckedMsg(msg *message, src istanbul.Validator) error {
	logger := c.logger.New("address", c.address, "from", src)

	// Store the message if it's a future message
	// 如果是未来的消息,就保存起来
	testBacklog := func(err error) error {
		if err == errFutureMessage {
			c.storeBacklog(msg, src)
		}

		return err
	}

	switch msg.Code {
	case msgPreprepare:
		return testBacklog(c.handlePreprepare(msg, src))
	case msgPrepare:
		return testBacklog(c.handlePrepare(msg, src))
	case msgCommit:
		return testBacklog(c.handleCommit(msg, src))
	case msgRoundChange:
		return testBacklog(c.handleRoundChange(msg, src))
	default:
		logger.Error("Invalid message", "msg", msg)
	}

	return errInvalidMessage
}

由于保存未来消息是个单独的模块,这里先讲保存未来的消息,然后依次介绍如何处理其他四种消息。

(1)保存未来的消息

保存未来的消息,简单来说,就是搞一个优先队列,然后分别把不同的消息放进队列中即可。

func (c *core) storeBacklog(msg *message, src istanbul.Validator) {
	logger := c.logger.New("from", src, "state", c.state)

	// 不保存自己的消息
	if src.Address() == c.Address() {
		logger.Warn("Backlog from self")
		return
	}

	logger.Trace("Store future message")

	c.backlogsMu.Lock()
	defer c.backlogsMu.Unlock()

	// 搞一个优先队列
	logger.Debug("Retrieving backlog queue", "for", src.Address(), "backlogs_size", len(c.backlogs))
	backlog := c.backlogs[src.Address()]
	// 如果通过验证者地址没找到对应的优先队列,则创建一个新的
	if backlog == nil {
		backlog = prque.New()
	}
	// 消息和消息的优先级保存起来
	switch msg.Code {
	case msgPreprepare: // 处理preprepare消息
		// 解码
		var p *istanbul.Preprepare
		err := msg.Decode(&p)
		if err == nil {
			// 放进队列保存
			backlog.Push(msg, toPriority(msg.Code, p.View))
		}
		// for msgRoundChange, msgPrepare and msgCommit cases
	default: // 处理其它三种消息,因为保存消息的数据结构不一样,所以分别处理
		var p *istanbul.Subject
		err := msg.Decode(&p)
		if err == nil {
			backlog.Push(msg, toPriority(msg.Code, p.View))
		}
	}
	c.backlogs[src.Address()] = backlog
}

保存消息到优先级队列中,那优先级是怎么定义的呢?

如果是msgRoundChange消息,则-(sequence * 1000)
如果是其它消息,则 -(sequence x 1000 + round x 10 + 消息的优先级)
消息的优先级:msgPreprepare > msgCommit > msgPrepare,分别是1,2,3

保存后的消息是什么时候处理的呢,就是上文提到的,修改 state 状态的时候,由于保存的消息涵盖了消息的所有类型(四种),所以 state 的任何一次修改,都要处理一下积压的未来消息,那是如何处理的呢?


// 处理队列中积压的消息
func (c *core) processBacklog() {
	c.backlogsMu.Lock()
	defer c.backlogsMu.Unlock()

	// 遍历保存的所有的优先队列
	for srcAddress, backlog := range c.backlogs {
		//如果得到的队列是空,慢找下一个队列
		if backlog == nil {
			continue
		}
		// 如果地址为空,则证明该队列中消息取完了,则删除该队列
		_, src := c.valSet.GetByAddress(srcAddress)
		if src == nil {
			// validator is not available
			delete(c.backlogs, srcAddress)
			continue
		}
		logger := c.logger.New("from", src, "state", c.state)
		isFuture := false // 搞一个未来消息的标识

		// We stop processing if
		//   1. backlog is empty
		//   2. The first message in queue is a future message
		// 依次取出队列中的消息,但是不处理空的队列和未来消息
		for !(backlog.Empty() || isFuture) {
			m, prio := backlog.Pop() // 取出消息和优先级
			msg := m.(*message)
			var view *istanbul.View
			switch msg.Code { // 根据不同的消息取出不同的视图view,作用是判断消息的有效性
			case msgPreprepare:
				var m *istanbul.Preprepare
				err := msg.Decode(&m)
				if err == nil {
					view = m.View
				}
				// for msgRoundChange, msgPrepare and msgCommit cases
			default:
				var sub *istanbul.Subject
				err := msg.Decode(&sub)
				if err == nil {
					view = sub.View
				}
			}
			if view == nil {
				logger.Debug("Nil view", "msg", msg)
				continue
			}
			// Push back if it's a future message
			// 检查消息的有效性,判定消息是过去?未来的?
			// 如果是未来的消息,还放回原处
			err := c.checkMessage(msg.Code, view)
			// 如果有错误,取下一条消息
			if err != nil {
				if err == errFutureMessage {
					logger.Trace("Stop processing backlog", "msg", msg)
					backlog.Push(msg, prio)
					isFuture = true
					break
				}
				logger.Trace("Skip the backlog event", "msg", msg, "err", err)
				continue
			}
			logger.Trace("Post backlog event", "msg", msg)

			// 如果没问题,则广播积压的消息事件
			go c.sendEvent(backlogEvent{
				src: src,
				msg: msg,
			})
		}
	}
}


(2)处理msgPreprepare消息

这里要处理 preprepare消息,上文的区块提案请求中提到发送 preprepare消息,那是怎么发的呢?这是先说发,后说处理。发送消息说起来也简单,如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致,则把 request 中的区块提案包装成 preprepare消息,然后广播出去。

func (c *core) sendPreprepare(request *istanbul.Request) {
	logger := c.logger.New("state", c.state)

	// If I'm the proposer and I have the same sequence with the proposal
	// 如果当前节点是申请人,并且当前的区块序列和提案的区块序列一致
	if c.current.Sequence().Cmp(request.Proposal.Number()) == 0 && c.IsProposer() {
		curView := c.currentView()
		// 把request对象包装成preprepare消息对象
		preprepare, err := Encode(&istanbul.Preprepare{
			View:     curView,
			Proposal: request.Proposal,
		})
		if err != nil {
			logger.Error("Failed to encode", "view", curView)
			return
		}

		// 把preprepare消息对象广播出去
		c.broadcast(&message{
			Code: msgPreprepare,
			Msg:  preprepare,
		})
	}
}

在广播消息的时候,会对消息进行签名、rlp编码 生成最终消息,处理完成后,才会调用广播的方式把消息广播出去,生成最终消息的处理方式如下:

func (c *core) finalizeMessage(msg *message) ([]byte, error) {
	var err error
	// Add sender address 消息的地址是当前节点的地址
	msg.Address = c.Address()

	// Add proof of consensus 在广播commit消息的时候,填充这个字段,最终会放在区块的额外字段中
	msg.CommittedSeal = []byte{}
	// Assign the CommittedSeal if it's a COMMIT message and proposal is not nil
	if msg.Code == msgCommit && c.current.Proposal() != nil {
		seal := PrepareCommittedSeal(c.current.Proposal().Hash())
		msg.CommittedSeal, err = c.backend.Sign(seal)
		if err != nil {
			return nil, err
		}
	}

	// Sign message 拿到未签名的消息数据
	data, err := msg.PayloadNoSig()
	if err != nil {
		return nil, err
	}
	// 消息签名
	msg.Signature, err = c.backend.Sign(data)
	if err != nil {
		return nil, err
	}

	// Convert to payload 对签名后的消息进行rlp编码
	payload, err := msg.Payload()
	if err != nil {
		return nil, err
	}

	return payload, nil
}

在这里开始真正地处理 preprepare消息。首先检查消息的有效性,如果消息是旧消息,而且区块提案存在且申请人匹配,则广播一个旧区块的 comit消息。检查消息的申请人是否正确,验证区块是否有问题,如果这是一个未来的 区块,则到期广播该消息,如果是其它错误,则进入下一个共识轮次。如果前面检查没问题,则进入真正的 preprepare消息 处理流程中,如果 当前Hash 没有被锁定,则修改 state 状态并发送 prepare消息,否则判断 锁定的Hash 是否是 当前提案的Hash,如果是,则修改 state状态 并发送 commit消息,如果不是,则进入下一个共识轮次。


func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
	logger := c.logger.New("from", src, "state", c.state)

	// Decode PRE-PREPARE  把preprepare消息解码为preprepare对象
	var preprepare *istanbul.Preprepare
	err := msg.Decode(&preprepare)
	if err != nil {
		return errFailedDecodePreprepare
	}

	// Ensure we have the same view with the PRE-PREPARE message
	// If it is old message, see if we need to broadcast COMMIT
	// 检查消息的有效性,判定消息是过去?未来的?
	if err := c.checkMessage(msgPreprepare, preprepare.View); err != nil {
		// 如果收到的是旧新消息
		if err == errOldMessage {
			// Get validator set for the given proposal

			// 获取给定区块的验证器
			valSet := c.backend.ParentValidators(preprepare.Proposal).Copy()
			// 获取前一个提案的申请人
			previousProposer := c.backend.GetProposer(preprepare.Proposal.Number().Uint64() - 1)
			// 计算区块提案的申请人
			valSet.CalcProposer(previousProposer, preprepare.View.Round.Uint64())
			// Broadcast COMMIT if it is an existing block
			// 1. The proposer needs to be a proposer matches the given (Sequence + Round)
			// 2. The given block must exist
			// 如果这个区块已存在,广播一个commit消息
			// 1.区块提案的申请人必须匹配
			// 2.这个区块提案必须存在
			if valSet.IsProposer(src.Address()) && c.backend.HasPropsal(preprepare.Proposal.Hash(), preprepare.Proposal.Number()) {
				c.sendCommitForOldBlock(preprepare.View, preprepare.Proposal.Hash())
				return nil
			}
		}
		return err
	}

	// Check if the message comes from current proposer
	// 检查消息是否来自当前申请人
	if !c.valSet.IsProposer(src.Address()) {
		logger.Warn("Ignore preprepare messages from non-proposer")
		return errNotFromProposer
	}

	// Verify the proposal we received
	// 验证我们收到的区块
	if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
		// 区块验证失败
		logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
		// if it's a future block, we will handle it again after the duration
		// 如果这是一个未来的区块,我们将到期处理它,因为消息提前了duration时间
		if err == consensus.ErrFutureBlock {
			c.stopFuturePreprepareTimer()
			//duration 时间后广播该事件
			c.futurePreprepareTimer = time.AfterFunc(duration, func() {
				c.sendEvent(backlogEvent{
					src: src,
					msg: msg,
				})
			})
		} else {
			// 如果是其它错误,进入下一个轮次的共识
			c.sendNextRoundChange()
		}
		return err
	}

	// Here is about to accept the PRE-PREPARE
	// 这里真正处理preprepare消息
	if c.state == StateAcceptRequest {
		// Send ROUND CHANGE if the locked proposal and the received proposal are different
		// 如果区块被锁定而且收到的区块是不同的,发送一个改变round的事件进入下一个轮次
		if c.current.IsHashLocked() {
			// 如果当前提案正好是锁定的Hash,直接发一个commit消息
			if preprepare.Proposal.Hash() == c.current.GetLockedHash() {
				// Broadcast COMMIT and enters Prepared state directly
				// 设置preprepare消息,状态变成 prepare,广播 commit 消息
				c.acceptPreprepare(preprepare)
				c.setState(StatePrepared)
				c.sendCommit()
			} else {
				// Send round change
				// 进入下一个轮次
				c.sendNextRoundChange()
			}
		} else {
			// Either
			//   1. the locked proposal and the received proposal match
			//   2. we have no locked proposal
			// 设置preprepare消息,状态变成 preprepare,广播 prepare 消息
			c.acceptPreprepare(preprepare)
			c.setState(StatePreprepared)
			c.sendPrepare()
		}
	}

	return nil
}


**(3)处理msgPrepare消息**

上文说到发送 prepare消息,这是介绍一下如何发送。首先,把消息编码成 prepare消息,然后广播出去,广播方式上文介绍过了,这里不再赘述。


然后开始处理 prepare消息,首先把消息解码成 prepare消息,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 prepare消息 和当前轮次 区块的Hash 是一致的,之后保存消息到当前轮次。如果 当前Hash 被锁定且等消息中带的 区块Hash,或者收到了 2f及以上 的投票且状态在 StatePrepared 之前,则锁定 Hash,修改状态为 StatePrepared,并发送 commit消息


func (c *core) handlePrepare(msg *message, src istanbul.Validator) error {
	// Decode PREPARE message
	// 解码 prepare 消息
	var prepare *istanbul.Subject
	err := msg.Decode(&prepare)
	if err != nil {
		return errFailedDecodePrepare
	}

	// 检查 prepare 消息,确保视图相同,轮次和顺序
	if err := c.checkMessage(msgPrepare, prepare.View); err != nil {
		return err
	}

	// If it is locked, it can only process on the locked block.
	// Passing verifyPrepare and checkMessage implies it is processing on the locked block since it was verified in the Preprepared state.
	// 验证 prepare 消息
	if err := c.verifyPrepare(prepare, src); err != nil {
		return err
	}

	c.acceptPrepare(msg, src)

	// Change to Prepared state if we've received enough PREPARE messages or it is locked
	// and we are in earlier state before Prepared state.
	//如果当前Hash被锁定且等消息中带的区块Hash,或者收到了2f及以上的投票且状态在StatePrepared之前
	// 如果收到了 2f 的确认票, 且未广播commit消息,修改状态为 prepare,然后广播 commit 消息
	if ((c.current.IsHashLocked() && prepare.Digest == c.current.GetLockedHash()) || c.current.GetPrepareOrCommitSize() > 2*c.valSet.F()) &&
		c.state.Cmp(StatePrepared) < 0 {
		c.current.LockHash()
		c.setState(StatePrepared)
		c.sendCommit()
	}

	return nil
}

**(4)处理msgCommit消息**

发送 commit消息 就不细说了,但是在广播 commit消息 的时候,需要把 消息的Hash消息的代码Code 组合起来被私钥签名,保存到消息的 CommittedSeal 字段中,以备后续保存到区块的额外字段中。


这里着重介绍如何处理 commit消息。首先把消息解码成 commit消息,然后检查区块的有效性,确保消息不是未来的,也不是过去的,接着验证 commit消息 和当前轮次区块的 Hash 是一致的,之后保存消息到当前轮次。如果收到了 2f及以上 的投票,且状态在 StateCommitted 之前,则锁定 Hash,并提交。


提交的时候,首先修改状态 stateStateCommitted,然后整理共识数据,即把 commit 池中的消息取出来,最终会把这些消息放入区块头和额外字段中,然后更新区块的区块头,通过管道通知矿工共识完成,可以把区块加入区块链并广播区块了。


(5)处理msgRoundChange消息

先说一下什么情况下会发送 roundChange消息,有三种情况:1.round周期超时2.需要开始下一个round轮次3.处理roundChange消息的时候


round周期超时 后,会选择 round轮次 中收到的消息数据不小于 f+1round 中,round 值最大的那个 round 值。需要开始下一个 round 轮次则是 前一个round+1round 值。至于第三种情况,下文详细介绍,这里先介绍如何发送 roundChange消息


首先就是修改 round,而 sequence 不变,然后把该消息编码并广播出去。

// 发送一个给定的 round 来修改 round
func (c *core) sendRoundChange(round *big.Int) {
	logger := c.logger.New("state", c.state)

	cv := c.currentView()
	// 保证新修改的round不得小于原来的round
	if cv.Round.Cmp(round) >= 0 {
		logger.Error("Cannot send out the round change", "current round", cv.Round, "target round", round)
		return
	}

	// 修改round
	c.catchUpRound(&istanbul.View{
		// The round number we'd like to transfer to.
		Round:    new(big.Int).Set(round),
		Sequence: new(big.Int).Set(cv.Sequence),
	})

	// Now we have the new round number and sequence number
	// 现在我们拥有新的 round 和 sequence
	cv = c.currentView()
	rc := &istanbul.Subject{
		View:   cv,
		Digest: common.Hash{},
	}

	// 修改round的消息编码
	payload, err := Encode(rc)
	if err != nil {
		logger.Error("Failed to encode ROUND CHANGE", "rc", rc, "err", err)
		return
	}

	// 把修改round的消息广播出去
	c.broadcast(&message{
		Code: msgRoundChange,
		Msg:  payload,
	})
}

这个过程中更新 round 的方法需要重点介绍一下。首先打开等待 roundChange 修改的开关,然后修改 round,清空修改后的 round 中的内容,启动一个新的 roundChange计时器

func (c *core) catchUpRound(view *istanbul.View) {
	logger := c.logger.New("old_round", c.current.Round(), "old_seq", c.current.Sequence(), "old_proposer", c.valSet.GetProposer())

	if view.Round.Cmp(c.current.Round()) > 0 {
		c.roundMeter.Mark(new(big.Int).Sub(view.Round, c.current.Round()).Int64())
	}
	c.waitingForRoundChange = true

	// Need to keep block locked for round catching up
	// 修改round
	c.updateRoundState(view, c.valSet, true)
	c.roundChangeSet.Clear(view.Round) // 清空当前round中的内容
	c.newRoundChangeTimer() // 搞一个新的计时器

	logger.Trace("Catch up round", "new_round", view.Round, "new_seq", view.Sequence, "new_proposer", c.valSet)
}

修改round的本质就是创建一个新的 roundState 放在 current属性 中。


从这里开始处理 roundChange消息,解码和检查消息的有效性不说了,然后 把消息添加到 roundChange消息 集中并返回这个区块的这个轮次中共有多少消息,如果数量等于 f+1,继续发 roundChange消息,如果 数量等于 2f+1,开始一个新的round轮次。


func (c *core) handleRoundChange(msg *message, src istanbul.Validator) error {
	logger := c.logger.New("state", c.state, "from", src.Address().Hex())

	// Decode ROUND CHANGE message
	// 解码 修改round 消息
	var rc *istanbul.Subject
	if err := msg.Decode(&rc); err != nil {
		logger.Error("Failed to decode ROUND CHANGE", "err", err)
		return errInvalidMessage
	}

	if err := c.checkMessage(msgRoundChange, rc.View); err != nil {
		return err
	}

	cv := c.currentView()
	roundView := rc.View

	// Add the ROUND CHANGE message to its message set and return how many
	// messages we've got with the same round number and sequence number.
	// 把消息添加到roundChange消息集中并返回这个区块的这个轮次中共有多少消息
	num, err := c.roundChangeSet.Add(roundView.Round, msg)
	if err != nil {
		logger.Warn("Failed to add round change message", "from", src, "msg", msg, "err", err)
		return err
	}

	// Once we received f+1 ROUND CHANGE messages, those messages form a weak certificate.
	// If our round number is smaller than the certificate's round number, we would
	// try to catch up the round number.
	// 一旦收到 f+1 条  修改消息,这些消息组成一个弱证书
	// 如果我们的 round 号小于 证书的 round 号, 我们将试着加到round号

	// 如果需要等待roundChange修改,且收到的消息数量等于 f+1
	if c.waitingForRoundChange && num == int(c.valSet.F()+1) {
		// 如果当前轮次的round小于收到的消息的round
		if cv.Round.Cmp(roundView.Round) < 0 {
			// 继续发送roundChange消息
			c.sendRoundChange(roundView.Round)
		}
		return nil
	} else if num == int(2*c.valSet.F()+1) && (c.waitingForRoundChange || cv.Round.Cmp(roundView.Round) < 0) {
		// We've received 2f+1 ROUND CHANGE messages, start a new round immediately.
		// 收到 2f+1 条修改消息,立即开始一个新的round

		// 如果收到的消息数量等于 2f+1, 且 需要等待roundChange修改或者当前轮次的round小于收到的消息的round,则开始一个新的round轮次
		c.startNewRound(roundView.Round)
		return nil
	} else if cv.Round.Cmp(roundView.Round) < 0 {
		// Only gossip the message with current round to other validators.
		return errIgnored
	}
	return nil
}

backlogEvent 积压消息事件

这块内容是处理积压的消息,由于这些消息来得早了,先保存起来,到期触发本事件执行,本文不详细介绍这块内容了,因为它和上文 istanbul.MessageEvent 各自消息事件 的处理方式基本一样,唯一的差别就是,没有检查这些消息的签名,因为在保存消息前已经检查过了,所以消息的签名是一定没问题的。

 类似资料: