在阅读本文之前请现阅读《etcd的raft实现之log》,笔者的etcd的raft系列通过一个点(就是log)进行展开,能够让读者比较容易的理解etcd的raft实现,解决读者无从下手的难题。
在开始分析之前,先做一些名词解释,在《etcd的raft实现之log》中提到的概念本文就不在重复了:
tracker是etcd的raft单独的一个包,其核心类是ProgressTracker。从类名上看是Progress的Tracker,所以raft的tracker是用来跟踪Progress的,理解了什么是Progress才能明白ProgressTracker的是干什么的。
Progress翻译成中文叫进度,和所谓进度?先来看看代码中注释是怎么解释的,翻译的不好还请见谅:
// Progress represents a follower’s progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
Progress代表了Leader视角的Follower进度,Leader拥有所有Follower的进度,并根据其进度向Follower
发送日志。进度基本上是一个状态机,其转换主要散布在`* raft.raft`周围(这句话不知道什么意思)。
从官方注释来应该能看出一些端倪,笔者一句话概括:Progress是Follower追随Leader状态的进度,此处的状态主要指定的是日志的状态。对于raft来说系统状态的决策者是Leader,其他Follower都是从Leader同步的,确切的说是Leader发送给Follower的。作为Leader,需要知道所有Peer已经同步到什么程度了,所以就有了Progress。比如Leader已经有10条日志,Leader需要把这10条日志发给所有的Peer,那么Leader就需要记录Peer1已经发送到了第3条,Peer2已经发送到了第4条,以此类推。当然,Leader不可能就这么简单的跟踪每个peer的当前日志的进度,接下来看看Progress的定义:
// 代码源自go.etcd.io/etcd/raft/tracker/progress.go
type Progress struct {
// Leader与Follower之间的状态同步是异步的,Leader将日志发给Follower,Follower再回复接收
// 到了哪些日志。出于效率考虑,Leader不会每条日志以类似同步调用的方式发送给Follower,而是
// 只要Leader有新的日志就发送,Next就是用来记录下一次发送日志起始索引。换句话说就是发送给Peer
// 的最大日志索引是Next-1,而Match的就是经过Follower确认接收的最大日志索引,Next-Match-1
// 就是还在飞行中或者还在路上的日志数量(Inflights)。Inflights还是比较形象的,下面会有详细
// 说明。
Match, Next uint64
// 此处顺便把StateType这个类型详细说明一下,StateType的代码在go.etcd.io/etcd/raft/tracker/state.go
// Progress一共有三种状态,分别为探测(StateProbe)、复制(StateReplicate)、快照(StateSnapshot)
// 探测:一般是系统选举完成后,Leader不知道所有Follower都是什么进度,所以需要发消息探测一下,从
// Follower的回复消息获取进度。在还没有收到回消息前都还是探测状态。因为不确定Follower是
// 否活跃,所以发送太多的探测消息意义不大,只发送一个探测消息即可。
// 复制:当Peer回复探测消息后,消息中有该节点接收的最大日志索引,如果回复的最大索引大于Match,
// 以此索引更新Match,Progress就进入了复制状态,开启高速复制模式。复制制状态不同于
// 探测状态,Leader会发送更多的日志消息来提升IO效率,就是上面提到的异步发送。这里就要引入
// Inflight概念了,飞行中的日志,意思就是已经发送给Follower还没有被确认接收的日志数据,
// 后面会有inflight介绍。
// 快照:快照状态说明Follower正在复制Leader的快照
State StateType
// 在快照状态时,快照的索引值
PendingSnapshot uint64
// 变量名字就能看出来,表示Follower最近是否活跃,只要Leader收到任何一个消息就表示节点是最近
// 是活跃的。如果新一轮的选举,那么新的Leader默认为都是不活跃的。
RecentActive bool
// 探测状态时才有用,表示探测消息是否已经发送了,如果发送了就不会再发了,避免不必要的IO。
ProbeSent bool
// Inflight前面提到了,在复制状态有作用,后面有他的代码解析,此处只需要知道他是个限流的作用即可。
// Leader不能无休止的向Follower发送日志,飞行中的日志量太大对网络和节点都是负担。而且一个日志
// 丢失其后面的日志都要重发,所以过大的飞行中的日志失败后的重发成本也很大。
Inflights *Inflights
// 是否是Learner,对于本文,这个变量作用不大,会在其他文章中使用
IsLearner bool
}
通过Progress的定义来看,总结如下:
此处需要解释一下,raft没有专门的探测消息,他是借助于其他消息实现的,比如心跳消息,日志消息等。任何消息的回复都算是一种探测,笔者会在其他文章的代码注释中提到。接下来再来看看Progress几个接口函数:
// 代码源自go.etcd.io/etcd/raft/tracker/progress.go
// Progress进入探测状态
func (pr *Progress) BecomeProbe() {
// 代码注释翻译:如果原始状态是快照,说明快照已经被Peer接收了,那么Next=pendingSnapshot+1,
// 意思就是从快照索引的下一个索引开始发送。
if pr.State == StateSnapshot {
// 此处用临时变量的原因是因为ResetState()会pr.PendingSnapshot=nil
pendingSnapshot := pr.PendingSnapshot
pr.ResetState(StateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
// ResetState的代码注释在下面
pr.ResetState(StateProbe)
// 上面的逻辑是Peer接收完快照后再探测一次才能继续发日志,而这里的逻辑是Peer从复制状态转
// 到探测状态,这在Peer拒绝了日志、日志消息丢失的情况会发生,此时Leader不知道从哪里开始,
// 倒不如从Match+1开始,因为Match是节点明确已经收到的。
pr.Next = pr.Match + 1
}
}
// Progress进入复制状态
func (pr *Progress) BecomeReplicate() {
// 除了复位一下状态就是调整Next,为什么Next也是Match+1?进入复制状态肯定是收到了探测消息的
// 反馈,此时Match会被更新,那从Match+1也就理所当然了。
pr.ResetState(StateReplicate)
pr.Next = pr.Match + 1
}
// Progress进入快照状态
func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
// 除了复位一下状态就是设置快照的索引,此处为什么不需要调整Next?因为这个状态无需在发日志给
// peer,知道快照完成后才能继续
pr.ResetState(StateSnapshot)
pr.PendingSnapshot = snapshoti
}
// 复位状态
func (pr *Progress) ResetState(state StateType) {
// 代码简单到无需解释
pr.ProbeSent = false
pr.PendingSnapshot = 0
pr.State = state
pr.Inflights.reset()
}
除了以上几个切换状态函数,还有几个函数也很重要:
// 代码源自go.etcd.io/etcd/raft/tracker/progress.go
// 更新Progress的状态,为什么有个maybe呢?因为不确定是否更新,raft代码中有很多maybeXxx系列函数,
// 大多是尝试性操作,毕竟是分布式系统,消息重发、网络分区可能会让某些操作失败。这个函数是在节点回复
// 追加日志消息时被调用的,在反馈消息中节点告知Leader日志消息中最有一条日志的索引已经被接收,就是
// 下面的参数n,Leader尝试更新节点的Progress的状态。
func (pr *Progress) MaybeUpdate(n uint64) bool {
var updated bool
// 只有n比Match大才更新,否则可能是节点的进度根本没有变化。n小于Match笔者猜可能是过时的消息。
if pr.Match < n {
pr.Match = n
updated = true
// 这个函数就是把ProbeSent设置为false,试问为什么在这个条件下才算是确认收到探测包?
// 这就要从探测消息说起了,raft可以把日志消息、心跳消息当做探测消息,此处是把日志消息
// 当做探测消息的处理逻辑。新的日志肯定会让Match更新,只有收到了比Match更大的回复才
// 能算是这个节点收到了新日志消息,其他的反馈都可以视为过时消息。比如Match=9,新的日志
// 索引是10,只有收到了>=10的反馈才能确定节点收到了当做探测消息的日志。
pr.ProbeAcked()
}
// 这会发生在什么时候?Next是Leader认为发送给Peer最大的日志索引了,Peer怎么可能会回复一个
// 比Next更大的日志索引呢?这个其实是在系统初始化的时候亦或是每轮选举完成后,新的Leader
// 还不知道Leer的接收的最大日志索引,所以此时的Next还是个初识值。
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
// 源码注释翻译:Progress状态,当收到Peer拒绝的消息的时候使用,参数rejected、last是Peer拒绝的
// 和最后的日志的索引。因为消息的无序和重复发送可能会造成Peer的拒绝,因为Progress通过Match记录
// 了先前Peer已经确认收到的索引,所以这些是不需要调整状态的.如果拒绝超出了Progress预料,则明智地
// 降低Next。
func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
// 复制状态下Match是有效的,可以通过Match判断拒绝的日志是否已经无效了
if pr.State == StateReplicate {
// 拒绝的日志索引比Match小,可能是重复日志的回复,所以可以忽略
if rejected <= pr.Match {
return false
}
// 源码注释:直接把Next调整到Match+1。源码注释还有一句是如果last更大为什么不用他?
// last有可能比Match大么?让我们分析一下,因为在复制状态下Leader会发送多个日志信息
// 给Peer再等待Peer的回复,例如:Match+1,Match+2,Match+3,Match+4,此时如果
// Match+3丢了,那么Match+4肯定好会被拒绝,此时last应该是Match+2,Next=last+1
// 应该更合理。但是从peer的角度看,如果收到了Match+2的日志就会给leader一次回复,这个
// 回复理论上是早于当前这个拒绝消息的,所以当Leader收到Match+4拒绝消息,此时的Match
// 已经更新到Match+2,如果Peer回复的消息也丢包了Match可能也没有更新。所以Match+1
// 大概率和last相同,少数情况可能last更好,但是用Match+1做可能更保险一点。
pr.Next = pr.Match + 1
return true
}
// 源码注释翻译:如果拒绝日志索引不是Next-1,肯定是陈旧消息这是因为非复制状态探测消息一次只
// 发送一条日志。这句话是什么意思呢,读者需要注意,代码执行到这里说明Progress不是复制状态,
// 应该是探测状态。为了效率考虑,Leader向Peer发送日志消息一次会带多条日志,比如一个日志消息
// 会带有10条日志。上面Match+1,Match+2,Match+3,Match+4的例子是为了理解方便假设每个
// 日志消息一条日志。真实的情况是Message[Match,Match+9],Message[Match+10,Match+15],
// 一个日志消息如果带有多条日志,Peer拒绝的是其中一条日志。此时用什么判断拒绝索引日志就在刚刚
// 发送的探测消息中呢?所以探测消息一次只发送一条日志就能做到了,因为这个日志的索引肯定是Next-1。
if pr.Next-1 != rejected {
return false
}
// 根据Peer的反馈调整Next
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
// 因为节点拒绝了日志,如果这个日志是探测消息,那就再探测一次,ProbeSent=true的话,Leader
// 就不会再发消息了
pr.ProbeSent = false
return true
}
// 判断Progress当前是否暂停,“暂停”这个词还是不错的,毕竟Progress是一个动态过程,而暂停即表示
// Leader不能再向Peer发日志消息了,必须等待Peer回复打破这个状态。
func (pr *Progress) IsPaused() bool {
// 不同状态下的暂停条件是不同的。
switch pr.State {
case StateProbe:
// 探测状态下如果已经发送了探测消息Progress就暂停了,意味着不能再发探测消息了,前一个消息
// 还没回复呢,如果节点真的不活跃,发再多也没用。
return pr.ProbeSent
case StateReplicate:
// 复制状态如果Inflights满了就是Progress暂停,这个很合理,也很好理解。
return pr.Inflights.Full()
case StateSnapshot:
// 快照状态Progress就是暂停的,Peer正在复制Leader发送的快照,这个过程是一个相对较大
// 而且重要的事情,因为所有的日志都是基于某一快照基础上的增量,所以快照不完成其他的都是
// 徒劳。
return true
default:
panic("unexpected state")
}
}
以上就是Progress几乎全部的代码了,代码量虽然不多,但是包含的内容确实不少,有些内容需要有一些上下文背景才能理解,笔者在注释中基本都提及了。
上一节提到了好多次Inflights,这里就对他进行一次手术刀式剖析:
// 代码源自go.etcd.io/etcd/raft/tracker/inflights.go
// 在解释Inflights前先温习小学的数据题:有一个水池子,一个入水口,一个出水口,小学题一般会问什么时候
// 能把池子放满。Inflights就好像这个池子,当Progress在复制状态时,Leader向Peer发日志消息相当于
// 放水,Peer回复日志已经收到了相当于出水,当池子满了就不能放水了,也就是上面提到的暂停。作为一个
// 容量相对固定的池子,有入水口有出水口,而且需要按照进水的顺序出水,这正符合queue的特性。而raft的
// 实现没有使用queue,而是在一个内存块上采用循环方式模拟queue的特性,这样效率会更高。就这么多了干货
// 了,没有其他更有价值的内容了。
type Inflights struct {
// 因为是循环使用内存块,需要用起始位置+数量的方式表达Inflights中的日志,start和count就是
// 这两个变量。
start int
count int
// size是内存块的大小
size int
// buffer是内存块,Inflights只记录日志的索引值,而不是日志本身,有索引就足够了。
buffer []uint64
}
// 创建Inflights,需要给Inflights的容量
func NewInflights(size int) *Inflights {
// 有没有注意到并没有为buffer申请内存?size是容量,但是实际运行过程中对于buffer的使用量可能
// 远远低于容量,此时申请size大小的内存明显是一种浪费,所以设计者采用动态调整buffer大小的方法
// 这个会在后面的函数中看到。此处来一个附加题,为什么实际运行过程中对于buffer的使用量可能远远
// 低于容量?例如,容量是256,但是即使用的量可能只有16。首先,日志是以消息为粒度发送的,一个
// 消息可以携带多个日志;其次,Inflights记录的是消息中最大日志的索引,所以它记录的是飞行中的
// 消息的数量,那么折算成飞行中的日志数量就更多了;第三,正常情况下日志发送到节点到接收到节点
// 的回复是非常快的,几毫秒到几十毫秒;第四,使用者在不频繁执行写操作的情况下节点间的IO性能基本
// 能够满足写IO,Inflights的缓冲效果就不明显了。所以说,在大部分情况下,buffer的使用远到不
// 了设置容量。
return &Inflights{
size: size,
}
}
// 向Inflights添加一个日志索引,就是向池子放水
func (in *Inflights) Add(inflight uint64) {
// 如果已经满了是不能再添加的
if in.Full() {
panic("cannot add into a Full inflights")
}
// 找到新添加的日志应该在内存块的位置,因为是循环使用内存块,算法也比较简单:(count) % size
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
// 这里有意思了,如果buffer大小不够了,那就再扩容。前面我们提到了,buffer不是上来就申请内存的
if next >= len(in.buffer) {
in.grow()
}
// 把日志索引存储buffer
in.buffer[next] = inflight
in.count++
}
// 为buffer扩容
func (in *Inflights) grow() {
// 每次扩上次容量的2倍,不多解释了
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
// 把以前内存的内容拷贝到新内存上
newBuffer := make([]uint64, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// 把小于等于to的日志全部释放,为什么不是把等于to的释放掉?这个很简单,如果节点回复的消息丢包了,那么
// 就会造成部分日志无法释放。raft里日志是有序的,搜到了节点回复消息的使用为n,那就说明节点已经收到了
// n以前的全部日志,所以可以把之前的全部释放掉。
func (in *Inflights) FreeLE(to uint64) {
// 没有日志或者老旧消息则忽略
if in.count == 0 || to < in.buffer[in.start] {
return
}
// 找到第一个比to更大的日志
idx := in.start
var i int
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
break
}
// 此处还是循环使用内存的操作
size := in.size
if idx++; idx >= size {
idx -= size
}
}
// 调整start和count
in.count -= i
in.start = idx
// 如果此时没有日志了,索性把start也调整到0的位置,我感觉这是coder的强迫症,哈哈~
if in.count == 0 {
in.start = 0
}
}
// 释放第一个日志
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
// 判断是否满了
func (in *Inflights) Full() bool {
return in.count == in.size
}
// 获取日志数量
func (in *Inflights) Count() int { return in.count }
// 复位
func (in *Inflights) reset() {
in.count = 0
in.start = 0
}
是不是非常简单,以至于不用总结什么。
有了前两节的铺垫,再来理解ProgressTracker就比较容易了。ProgressTracker是Progress的管理者,可以理解为Leader跟踪所有Peer的Progress。
老规矩,先看Tracker的代码定义:
// 代码源自go.etcd.io/etcd/raft/tracker/tracker.go
type ProgressTracker struct {
// 这个会在后面的章节说明,此处现忽略
Voters quorum.JointConfig
// 所有的learners
Learners map[uint64]struct{}
// 所有的peer的Progress
Progress map[uint64]*Progress
// 这个会在后面的章节说明,此处现忽略
Votes map[uint64]bool
// 这个就是Inflights的容量。
MaxInflight int
}
除了和quota相关的内容会在下一章说明,其实就是一个Progress的map,所以Tracker也被不会有太过高深的内容。先来看看ProgressTracker初始化Progress的代码:
// 代码源自go.etcd.io/etcd/raft/tracker/tracker.go
// 源码注释:初始化给定Follower或Learner的Progress,该节点不能以任何一种形式存在,否则异常。
// ID是Peer的ID,match和next用来初始化Progress的。
func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) {
// 完全按照注释来的,不能重复初始化
if pr := p.Progress[id]; pr != nil {
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
}
// Follower可以参与选举和投票,Learner不可以,只要知道这一点就可以了。无论是Follower还是
// Learner都会有一个Progress,但是他们再次进行了分组管理。
if !isLearner {
// 此处为什么是Voters[0],这个在quorum解释,暂时就把他看做一个map好了。
p.Voters[0][id] = struct{}{}
} else {
p.Learners[id] = struct{}{}
}
// 初始化Progress需要给定Next、Match、Inflights容量以及是否是learner,其他也没啥
// 此处可以剧透一下,raft的代码初始化的时候Match=0,Next=1.
p.Progress[id] = &Progress{Next: next, Match: match, Inflights: NewInflights(p.MaxInflight), IsLearner: isLearner}
}
了解了Progress的初始化,接下来就是ProgressTracker存在的最大价值了,先来看一个非常简单的函数:
// 代码源自go.etcd.io/etcd/raft/tracker/tracker.go
// 源码注释:根据投票成员已确认的返回已提交的最大日志索引。这句话是什么意思呢?首先需要理解什么是投票
// 成员,raft中有Follower和Learner,只有Follower才有权投票,Learner是没有的,所以投票成员其实
// 就是Follower。最大日志索引比较好理解,没什么需要解释的,最重要的就是已提交,那什么才算是已提交呢?
// raft认为超过半数以上Follower确认接收的日志就算是已提交的,Committed()是从整个集群的角度计算出
// 已提交的最大日志索引。因为Leader是通过Progres是跟踪每个Follower的日志进度的,Follower之间还
// 存在这个各种差异(比如网络)使得彼此的进度不同,这就是这个函数存在的必要性。
func (p *ProgressTracker) Committed() uint64 {
// 此处是用Voters.CommittedIndex()实现的,这也是笔者将tracker和quorum放在一个文章的原因
// 下面需要了解一下matchAckIndexer,因为Voters.CommittedIndex()传入了这个类型的对象。
return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
}
// matchAckIndexer就是Progress的map,这个和ProgressTracker.Progress本质上是同一个类型,所以
// 在上面的函数传入的参数matchAckIndexer(p.Progress)是做了一次强制的类型转换。
type matchAckIndexer map[uint64]*Progress
// matchAckIndexer实现了AckedIndexer(定义在quorum中的接口)的AckedIndex()接口函数,
// 而Voters.CommittedIndex()的参数其实是AckedIndexer类型的对象。AckedIndex()就是返回指定
// ID的Peer接收的最大日志索引,就是Progress.Match。
func (l matchAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
// 根据ID找到Progress
pr, ok := l[id]
if !ok {
return 0, false
}
// 返回Progress.Match,现在我们已经知道了Match就是Peer回复给Leader确认收到的最大的日志索引
// 此处可以想象得到Voters.CommittedIndex()函数会用到每个peer的Progress.Match来计算raft
// 当前已经提交的最大日志索引。
return quorum.Index(pr.Match), true
}
根据上一章节的注释自然而然的进入到了quorum部分,笔者顺着上一章节的思路顺藤摸瓜,最终带着读者把quorum搞明白。从quorum的定义可以推测这个模块做的基本都是跟法定人数(即超过一半以上的人数)有关的功能,比如选举就需要超过一半以上的Peer支持才能成为Leader。
ProgressTracker.Voters的类型是quorum.JointConfig,但是笔者此处先卖个关子,先来看MajorityConfig,因为JointConfig.CommittedIndex()调用的是MajorityConfig.CommittedIndex()。
// 代码源自go.etcd.io/etcd/raft/quorum/majority.go
// MajorityConfig的定义其实就是peerID的set,所以MajorityConfig就是记录了所有Peer
type MajorityConfig map[uint64]struct{}
// 这里需要注意的是AckedIndexer就是上一节提到的matchAckIndexer,通过matchAckIndexer可以获取
// 所有节点Progress.Match。
func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index {
n := len(c)
if n == 0 {
// 这里很有意思,当没有任何peer的时候返回值居然是无穷大(64位无符号范围内),如果都没有任何
// peer,0不是更合适?其实这跟JoinConfig类型有关,此处先放一放,后面会给出解释。
return math.MaxUint64
}
// 下面的代码对理解函数的实现原理没有多大影响,只是用了一个小技巧,在Peer数量不大于7个的情况下
// 优先用栈数组,否则通过堆申请内存。因为raft集群超过7个的概率不大,用栈效率会更高
var stk [7]uint64
srt := uint64Slice(stk[:])
if cap(srt) < n {
srt = make([]uint64, n)
}
srt = srt[:n]
{
// 把所有的Peer.Progress.Match放入srt数组中,看了源码注释也没太弄明白为什么从后往前
// 放,貌似是在排序的时候效率会更高。量一般在个位数的情况下不知道效率会高多少,读者如果
// 感兴趣可以自行了解,理解了设计目的麻烦告诉笔者。
i := n - 1
for id := range c {
if idx, ok := l.AckedIndex(id); ok {
srt[i] = uint64(idx)
i--
}
}
}
// 插入排序,这里只需要知道根据所有Peer.Progress.Match进行了排序即可,至于用什么排序并不重要
insertionSort(srt)
// 这句代码就是整个函数的精髓了,当前srt是按照peer.Progress.Match从小到达排好序了,此时需要
// 知道一个事情:Peer.Progress.Match代表了[0,Match]的日志全部被peer确认收到。有了这个前提
// 就非常容易理解了,可以把srt理解为按照处理速度升序排序的Peer。n - (n/2 + 1)之后的所有Peer
// 接收日志的速度都比它快,而在他之后包括他自己的节点数量正好超过一半,那么他的Match就是集群的
// 提交索引了。换句话说,有少于一半的节点的Match可能小于该节点的Match。
pos := n - (n/2 + 1)
return Index(srt[pos])
}
MajorityConfig除了用来计算raft的提交索引,同时也用来做选举结果统计,接下来看看他是如何实现的:
// 代码源自go.etcd.io/etcd/raft/quorum/majority.go
// 参数votes是一个map,支持自己成为leader那么votes[peerID]=true,所以这个函数就是一个唱票的实现
func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
if len(c) == 0 {
// 这里和CommittedIndex()设计方法一样,背着在后面解释
return VoteWon
}
// 统计支持者(nv[1])和反对者(nv[0])的数量
ny := [2]int{}
// 当然还有弃权的,raft的弃权不是peer主动弃权的,而是丢包或者超时造成的
var missing int
// 统计票数,这个也没啥好解释的了
for id := range c {
v, ok := votes[id]
if !ok {
missing++
continue
}
if v {
ny[1]++
} else {
ny[0]++
}
}
// 支持者超过一半代表选举胜利
q := len(c)/2 + 1
if ny[1] >= q {
return VoteWon
}
// 支持者和弃权数量超过一半以上选举挂起,因为可能还有一部分票还在路上~
if ny[1]+missing >= q {
return VotePending
}
// 反对者超过一半以上肯定就失败了
return VoteLost
}
以上就是MajorityConfig最核心的两个函数,他们最核心的思路都是找到大多数,所以用Majority这个单词还是比较贴切的。
千呼万唤始出来,终于轮到JointConfig上场了,让我们看看他的真身:
// 代码源自go.etcd.io/etcd/raft/quorum/joint.go
// 是不是有种想骂街的心情?就是这么简单,简单到让你有种上当的感觉~这符合joint的定义,由两个
// MajorityConfig组成,JointConfig和MajorityConfig功能是一样的,只是JointConfig的做法是
// 根据两个MajorityConfig的结果做一次融合性操作。
type JointConfig [2]MajorityConfig
// 这个函数的功能应该不需要解释了
func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
idx0 := c[0].CommittedIndex(l)
idx1 := c[1].CommittedIndex(l)
// 返回的是二者最小的那个,这时候可以理解MajorityConfig.CommittedIndex()为什么Peers数
// 为0的时候返回无穷大了吧,如果返回0该函数就永远返回0了。
if idx0 < idx1 {
return idx0
}
return idx1
}
// 无需解释这个函数的功能
func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
r1 := c[0].VoteResult(votes)
r2 := c[1].VoteResult(votes)
// 相同的,下里面的判断逻辑基就可以知道MajorityConfig.VoteResult()在peers数为0返回选举
// 胜利的原因。
if r1 == r2 {
return r1
}
if r1 == VoteLost || r2 == VoteLost {
return VoteLost
}
return VotePending
}
如果此时读者还在懵逼与为什么会有MajorityConfig的peer数量是0,这时候读者应该回头看看ProgressTracker.InitProgress()代码,他只初始化了Voter[0]。笔者怀疑JointConfig是设计者为未来某个功能提前开发的,至少编写本文时的代码仅用到了一个MajorityConfig。
至此,还有一个疑问,那就是获得提交索引用来干什么?很简单,Leader用来把这个值广播到所有的Peer,这样Follower就可以把提交的日志给使用者应用了。因为此时的日志已经被超过一半以上的Peer接收了。
现在已经弄明白tracker和quorum在raft中的作用了,总结如下: