当前位置: 首页 > 工具软件 > BRAFT > 使用案例 >

braft源码学习笔记,api介绍

韦宏扬
2023-12-01

braft架构

braft分为几个板块,我根据自己的理解进行了概述:

ballot:

ballot,ballot_box

选票类,raft的选举leader步骤

cli:

cli,cli_service

客户端,cli用来修改或者获取复制状态机组节点状态,cli_service继承cli

closure:

closure_helper,closure_queue

返回时的信息和报告内容

configuration:

configuration,configuration_manager

配置信息类,peerid类,nodeid类

configuration_manager用来记录和管理configuration条目历史变更

file:

file_reader:读取文件内容

file_service:为状态机增加或者移除一个file_reader

file_system_adaptor:文件系统适配

protobuf_file:

remote_file_copier:远程文件复制,存储使用快照

fsm_caller:

复制状态机,发生事件时的操作

fsync:

参数选择内存数据落到磁盘的方式,直接调用mingw函数

lease:

作为leader节点和follower节点的状态

log:

log:日志类,log定义和LogStorage定义

log_entry:每条日志条目

log_manager:用于管理日志条目

memory_log:日志条目索引相关操作

node:

node:节点类,每一个node是一个复制状态机的节点实例

node_manager:node节点管理

raft(核心):

raft:定义task,状态机操作,节点参数,状态机参数等

raft_meta:

raft_service

repeated_timer_task:

重复任务计时器

replicator:

leader,follower之间的信息复制

route_table:

一张通信表,存多个复制组,可获取每个复制组leader,更新组配置,等

snapshot:

snapshot:快照类,

snapshot_executor:快照执行操作

snapshot_throttle:大容量磁盘读/写

storage:

存储类,包含元数据存储,log存储,快照存储,稳定存储等

util:

放一些常用的公用方法

 

之后对我认为比较重要的,或者官方文档中出现的头文件进行注释翻译

configuration.h

 

变量名类型说明
GroupId

typedef std::string

raft组名

VersionedGroupId

typedef std::string

GroupId加个版本号

 {group_id}_{index}

=======struct PeerId:=======

butil::EndPoint addr;//就是ip+port

int idx;//idx算是下标号,默认0

除了构造函数外,

reset():初始化值为0

is_empty():都为0时认为是空的

parse():解析字符串并赋值

to_string():将值转成字符串,ip:port:idx

<,==,!=,<<操作符被重载

=======struct NodeId:=======

GroupId group_id;//存组名

PeerId peer_id;//存一个peerid

有一个两值都传的构造函数

to_string():将值转成字符串,group_id:ip:port:idx

<,==,!=,<<操作符被重载

=======class Configuration:=======

std::set<PeerId> _peers;//是一个peers集合

typedef std::set<PeerId>::const_iterator const_iterator;//一个循环集合的迭代器

两个构造函数,可以用vector和set来构造

重载了=符号(赋值)

void reset():清空集合

bool empty():是否为空

size_t size() :返回大小

const_iterator begin() / end():返回迭代器头部 / 迭代器尾部

void list_peers(std::set<PeerId>* peers) / list_peers(std::vector<PeerId>* peers):使用参数替换掉当前集合

void append_peers(std::set<PeerId>* peers):将一系列集合插入到当前集合

bool add_peer(const PeerId& peer):插入一个新节点并判断是否插入成功(是否为新peer)

bool remove_peer(const PeerId& peer):移除节点,并判断是否移除

bool contains(const PeerId& peer_id) /contains(const std::vector<PeerId>& peers):判断是否存在这些peer

bool equals(const std::vector<PeerId>& peers) / equals(const Configuration& rhs): 判断是否和参数匹配

void diffs(const Configuration& rhs,

               Configuration* included,

               Configuration* excluded):得到当前值和rhs的不同,included为this-rhs,excluded为rhs-this

int parse_from(butil::StringPiece conf):从字符串解析并赋值(peerid之间用,隔开)

重载了<<输出符号,输出peerid,用逗号隔开

raft.h

#ifndef BRAFT_RAFT_H
#define BRAFT_RAFT_H
//一堆头文件
#include <string>

#include <butil/logging.h>
#include <butil/iobuf.h>
#include <butil/status.h>
#include <brpc/callback.h>
#include "braft/configuration.h"
#include "braft/enum.pb.h"
#include "braft/errno.pb.h"

template <typename T> class scoped_refptr;//一个模板类scoped_refptr

namespace brpc {
class Server;
}  // namespace brpc

namespace braft {
//给出这些类的声明,定义在其他c文件中
class SnapshotWriter;//快照写
class SnapshotReader;//快照读
class SnapshotHook;//
class LeaderChangeContext;//leader变更信息
class FileSystemAdaptor;//文件适配器
class SnapshotThrottle;//快照大磁盘
class LogStorage;//log存储
//给出一个默认值,会用来做初始化
const PeerId ANY_PEER(butil::EndPoint(butil::IP_ANY, 0), 0);

// 特定于raft的closure(返回报告),包含一个butil :: Status以报告操作是否成功。
class Closure : public google::protobuf::Closure {
public://两个获取status值的函数
    butil::Status& status() { return _st; }
    const butil::Status& status() const { return _st; }
    
private:
    butil::Status _st;//status
};

// 描述一个特定的错误
class Error {
public:
    //构造函数
    Error() : _type(ERROR_TYPE_NONE) {}
    Error(const Error& e) : _type(e._type), _st(e._st) {}
    //返回值
    ErrorType type() const { return _type; }
    const butil::Status& status() const { return _st; }
    butil::Status& status() { return _st; }
    //设置error类型
    void set_type(ErrorType type) { _type = type; }
    //重载赋值号
    Error& operator=(const Error& rhs) {
        _type = rhs._type;
        _st = rhs._st;
        return *this;
    }
private:
    // 需要赋值
    ErrorType _type;
    butil::Status _st;
};
//将error类型转为字符串
inline const char* errortype2str(ErrorType t) {
    switch (t) {
    case ERROR_TYPE_NONE:
        return "None";
    case ERROR_TYPE_LOG:
        return "LogError";
    case ERROR_TYPE_STABLE:
        return "StableError";
    case ERROR_TYPE_SNAPSHOT:
        return "SnapshotError";
    case ERROR_TYPE_STATE_MACHINE:
        return "StateMachineError";
    }
    return "Unknown";
}
//重载输出符号
inline std::ostream& operator<<(std::ostream& os, const Error& e) {
    os << "{type=" << errortype2str(e.type()) 
       << ", error_code=" << e.status().error_code()
       << ", error_text=`" << e.status().error_cstr()
       << "'}";
    return os;
}

// libraft的基本消息结构
struct Task {//任务
    Task() : data(NULL), done(NULL), expected_term(-1) {}//构造函数

    // 数据应用于状态机
    butil::IOBuf* data;

    // 当数据应用于状态机或发生错误时继续。
    Closure* done;

    // 如果expected_term与当前值不匹配,则拒绝此任务
    // 此节点,如果值不为-1
    // 默认值:-1
    int64_t expected_term;
};

class IteratorImpl;//类声明

// Iterator over a batch of committed tasks
//
// Example:
// void YouStateMachine::on_apply(braft::Iterator& iter) {
//     for (; iter.valid(); iter.next()) {
//         brpc::ClosureGuard done_guard(iter.done());
//         process(iter.data());
//     }
// }
class Iterator {
    DISALLOW_COPY_AND_ASSIGN(Iterator);//关闭隐式调用构造函数
public:
    // 移至下一个任务。
    void next();

    // 返回当前的唯一且单调递增的索引
    // 任务:
    //  - 唯一性保证了,在不同的peers中具有相同索引的已提交的任务始终相同且保持不变
    //  - 单调性确保在小组中的所有peers中,对于任何索引对i,j(i < j),
    //	  索引为| i |的任务必须在索引| j |的任务之前应用
    int64_t index() const;

    // 返回任务应用时的leader任期
    int64_t term() const;

    // 返回与在leader节点中传递给Node::apply的内容相同的数据
    const butil::IOBuf& data() const;

    // 如果done()为非空,无论此操作成功或失败,您都必须在应用此任务后
    // 调用done()->Run(),否则将泄漏相应的资源。
    //
    // 如果此节点在担任该组的负责人时提出此任务,并且领导位置在此之前没有改变,
    // 那么done()正是传递给Node::apply的内容,
    // 在使用给定任务更新StateMachine之后,done()可能代表某种延续(例如响应客户端)。
    // 否则done()必须为NULL
    Closure* done() const;

    // 如果此迭代器当前引用的是有效任务,返回true,
    // 否则返回false,说明迭代器到达了这批任务的结尾,或者说明发生了些错误
    bool valid() const;

    // 发生某些严重错误时调用。
    // 并且我们认为最后的|ntail|任务(从上一个迭代的任务开始)没有被应用
    //
    // 如果| st | 不是NULL,它将描述错误的详细信息。
    void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL);

private:
friend class FSMCaller;//友元类声明
    Iterator(IteratorImpl* impl) : _impl(impl) {}//构造函数
    ~Iterator() {};//析构函数

    // _impl的所有权属于FSMCaller;
    IteratorImpl* _impl;
};

// | StateMachine | 是一个raft节点的所有事件的接收器。
// 为您自己的业务逻辑实现特定的StateMachine。
// 注意:不需要所有接口都是线程安全的,它们是按顺序调用的,就是说每个方法都会阻塞之后的所有方法。
class StateMachine {
public:
    virtual ~StateMachine();//析构函数,需要继承并实现

    // 使用一批可以通过| iterator |访问的任务,来更新StateMachine。
    //
    // 当传递给Node :: apply的一个或多个任务已提交到raft组时(法定个数已收到这些任务,并将其存储在后备存储中)调用。
    //
    // 一旦该函数返回给调用者,我们认为所有通过| iter |处理迭代的任务已成功应用。 
    // 而且,如果没有应用所有给定的任务,我们会将其视为严重错误,并报告错误类型为ERROR_TYPE_STATE_MACHINE
    virtual void on_apply(::braft::Iterator& iter) = 0;//需要继承实现

    // 当raft节点崩溃关闭时调用一次。
    // 不实现默认啥都不做
    virtual void on_shutdown();

    // 用户定义的快照生成功能,此方法将阻塞on_apply。
    // 当fsm可以是cow(写时复制)时,用户可以使快照异步。
    // 快照完成后,调用done-> Run()。
    // 成功返回0,失败返回error.
    virtual void on_snapshot_save(::braft::SnapshotWriter* writer,
                                  ::braft::Closure* done);

    // 用户定义的快照加载功能
    // 获取并加载快照
    // 成功返回0,失败返回errno
    // 默认值:不加载任何内容并返回错误。
    virtual int on_snapshot_load(::braft::SnapshotReader* reader);

    // 当所属节点在| term |成为组的leader时调用
    // 默认值:不执行任何操作
    virtual void on_leader_start(int64_t term);

    // 当所属节点不是组的leader时调用
    // |status| 描述详细信息
    virtual void on_leader_stop(const butil::Status& status);

    // 当遇到严重错误时,将调用on_error,在这之后,在错误修复并重新启动该节点之前,不允许对该节点进行任何进一步的修改。
    virtual void on_error(const ::braft::Error& e);

    // 将配置提交给组后调用
    virtual void on_configuration_committed(const ::braft::Configuration& conf);
    virtual void on_configuration_committed(const ::braft::Configuration& conf, int64_t index);

    // 当跟随者停止跟随某个领导者并且其leader_id变为NULL时,将调用此方法,
    // 情况包括:
    // 1.处理选举超时并开始投票
    // 2.接收到候选者的更高term的请求,例如vote_request或来自新leader的append_entries_request
    // 3.从当前领导者接收timeout_now_request并开始request_vote
    // 参数stop_following_context提供有关跟随者之前跟随过的领导者的信息(leader_id, term and status)
    // 用户可以在节点停止跟随某个领导者时重置节点的信息。
    virtual void on_stop_following(const ::braft::LeaderChangeContext& ctx);

    // 当跟随者或候选人开始跟随领导者时,将调用此方法
    // 其leader_id(在调用方法之前应为NULL)设置为领导者的id,
    // 情况包括:
    // 1.候选人收到领导的append_entries
    // 2.跟随者(无领导者)从领导者那里获得append_entries
    // 参数start_following_context提供有关跟随者开始跟随的领导者的信息(leader_id, term and status)
    // 用户可以在开始跟随某个领导者时重置节点的信息。
    virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);
};

//节点的状态
enum State {
    // 如果您不确定用法,请不要更改顺序。
    STATE_LEADER = 1,
    STATE_TRANSFERRING = 2,
    STATE_CANDIDATE = 3,
    STATE_FOLLOWER = 4,
    STATE_ERROR = 5,
    STATE_UNINITIALIZED = 6,
    STATE_SHUTTING = 7,
    STATE_SHUTDOWN = 8,
    STATE_END,
};

//状态转成字符串
inline const char* state2str(State state) {
    const char* str[] = {"LEADER", "TRANSFERRING", "CANDIDATE", "FOLLOWER", 
                         "ERROR", "UNINITIALIZED", "SHUTTING", "SHUTDOWN", };
    if (state < STATE_END) {
        return str[(int)state - 1];
    } else {
        return "UNKNOWN";
    }
}

// 如果| s | 指出节点处于活动状态,返回true 
inline bool is_active_state(State s) {
    // This should be as fast as possible
    return s < STATE_ERROR;
}

// 此类封装了on_start_following和on_stop_following接口的参数。
class LeaderChangeContext {
    DISALLOW_COPY_AND_ASSIGN(LeaderChangeContext);//关闭隐式构造函数
public:
    // 构造函数
    LeaderChangeContext(const PeerId& leader_id, int64_t term, const butil::Status& status)
        : _leader_id(leader_id)
        , _term(term) 
        , _st(status)
    {};

    // 对于on_start_following,leader_id和term是新领导者的;
    // 对于on_stop_following,leader_id和term是旧领导者的。
    const PeerId& leader_id() const { return _leader_id; }
    int64_t term() const { return _term; }

    // 返回有关为什么调用on_start_following或on_stop_following的信息。
    const butil::Status& status() const { return _st; }
        
private:
    PeerId _leader_id;
    int64_t _term;
    butil::Status _st;
};

//重载输出符号
inline std::ostream& operator<<(std::ostream& os, const LeaderChangeContext& ctx) {
    os << "{ leader_id=" << ctx.leader_id()
       << ", term=" << ctx.term()
       << ", status=" << ctx.status()
       << "}";
    return os;
}


class UserLog {
    DISALLOW_COPY_AND_ASSIGN(UserLog);//关闭隐式构造
public:
    // 构造函数
    UserLog() {};
    UserLog(int64_t log_index, const butil::IOBuf& log_data)
        : _index(log_index)
        , _data(log_data)
    {};
    // 返回日志索引号
    int64_t log_index() const { return _index; }
    // 返回日志内容
    const butil::IOBuf& log_data() const { return _data; }
    // 设置日志内容和索引
    void set_log_index(const int64_t log_index) { _index = log_index; }
    void set_log_data(const butil::IOBuf& log_data) { _data = log_data; }
    // 清空
    void reset() {
        _index = 0;
        _data.clear();
    }

private:
    int64_t _index;
    butil::IOBuf _data;
};

// 重载输出符号
inline std::ostream& operator<<(std::ostream& os, const UserLog& user_log) {
    os << "{user_log: index=" << user_log.log_index()
       << ", data size=" << user_log.log_data().size()
       << "}";
    return os;
}

// peer的状态
struct PeerStatus {
    //构造函数
    PeerStatus()
        : valid(false), installing_snapshot(false), next_index(0)
        , last_rpc_send_timestamp(0), flying_append_entries_size(0)
        , readonly_index(0), consecutive_error_times(0)
    {}

    bool    valid;
    bool    installing_snapshot;
    int64_t next_index;
    int64_t last_rpc_send_timestamp;
    int64_t flying_append_entries_size;
    int64_t readonly_index;
    int     consecutive_error_times;
};

// 节点状态
struct NodeStatus {
    typedef std::map<PeerId, PeerStatus> PeerStatusMap;

    NodeStatus()
        : state(STATE_END), readonly(false), term(0), committed_index(0), known_applied_index(0)
        , pending_index(0), pending_queue_size(0), applying_index(0), first_index(0)
        , last_index(-1), disk_index(0)
    {}

    State state;
    PeerId peer_id;//节点的id
    PeerId leader_id;//;leader的id
    bool readonly;//是否只读
    int64_t term;
    // 两个索引,一个已提交索引,一个已应用索引
    int64_t committed_index;
    int64_t known_applied_index;

    // 等待提交的日志的起始索引。
    // 如果值为0,则表示没有待处理的日志。
    // 
    // 警告:如果该值不为0,并且长时间保持该值不变,则表示发生了某些事情导致节点很有可能无法提交日志,因此用户应仔细检查以找出原因。
    int64_t pending_index;

    // 有多少日志等待提交。
    // 
    // 警告:待处理的日志过多,意味着处理速度无法满足写入速度。 用户可以考虑降低写入速度,以避免资源耗尽。
    int64_t pending_queue_size;

    // 当前的应用索引。 如果值为0,则表示没有应用日志。
    //
    // 警告:如果该值不为0,并且长时间保持不变,则表示应用线程已挂起,用户应检查是否发生了死锁,或是否正在处理一些耗时的操作。
    int64_t applying_index;

    // 节点的第一个日志,包括内存和磁盘中的日志。
    int64_t first_index;

    // 节点的最后一个日志,包括内存和磁盘中的日志。
    int64_t last_index;

    // 磁盘中的最后一个日志。
    int64_t disk_index;

    // 稳定的跟随者是当前配置中的peers
    // 如果该节点不是领导者,则此映射为空。
    PeerStatusMap stable_followers;

    // 不稳定的跟随者是不在当前配置中的peers
    // 举例来说,如果添加了一个新的peer,但现在还未赶上leader,它在此映射中。
    PeerStatusMap unstable_followers;
};

// 租赁状态。 以下是典型的租约状态更改图:
// =========================================================================
// event:                 become leader                 become follower
//                        ^           on leader start   ^   on leader stop
//                        |           ^                 |   ^
// time:        ----------|-----------|-----------------|---|-------
// lease state:   EXPIRED | NOT_READY |      VALID      |  EXPIRED  
// =========================================================================
enum LeaseState {
    // Lease is disabled,只有在以下情况下才会返回此状态
    // |raft_enable_leader_lease == false|.
    LEASE_DISABLED = 1,

    // Lease is expired, 此节点不再是领导者。
    LEASE_EXPIRED = 2,

    // 该节点是领导者,但我们不确定数据是最新的。 
    // 此状态一直持续到| on_leader_start |。 或领导下台。
    LEASE_NOT_READY = 3,

    // Lease is valid.
    LEASE_VALID = 4,
};

// Status of a leader lease.
struct LeaderLeaseStatus {
    // 构造函数
    LeaderLeaseStatus()
        : state(LEASE_DISABLED), term(0), lease_epoch(0)
    {}

    LeaseState state;

    // 这些字段仅在| state == LEASE_VALID |时才有意义。
    
    // The term of this lease
    int64_t term;

    // 当转移leader超时时,一个特定的TERM可能具有多个租赁。 在节点的生命周期中,保证租赁时期将单调增加。
    int64_t lease_epoch;
};

struct NodeOptions {
    // 如果在| election_timeout_ms |毫秒的时间里未收到来自领导者的任何消息,则追随者将成为候选人。
    // 默认为: 1000 (1s)
    int election_timeout_ms; //follower to candidate timeout

    // 最大时钟浮动时间,它将用于维护领导者租赁的安全。
    // 默认为: 1000 (1s)
    int max_clock_drift_ms;

    // 如果将其置为正数,每|snapshot_interval_s| 秒会触发一次快照保存。
    // 如果 |snapshot_interval_s| <= 0, 基于时间保存快照的功能将被禁用。
    // 默认为: 3600 (1 hour)
    int snapshot_interval_s;

    // 我们认为一个新增的peer节点已经追赶上当前进度,假如peer节点的last_log_index和
    // leader节点之间的差值小于 |catchup_margin|
    // 默认为: 1000
    int catchup_margin;

    // 如果节点从一个新环境启动(LogStorage和SnapshotStorage均为空)
    // 那么它会使用| initial_conf | 作为组的配置,否则它将从现有环境中加载配置。
    // 默认: A empty group
    Configuration initial_conf;

    // 在pthread而非bthread中运行user callbacks and user closures
    // 
    // 默认: false
    bool usercode_in_pthread;

    // 特定的StateMachine实现了您的业务逻辑,该业务逻辑必须是有效实例。
    StateMachine* fsm;

    // 如果|node_owns_fsm|为true,那么当不再引用节点时,|fsm|将被销毁
    // 默认: false
    bool node_owns_fsm;

    // 在业务层实现的特定LogStorage,应为有效实例,否则默认使用SegmentLogStorage。
    //
    // 默认: null
    LogStorage* log_storage;

    // 如果| node_owns_log_storage | 是true。 | log_storage | 当不再引用后备节点时,它将被销毁。
    //
    // 默认: true
    bool node_owns_log_storage;

    // 以${类型}://${参数}格式描述特定的LogStorage
    // 当 |log_storage| 是 null 时有效
    std::string log_uri;

    // 以${类型}://${参数}格式描述特定的RaftMetaStorage
    // 到目前为止提供了三种类型:
    // 1. type=local
    //     FileBasedSingleMetaStorage(旧名是LocalRaftMetaStorage) 将被使用
    //     它基于protobuf文件,并且仅管理一个节点的稳定meta
    //     典型格式: local://${node_path}
    // 2. type=local-merged
    //     KVBasedMergedMetaStorage 将被使用, 其底层基于KV存储,并在同一磁盘上管理一批节点。 
    //     当Multi-raft下拥有大量节点时,该解决方案旨在解决在领导者选举期间由非常多小的同步IO引起的性能问题。
    //     典型格式: local-merged://${disk_path}
    // 3. type=local-mixed
    //     MixedMetaStorage 将被使用, 当升级或降级后,这将使上述两种类型的元存储重复写入。
    //     典型格式: 
    //     local-mixed://merged_path=${disk_path}&&single_path=${node_path}
    // 
    // 升级和降级步骤:
    //     upgrade from Single to Merged: local -> mixed -> merged
    //     downgrade from Merged to Single: merged -> mixed -> local
    std::string raft_meta_uri;

    // 以${类型}://${参数}格式描述特定的SnapshotStorage
    std::string snapshot_uri;

    // 如果启用,在从远程复制快照之前,将过滤掉重复的文件,以避免无用的传输。
    // 仅当本地文件和远程文件中的两个文件具有相同的文件名
    // 和相同的checksum(存储在文件meta中)时,才会开始复制
    // 默认: false
    bool filter_before_copy_remote;

    // 如果 non-null, 我们将把这个snapshot_file_system_adaptor传递给SnapshotStorage
    // 默认: NULL
    scoped_refptr<FileSystemAdaptor>* snapshot_file_system_adaptor;    
    
    // 如果 non-null, 我们将把此snapshot_throttle传递给SnapshotExecutor
    // 默认: NULL
    scoped_refptr<SnapshotThrottle>* snapshot_throttle;

    // 如果为true,将拒绝通过raft_cli进行的RPC。
    // 默认:false
    bool disable_cli;

    // 构造一个默认实例
    NodeOptions();
};
//构造函数的默认值
inline NodeOptions::NodeOptions() 
    : election_timeout_ms(1000)
    , max_clock_drift_ms(1000)
    , snapshot_interval_s(3600)
    , catchup_margin(1000)
    , usercode_in_pthread(false)
    , fsm(NULL)
    , node_owns_fsm(false)
    , log_storage(NULL)
    , node_owns_log_storage(true)
    , filter_before_copy_remote(false)
    , snapshot_file_system_adaptor(NULL)
    , snapshot_throttle(NULL)
    , disable_cli(false)
{}

class NodeImpl;//类声明
//节 点 类
class Node {
public:
    //构造函数和析构函数
    Node(const GroupId& group_id, const PeerId& peer_id);
    virtual ~Node();

    // get node id
    NodeId node_id();

    // get leader PeerId, 重定向
    PeerId leader_id();

    // 如果这是所属组的leader,则返回true
    bool is_leader();

    // 如果这是领导者且领导者租约有效,则返回true。
    // | raft_enable_leader_lease == false |时,它始终为false。
    // 在以下情况下,返回的true是不可信的:
    //    -  并非所有在raft组中的节点将 |raft_enable_leader_lease| 设置为 true,
    //       正在转移leader或者正在投票;
    //    -  在raft组中,一个节点中的| election_timeout_ms |比另外一和节点中的|election_timeout_ms + max_clock_drift_ms| 大
    bool is_leader_lease_valid();

    // 获取领导者租赁状态以进行更复杂的检查
    void get_leader_lease_status(LeaderLeaseStatus* status);

    // 使用节点设置来初始化node
    int init(const NodeOptions& options);

    // 关闭本地副本。
    // done 是用户定义的功能, 可能响应客户端或清理一些资源
    // [注意] 应用后的代码无法完成访问资源
    void shutdown(Closure* done);

    // 阻塞线程,直到节点成功停止。
    void join();

    // [Thread-safe and wait-free]
    // 将任务应用于复制状态机
    //
    // 关于所有权:
    // |task.data|: 出于性能考虑,我们将删除内容。如果要保留内容,请在调用此函数之前将其复制
    // |task.done|: 如果data已成功提交到raft组。我们将所有权传递给StateMachine::on_apply.
    // 否则,我们将指定错误并调用它。
    void apply(const Task& task);

    // 列出raft组中的peer,只有leader返回ok
    // [注意]当list_peers与add_peer / remove_peer并发时,返回可能会被阻塞
    // 因为add_peer / remove_peer会立即修改内存中的配置
    butil::Status list_peers(std::vector<PeerId>* peers);

    // 在raft组中添加一个新的peer节点
    // 此操作完成后,将调用done-> Run(),描述结果详细信息。
    void add_peer(const PeerId& peer, Closure* done);

    // 从raft组中移除一个peer节点
    // 此操作完成后,将调用done-> Run(),描述结果详细信息。
    void remove_peer(const PeerId& peer, Closure* done);

    // 将raft组的配置优雅地更改为|new_peers|
    // 此操作完成后,将调用done-> Run(),描述结果详细信息。
    void change_peers(const Configuration& new_peers, Closure* done);

    // 单独重置此节点的配置,在此节点成为领导者之前,无需和其他peers节点有任何复制
    // 当大多数复制组都已失效,并且您希望在可用性方面恢复服务时,该功能应该被调用
    //
    // 请注意,在这种情况下,既不能保证一致性也不可以保证共识,在处理此方法时**请务必小心**。
    butil::Status reset_peers(const Configuration& new_peers);

    // 如果可能,立即启动快照。
    // 快照完成时将调用done-> Run(),描述详细结果。
    void snapshot(Closure* done);

    // 用户触发投票
    // 重设eletection_timeout,这样某些节点更有可能成为leader
    butil::Status vote(int election_timeout);

    // 对于该节点重置| election_timeout_ms | ,| max_clock_drift_ms | 也已调整
    // 保持| election_timeout_ms |的总和 和| max_clock_drift_ms | 不变。
    butil::Status reset_election_timeout_ms(int election_timeout_ms);

    // 强制重置| election_timeout_ms | 和| max_clock_drift_ms |。 它可能会破坏领导者的租约安全性,应小心。
    // 以下是安全更改 |election_timeout_ms| 的建议
    // 1. 三步来升级|election_timeout_ms| 到更大数值
    //     - 在所有同级节点中放大| max_clock_drift_ms | 确保 
            |old election_timeout_ms + new max_clock_drift_ms|比
            |new election_timeout_ms + old max_clock_drift_ms|更大.
    //     - 至少等待 |old election_timeout_ms + new max_clock_drift_ms| 的时间以确保以前
            所有选举都完成。
    //     - 升级| election_timeout_ms | 到新的值,同时| max_clock_drift_ms | 可以重新设置
            为旧值。
    // 2. 三步来升级 |election_timeout_ms| 到小数值:
    //     - 同时调整| election_timeout_ms | 和| max_clock_drift_ms |,使
            | election_timeout_ms + max_clock_drift_ms |的总和不变。
    //     - 至少等待 |election_timeout_ms + max_clock_drift_ms| 的时间以确保以前的
            所有选举都完成。
    //     - 升级| max_clock_drift_ms | 回到旧值
    void reset_election_timeout_ms(int election_timeout_ms, int max_clock_drift_ms);

    // 尝试将leader位置转移给| peer |。
    // 如果peer是ANY_PEER,下个term将选出合适的follower作为leader
    // 成功返回0,否则返回-1。
    int transfer_leadership_to(const PeerId& peer);

    // 从给定索引中读取第一个提交的用户日志。
    // 成功返回OK,并为user_log分配数据。
    // 请注意,user_log可能不是给定索引处的确切日志,而是从给定索引到last_committed_index的第一个可用用户日志。
    // 否则,将返回适当的错误:
    //     - 当日志被删除后返回ELOGDELETED;
    //     - 当我们即使到达了last_committed_index都无法获得用户日志时,返回ENOMOREUSERLOG。
    // [注意]考虑到安全性,在代码实现中我们使用last_applied_index代替last_committed_index
    butil::Status read_committed_user_log(const int64_t index, UserLog* user_log);

    // 获取此节点的内部状态,信息与我们在网站上看到的信息基本相同。
    void get_status(NodeStatus* status);

    // 使该节点进入只读模式。
    // 只读模式仅应在某些极端情况下用于保护系统。
    // 例如,在存储系统中,太多的写请求涌入系统
    // 不幸的,系统处于耗尽容量的危险中,没有足够的时间添加新计算机并等待容量平衡。一旦许多磁盘已满,raft组就会发生仲裁死亡。此示例中的一种选择是只读模式,该模式允许领导者拒绝新的写请求,但仍处理读请求和配置更改。
    // 如果跟随节点变为只读,则leader停止将新日志复制到该节点。 在leader仍然可写的情况下,这可能导致数据远远落后于leader。 跟随节点退出只读模式后,leader将继续复制丢失的日志。
    void enter_readonly_mode();

    // 使该节点离开只读模式。
    void leave_readonly_mode();

    // 检查此节点是否为只读。
    // 有两种情况,如果节点是只读的:
    //      - 通过调用enter_readonly_mode()将该节点标记为只读
    //      - 此节点是leader,并且组中可写节点的数量少于大多数。
    bool readonly();

private:
    NodeImpl* _impl;
};
//引导程序选项
struct BootstrapOptions {

    // 包含此raft组的初始成员
    // 默认: empty conf
    Configuration group_conf;

    // 转储快照中包含的最后一个索引
    // 默认: 0
    int64_t last_log_index;

    // 将要转储第一个快照的特定StateMachine
    // 如果last_log_index不为0,则fsm必须是有效实例。
    // 默认: NULL
    StateMachine* fsm;

    // 如果| node_owns_fsm | 是true。 当不再引用后备节点时,| fsm | 将被销毁。
    // 默认: false
    bool node_owns_fsm;

    // 在pthread而非bthread中运行user callbacks and user closures
    // 默认: false
    bool usercode_in_pthread;

    // 以${类型}://${参数}格式描述特定的LogStorage
    std::string log_uri;

    // // 以${类型}://${参数}格式描述特定的RaftMetaStorage
    std::string raft_meta_uri;

    // 以${类型}://${参数}格式描述特定的SnapshotStorage
    std::string snapshot_uri;

    // 构造默认选项
    BootstrapOptions();

};

// 引导一个非空的raft节点
int bootstrap(const BootstrapOptions& options);

// 将raft服务附加到| server |(自己的服务端),这使raft服务与用户服务共享相同的侦听地址。
// 注意:目前,我们仅允许使用特定的侦听地址启动备用服务器,如果要从一个范围端口启动该服务器,则行为是不确定的。
// 成功返回0,否则返回-1。
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* listen_ip_and_port);

// 回收
struct GCOptions {
    // raft实例的Versioned-groupid。
    // 版本是很重要的, 因为具有相同groupid的实例在销毁后可能很快会再次创建。
    VersionedGroupId vgid;
    std::string log_uri;
    std::string raft_meta_uri;
    std::string snapshot_uri;
};

// 如果磁盘被删除但是并没有从global_mss_manager释放,之后再次添加该怎么办? 看起来没问题,因为该磁盘上的所有实例在删除磁盘本身之前都会被销毁,因此不会有垃圾。 
// 
// 由于某种原因破坏实例时,对raft实例的数据进行回收。
//
// 成功返回0,否则返回-1
int gc_raft_data(const GCOptions& gc_options);

}  //  namespace braft

#endif //BRAFT_RAFT_H

 

storage.h

#ifndef BRAFT_RAFT_STORAGE_H
#define BRAFT_RAFT_STORAGE_H
// 一堆头文件
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <butil/status.h>
#include <butil/class_name.h>
#include <brpc/extension.h>
#include <butil/strings/string_piece.h>
#include "braft/configuration.h"
#include "braft/configuration_manager.h"

namespace google {
namespace protobuf {
class Message;
}  // namespace protobuf
}  // namespace google

namespace braft {
// 使用的是谷歌的gflags,大致就是可以使用这三个参数做调参
DECLARE_bool(raft_sync);
DECLARE_bool(raft_sync_meta);
DECLARE_bool(raft_create_parent_directories);

struct LogEntry;//声明,log条目

// 个人感觉像是统计运行时长用的
struct IOMetric {
public:
    IOMetric() 
        : start_time_us(butil::cpuwide_time_us())
        , bthread_queue_time_us(0)
        , open_segment_time_us(0)
        , append_entry_time_us(0)
        , sync_segment_time_us(0) {}

    int64_t start_time_us;
    int64_t bthread_queue_time_us;
    int64_t open_segment_time_us;
    int64_t append_entry_time_us;
    int64_t sync_segment_time_us;
};

// 重载输出符号
inline std::ostream& operator<<(std::ostream& os, const IOMetric& m) {
    return os << " bthread_queue_time_us: " << m.bthread_queue_time_us
              << " open_segment_time_us: " << m.open_segment_time_us 
              << " append_entry_time_us: " << m.append_entry_time_us
              << " sync_segment_time_us: " << m.sync_segment_time_us;
}
//解析uri
inline butil::StringPiece parse_uri(butil::StringPiece* uri, std::string* parameter) {
    // ${protocol}://${parameters}
    size_t pos = uri->find("://");
    if (pos == butil::StringPiece::npos) {
        return butil::StringPiece();
    }
    butil::StringPiece protocol = uri->substr(0, pos);
    uri->remove_prefix(pos + 3/* length of '://' */);
    protocol.trim_spaces();
    parameter->reserve(uri->size());
    parameter->clear();
    size_t removed_spaces = 0;
    for (butil::StringPiece::const_iterator 
            iter = uri->begin(); iter != uri->end(); ++iter) {
        if (!isspace(*iter)) {
            parameter->push_back(*iter);
        } else {
            ++removed_spaces;
        }
    }
    LOG_IF(WARNING, removed_spaces) << "Removed " << removed_spaces 
            << " spaces from `" << *uri << '\'';
    return protocol;
}
// 回收资源
inline int gc_dir(const std::string& path) {
    butil::File::Error e;
    butil::FilePath target_path(path);
    butil::FilePath tmp_path(path + ".tmp");
    // delete tmp path firstly in case there is garbage
    if (!butil::DeleteFile(tmp_path, true)) {
        LOG(ERROR) << "Fail to delete tmp file, path: " << tmp_path.value();
        return -1;
    }

    if (butil::PathExists(target_path)) {
        const bool rc = butil::ReplaceFile(butil::FilePath(target_path),
                                          butil::FilePath(tmp_path), &e);
        if (!rc) {
            LOG(ERROR) << "Fail to rename `" << target_path.value()
                       << " to `" << tmp_path.value() << "' : " << e;
            return -1;
        }
        if (!butil::DeleteFile(tmp_path, true)) {
            LOG(ERROR) << "Fail to delete tmp file, path: " << tmp_path.value();
            return -1;
        }
    } else {
        LOG(INFO) << "Target path not exist, so no need to gc, path: " 
                  << target_path.value();
    }
    return 0; 
}

// log存储结构
class LogStorage {
public:
    virtual ~LogStorage() {}

    // 初始化,检查一致性和完整性,纯虚函数,由子类实现
    virtual int init(ConfigurationManager* configuration_manager) = 0;

    // 日志中的第一个日志索引
    virtual int64_t first_log_index() = 0;

    // 日志中的最后一个日志索引
    virtual int64_t last_log_index() = 0;

    // 通过索引号获取日志条目
    virtual LogEntry* get_entry(const int64_t index) = 0;

    // 通过索引号获取日志条目的所属term
    virtual int64_t get_term(const int64_t index) = 0;

    // 将条目追加到日志
    virtual int append_entry(const LogEntry* entry) = 0;

    // 追加条目以记录和更新IOMetric,返回追加成功编号
    virtual int append_entries(const std::vector<LogEntry*>& entries, IOMetric* metric) = 0;

    // 从存储的头部删除日志,[first_log_index,first_index_kept)将被丢弃
    virtual int truncate_prefix(const int64_t first_index_kept) = 0;

    // 从存储的尾部删除未提交的日志,(last_index_kept,last_log_index]将被丢弃
    virtual int truncate_suffix(const int64_t last_index_kept) = 0;

    // 删除所有现有日志,然后将下一个日志索引重置为| next_log_index |。
    // 在安装从leader节点传来的快照后调用此函数
    virtual int reset(const int64_t next_log_index) = 0;

    // 使用| uri |中编码的参数创建此类LogStorage的实例。
    // 成功则返回引用到实例的指针,否则返回NULL。
    virtual LogStorage* new_instance(const std::string& uri) const = 0;

    static LogStorage* create(const std::string& uri);

    // 回收使用| uri |参数的实例的资源
    virtual butil::Status gc_instance(const std::string& uri) const {
        CHECK(false) << butil::class_name_str(*this)
                     << " didn't implement gc_instance interface while deleting"
                        " raft log in " << uri;
        butil::Status status;
        status.set_error(ENOSYS, "gc_instance interface is not implemented");
        return status;
    }
    
    static butil::Status destroy(const std::string& uri);
};

// 第二大块存储结构,用来存放一些RAFT算法自身的状态数据, 比如term, vote_for等信息.
class RaftMetaStorage {
public:
    virtual ~RaftMetaStorage() {}//构造函数

    // 初始化
    virtual butil::Status init() = 0;

    // 设置term 和votedfor信息
    virtual butil::Status set_term_and_votedfor(const int64_t term, 
                    const PeerId& peer_id, const VersionedGroupId& group) = 0;

    // 得到 term 和votedfor信息
    virtual butil::Status get_term_and_votedfor(int64_t* term, PeerId* peer_id, 
                                                const VersionedGroupId& group) = 0;

    // 使用| uri |中编码的参数创建此类RaftMetaStorage的实例。
    // 成功则返回引用到实例的指针,否则返回NULL。
    virtual RaftMetaStorage* new_instance(const std::string& uri) const = 0;

    static RaftMetaStorage* create(const std::string& uri);
    
    // 回收使用| uri |参数的实例的资源
    virtual butil::Status gc_instance(const std::string& uri, 
                                     const VersionedGroupId& vgid) const {
        CHECK(false) << butil::class_name_str(*this)
                     << " didn't implement gc_instance interface while deleting"
                        " raft stable meta in " << uri;
        butil::Status status;
        status.set_error(ENOSYS, "gc_instance interface is not implemented");
        return status;
    }
    
    static butil::Status destroy(const std::string& uri, 
                                const VersionedGroupId& vgid);

};

// 快照
class Snapshot : public butil::Status {
public:
    Snapshot() {}//
    virtual ~Snapshot() {}

    // 获取快照的路径
    virtual std::string get_path() = 0;

    // 列出当前快照中的所有现有文件
    virtual void list_files(std::vector<std::string> *files) = 0;

    // 获取实现定义的file_meta(implementation-defined file_meta)
    virtual int get_file_meta(const std::string& filename, 
                              ::google::protobuf::Message* file_meta) {
        (void)filename;
        file_meta->Clear();
        return 0;
    }
};

// 快照写
class SnapshotWriter : public Snapshot {
public:
    SnapshotWriter() {}
    virtual ~SnapshotWriter() {}

    // 保存被raft框架使用的快照的元信息。
    virtual int save_meta(const SnapshotMeta& meta) = 0;

    // 将文件添加到快照。
    // |file_meta| 是一个实现定义的原型消息 (implmentation-defined protobuf message, 不会翻译)
    // 所有实现都必须处理|file_meta| 为null的情况,确保不会引发错误
    // 请注意,是否将文件创建到后备存储器是由实现定义(implmentation-defined)的。
    virtual int add_file(const std::string& filename) { 
        return add_file(filename, NULL);
    }

    virtual int add_file(const std::string& filename, 
                         const ::google::protobuf::Message* file_meta) = 0;

    // 从快照中删除文件
    // 请注意,是否从后备存储器中删除文件是由实现定义(implmentation-defined)的。
    virtual int remove_file(const std::string& filename) = 0;
};

// 快照读
class SnapshotReader : public Snapshot {
public:
    SnapshotReader() {}
    virtual ~SnapshotReader() {}

    // Load meta from 
    virtual int load_meta(SnapshotMeta* meta) = 0;

    // 为其他peers生成uri以复制此快照。
    // 如果发生某些错误,则返回一个空字符串
    virtual std::string generate_uri_for_copy() = 0;
};

// 从给定资源复制快照
class SnapshotCopier : public butil::Status {
public:
    virtual ~SnapshotCopier() {}
    // 取消复制作业
    virtual void cancel() = 0;
    // 阻塞线程,直到此复制作业完成或发生某些错误。
    virtual void join() = 0;
    // 获取代表复制的快照的SnapshotReader
    virtual SnapshotReader* get_reader() = 0;
};

// 类声明
class SnapshotHook;
class FileSystemAdaptor;
class SnapshotThrottle;

// 快照存储
class SnapshotStorage {
public:
    virtual ~SnapshotStorage() {}
    // 以下三种都有参数设定
    virtual int set_filter_before_copy_remote() {
        CHECK(false) << butil::class_name_str(*this) 
                     << " doesn't support filter before copy remote";
        return -1;
    }

    virtual int set_file_system_adaptor(FileSystemAdaptor* fs) {
        (void)fs;
        CHECK(false) << butil::class_name_str(*this) 
                     << " doesn't support file system adaptor";
        return -1;
    }

    virtual int set_snapshot_throttle(SnapshotThrottle* st) {
        (void)st;
        CHECK(false) << butil::class_name_str(*this) 
                     << " doesn't support snapshot throttle";
        return -1;
    }

    // 初始化
    virtual int init() = 0;

    // 创建一个新的 快照编写器
    virtual SnapshotWriter* create() = 0;

    // 关闭快照编写器
    virtual int close(SnapshotWriter* writer) = 0;

    // 获取最新的快照阅读器
    virtual SnapshotReader* open() = 0;

    // 关闭快照阅读器
    virtual int close(SnapshotReader* reader) = 0;

    // 从uri复制快照并作为SnapshotReader打开
    virtual SnapshotReader* copy_from(const std::string& uri) WARN_UNUSED_RESULT = 0;
    virtual SnapshotCopier* start_to_copy_from(const std::string& uri) = 0;
    virtual int close(SnapshotCopier* copier) = 0;

    // 使用| uri |中编码的参数创建此类SnapshotStorage的实例。
    // 成功则返回引用到实例的指针,否则返回NULL。
    virtual SnapshotStorage* new_instance(const std::string& uri) const WARN_UNUSED_RESULT = 0;
    static SnapshotStorage* create(const std::string& uri);
    
    // 回收使用| uri |参数的 SnapshotStorage实例的资源
    virtual butil::Status gc_instance(const std::string& uri) const {
        CHECK(false) << butil::class_name_str(*this)
                     << " didn't implement gc_instance interface while deleting"
                        " raft snapshot in " << uri;
        butil::Status status;
        status.set_error(ENOSYS, "gc_instance interface is not implemented");
        return status;
    }

    static butil::Status destroy(const std::string& uri);
};

inline brpc::Extension<const LogStorage>* log_storage_extension() {
    return brpc::Extension<const LogStorage>::instance();
}

inline brpc::Extension<const RaftMetaStorage>* meta_storage_extension() {
    return brpc::Extension<const RaftMetaStorage>::instance();
}

inline brpc::Extension<const SnapshotStorage>* snapshot_storage_extension() {
    return brpc::Extension<const SnapshotStorage>::instance();
}

}

#endif //~BRAFT_RAFT_STORAGE_H

 

cli.h

#ifndef  BRAFT_CLI_H
#define  BRAFT_CLI_H

#include "braft/raft.h"

namespace braft {
namespace cli {

struct CliOptions {
    int timeout_ms;
    int max_retry;
    CliOptions() : timeout_ms(-1), max_retry(3) {}
};

// 将一个新的peer节点添加到由| conf |组成的复制组中。
// 成功返回OK,否则返回错误信息。
butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
                       const PeerId& peer_id, const CliOptions& options);
 
// 将一个peer节点从由| conf |组成的复制组中移除。
// 成功返回OK,否则返回错误信息。
butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
                          const PeerId& peer_id, const CliOptions& options);
 
// 优雅地更改复制组的peers节点
butil::Status change_peers(const GroupId& group_id, const Configuration& conf, 
                           const Configuration& new_peers,
                           const CliOptions& options);
 
// 将该复制组的leader转移到目标peer节点
butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf,
                              const PeerId& peer, const CliOptions& options);
 
// 将目标peer节点的peer set重置
butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
                         const Configuration& new_conf,
                         const CliOptions& options);
 
// 要求peer立即转储快照
butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id,
                       const CliOptions& options);

}  // namespace cli
}  //  namespace braft

#endif  //BRAFT_CLI_H

 

route_table.h

#ifndef  BRAFT_ROUTE_TABLE_H
#define  BRAFT_ROUTE_TABLE_H

#include "braft/configuration.h"                 // Configuration
#include "braft/raft.h"

// Maintain routes to raft groups

namespace braft {
namespace rtb {

// 更新路由表中组的配置
int update_configuration(const GroupId& group, const Configuration& conf);
int update_configuration(const GroupId& group, const std::string& conf_str);
 
// 得到该组leader
// 返回:
//  0 : 成功
//  1 : 不确定leader是谁
//  -1: 其他情况
int select_leader(const GroupId& group, PeerId* leader);
 
// 更新leader
int update_leader(const GroupId& group, const PeerId& leader);
int update_leader(const GroupId& group, const std::string& leader_str);
 
// 阻塞线程,直到query_leader完成
butil::Status refresh_leader(const GroupId& group, int timeout_ms);
 
// 从路由表中删除该组
int remove_group(const GroupId& group);

}  // namespace rtb
}  // namespace braft 

#endif  //BRAFT_ROUTE_TABLE_H

 

 类似资料: