braft分为几个板块,我根据自己的理解进行了概述:
ballot:
ballot,ballot_box
选票类,raft的选举leader步骤
cli:
cli,cli_service
客户端,cli用来修改或者获取复制状态机组节点状态,cli_service继承cli
closure:
closure_helper,closure_queue
返回时的信息和报告内容
configuration:
configuration,configuration_manager
配置信息类,peerid类,nodeid类
configuration_manager用来记录和管理configuration条目历史变更
file:
file_reader:读取文件内容
file_service:为状态机增加或者移除一个file_reader
file_system_adaptor:文件系统适配
protobuf_file:
remote_file_copier:远程文件复制,存储使用快照
fsm_caller:
复制状态机,发生事件时的操作
fsync:
参数选择内存数据落到磁盘的方式,直接调用mingw函数
lease:
作为leader节点和follower节点的状态
log:
log:日志类,log定义和LogStorage定义
log_entry:每条日志条目
log_manager:用于管理日志条目
memory_log:日志条目索引相关操作
node:
node:节点类,每一个node是一个复制状态机的节点实例
node_manager:node节点管理
raft(核心):
raft:定义task,状态机操作,节点参数,状态机参数等
raft_meta:
raft_service
repeated_timer_task:
重复任务计时器
replicator:
leader,follower之间的信息复制
route_table:
一张通信表,存多个复制组,可获取每个复制组leader,更新组配置,等
snapshot:
snapshot:快照类,
snapshot_executor:快照执行操作
snapshot_throttle:大容量磁盘读/写
storage:
存储类,包含元数据存储,log存储,快照存储,稳定存储等
util:
放一些常用的公用方法
变量名 | 类型 | 说明 |
---|---|---|
GroupId | typedef std::string | raft组名 |
VersionedGroupId | typedef std::string | GroupId加个版本号 {group_id}_{index} |
=======struct PeerId:=======
butil::EndPoint addr;//就是ip+port
int idx;//idx算是下标号,默认0
除了构造函数外,
reset():初始化值为0
is_empty():都为0时认为是空的
parse():解析字符串并赋值
to_string():将值转成字符串,ip:port:idx
<,==,!=,<<操作符被重载
=======struct NodeId:=======
GroupId group_id;//存组名
PeerId peer_id;//存一个peerid
有一个两值都传的构造函数
to_string():将值转成字符串,group_id:ip:port:idx
<,==,!=,<<操作符被重载
=======class Configuration:=======
std::set<PeerId> _peers;//是一个peers集合
typedef std::set<PeerId>::const_iterator const_iterator;//一个循环集合的迭代器
两个构造函数,可以用vector和set来构造
重载了=符号(赋值)
void reset():清空集合
bool empty():是否为空
size_t size() :返回大小
const_iterator begin() / end():返回迭代器头部 / 迭代器尾部
void list_peers(std::set<PeerId>* peers) / list_peers(std::vector<PeerId>* peers):使用参数替换掉当前集合
void append_peers(std::set<PeerId>* peers):将一系列集合插入到当前集合
bool add_peer(const PeerId& peer):插入一个新节点并判断是否插入成功(是否为新peer)
bool remove_peer(const PeerId& peer):移除节点,并判断是否移除
bool contains(const PeerId& peer_id) /contains(const std::vector<PeerId>& peers):判断是否存在这些peer
bool equals(const std::vector<PeerId>& peers) / equals(const Configuration& rhs): 判断是否和参数匹配
void diffs(const Configuration& rhs,
Configuration* included,
Configuration* excluded):得到当前值和rhs的不同,included为this-rhs,excluded为rhs-this
int parse_from(butil::StringPiece conf):从字符串解析并赋值(peerid之间用,隔开)
重载了<<输出符号,输出peerid,用逗号隔开
#ifndef BRAFT_RAFT_H
#define BRAFT_RAFT_H
//一堆头文件
#include <string>
#include <butil/logging.h>
#include <butil/iobuf.h>
#include <butil/status.h>
#include <brpc/callback.h>
#include "braft/configuration.h"
#include "braft/enum.pb.h"
#include "braft/errno.pb.h"
template <typename T> class scoped_refptr;//一个模板类scoped_refptr
namespace brpc {
class Server;
} // namespace brpc
namespace braft {
//给出这些类的声明,定义在其他c文件中
class SnapshotWriter;//快照写
class SnapshotReader;//快照读
class SnapshotHook;//
class LeaderChangeContext;//leader变更信息
class FileSystemAdaptor;//文件适配器
class SnapshotThrottle;//快照大磁盘
class LogStorage;//log存储
//给出一个默认值,会用来做初始化
const PeerId ANY_PEER(butil::EndPoint(butil::IP_ANY, 0), 0);
// 特定于raft的closure(返回报告),包含一个butil :: Status以报告操作是否成功。
class Closure : public google::protobuf::Closure {
public://两个获取status值的函数
butil::Status& status() { return _st; }
const butil::Status& status() const { return _st; }
private:
butil::Status _st;//status
};
// 描述一个特定的错误
class Error {
public:
//构造函数
Error() : _type(ERROR_TYPE_NONE) {}
Error(const Error& e) : _type(e._type), _st(e._st) {}
//返回值
ErrorType type() const { return _type; }
const butil::Status& status() const { return _st; }
butil::Status& status() { return _st; }
//设置error类型
void set_type(ErrorType type) { _type = type; }
//重载赋值号
Error& operator=(const Error& rhs) {
_type = rhs._type;
_st = rhs._st;
return *this;
}
private:
// 需要赋值
ErrorType _type;
butil::Status _st;
};
//将error类型转为字符串
inline const char* errortype2str(ErrorType t) {
switch (t) {
case ERROR_TYPE_NONE:
return "None";
case ERROR_TYPE_LOG:
return "LogError";
case ERROR_TYPE_STABLE:
return "StableError";
case ERROR_TYPE_SNAPSHOT:
return "SnapshotError";
case ERROR_TYPE_STATE_MACHINE:
return "StateMachineError";
}
return "Unknown";
}
//重载输出符号
inline std::ostream& operator<<(std::ostream& os, const Error& e) {
os << "{type=" << errortype2str(e.type())
<< ", error_code=" << e.status().error_code()
<< ", error_text=`" << e.status().error_cstr()
<< "'}";
return os;
}
// libraft的基本消息结构
struct Task {//任务
Task() : data(NULL), done(NULL), expected_term(-1) {}//构造函数
// 数据应用于状态机
butil::IOBuf* data;
// 当数据应用于状态机或发生错误时继续。
Closure* done;
// 如果expected_term与当前值不匹配,则拒绝此任务
// 此节点,如果值不为-1
// 默认值:-1
int64_t expected_term;
};
class IteratorImpl;//类声明
// Iterator over a batch of committed tasks
//
// Example:
// void YouStateMachine::on_apply(braft::Iterator& iter) {
// for (; iter.valid(); iter.next()) {
// brpc::ClosureGuard done_guard(iter.done());
// process(iter.data());
// }
// }
class Iterator {
DISALLOW_COPY_AND_ASSIGN(Iterator);//关闭隐式调用构造函数
public:
// 移至下一个任务。
void next();
// 返回当前的唯一且单调递增的索引
// 任务:
// - 唯一性保证了,在不同的peers中具有相同索引的已提交的任务始终相同且保持不变
// - 单调性确保在小组中的所有peers中,对于任何索引对i,j(i < j),
// 索引为| i |的任务必须在索引| j |的任务之前应用
int64_t index() const;
// 返回任务应用时的leader任期
int64_t term() const;
// 返回与在leader节点中传递给Node::apply的内容相同的数据
const butil::IOBuf& data() const;
// 如果done()为非空,无论此操作成功或失败,您都必须在应用此任务后
// 调用done()->Run(),否则将泄漏相应的资源。
//
// 如果此节点在担任该组的负责人时提出此任务,并且领导位置在此之前没有改变,
// 那么done()正是传递给Node::apply的内容,
// 在使用给定任务更新StateMachine之后,done()可能代表某种延续(例如响应客户端)。
// 否则done()必须为NULL
Closure* done() const;
// 如果此迭代器当前引用的是有效任务,返回true,
// 否则返回false,说明迭代器到达了这批任务的结尾,或者说明发生了些错误
bool valid() const;
// 发生某些严重错误时调用。
// 并且我们认为最后的|ntail|任务(从上一个迭代的任务开始)没有被应用
//
// 如果| st | 不是NULL,它将描述错误的详细信息。
void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL);
private:
friend class FSMCaller;//友元类声明
Iterator(IteratorImpl* impl) : _impl(impl) {}//构造函数
~Iterator() {};//析构函数
// _impl的所有权属于FSMCaller;
IteratorImpl* _impl;
};
// | StateMachine | 是一个raft节点的所有事件的接收器。
// 为您自己的业务逻辑实现特定的StateMachine。
// 注意:不需要所有接口都是线程安全的,它们是按顺序调用的,就是说每个方法都会阻塞之后的所有方法。
class StateMachine {
public:
virtual ~StateMachine();//析构函数,需要继承并实现
// 使用一批可以通过| iterator |访问的任务,来更新StateMachine。
//
// 当传递给Node :: apply的一个或多个任务已提交到raft组时(法定个数已收到这些任务,并将其存储在后备存储中)调用。
//
// 一旦该函数返回给调用者,我们认为所有通过| iter |处理迭代的任务已成功应用。
// 而且,如果没有应用所有给定的任务,我们会将其视为严重错误,并报告错误类型为ERROR_TYPE_STATE_MACHINE
virtual void on_apply(::braft::Iterator& iter) = 0;//需要继承实现
// 当raft节点崩溃关闭时调用一次。
// 不实现默认啥都不做
virtual void on_shutdown();
// 用户定义的快照生成功能,此方法将阻塞on_apply。
// 当fsm可以是cow(写时复制)时,用户可以使快照异步。
// 快照完成后,调用done-> Run()。
// 成功返回0,失败返回error.
virtual void on_snapshot_save(::braft::SnapshotWriter* writer,
::braft::Closure* done);
// 用户定义的快照加载功能
// 获取并加载快照
// 成功返回0,失败返回errno
// 默认值:不加载任何内容并返回错误。
virtual int on_snapshot_load(::braft::SnapshotReader* reader);
// 当所属节点在| term |成为组的leader时调用
// 默认值:不执行任何操作
virtual void on_leader_start(int64_t term);
// 当所属节点不是组的leader时调用
// |status| 描述详细信息
virtual void on_leader_stop(const butil::Status& status);
// 当遇到严重错误时,将调用on_error,在这之后,在错误修复并重新启动该节点之前,不允许对该节点进行任何进一步的修改。
virtual void on_error(const ::braft::Error& e);
// 将配置提交给组后调用
virtual void on_configuration_committed(const ::braft::Configuration& conf);
virtual void on_configuration_committed(const ::braft::Configuration& conf, int64_t index);
// 当跟随者停止跟随某个领导者并且其leader_id变为NULL时,将调用此方法,
// 情况包括:
// 1.处理选举超时并开始投票
// 2.接收到候选者的更高term的请求,例如vote_request或来自新leader的append_entries_request
// 3.从当前领导者接收timeout_now_request并开始request_vote
// 参数stop_following_context提供有关跟随者之前跟随过的领导者的信息(leader_id, term and status)
// 用户可以在节点停止跟随某个领导者时重置节点的信息。
virtual void on_stop_following(const ::braft::LeaderChangeContext& ctx);
// 当跟随者或候选人开始跟随领导者时,将调用此方法
// 其leader_id(在调用方法之前应为NULL)设置为领导者的id,
// 情况包括:
// 1.候选人收到领导的append_entries
// 2.跟随者(无领导者)从领导者那里获得append_entries
// 参数start_following_context提供有关跟随者开始跟随的领导者的信息(leader_id, term and status)
// 用户可以在开始跟随某个领导者时重置节点的信息。
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);
};
//节点的状态
enum State {
// 如果您不确定用法,请不要更改顺序。
STATE_LEADER = 1,
STATE_TRANSFERRING = 2,
STATE_CANDIDATE = 3,
STATE_FOLLOWER = 4,
STATE_ERROR = 5,
STATE_UNINITIALIZED = 6,
STATE_SHUTTING = 7,
STATE_SHUTDOWN = 8,
STATE_END,
};
//状态转成字符串
inline const char* state2str(State state) {
const char* str[] = {"LEADER", "TRANSFERRING", "CANDIDATE", "FOLLOWER",
"ERROR", "UNINITIALIZED", "SHUTTING", "SHUTDOWN", };
if (state < STATE_END) {
return str[(int)state - 1];
} else {
return "UNKNOWN";
}
}
// 如果| s | 指出节点处于活动状态,返回true
inline bool is_active_state(State s) {
// This should be as fast as possible
return s < STATE_ERROR;
}
// 此类封装了on_start_following和on_stop_following接口的参数。
class LeaderChangeContext {
DISALLOW_COPY_AND_ASSIGN(LeaderChangeContext);//关闭隐式构造函数
public:
// 构造函数
LeaderChangeContext(const PeerId& leader_id, int64_t term, const butil::Status& status)
: _leader_id(leader_id)
, _term(term)
, _st(status)
{};
// 对于on_start_following,leader_id和term是新领导者的;
// 对于on_stop_following,leader_id和term是旧领导者的。
const PeerId& leader_id() const { return _leader_id; }
int64_t term() const { return _term; }
// 返回有关为什么调用on_start_following或on_stop_following的信息。
const butil::Status& status() const { return _st; }
private:
PeerId _leader_id;
int64_t _term;
butil::Status _st;
};
//重载输出符号
inline std::ostream& operator<<(std::ostream& os, const LeaderChangeContext& ctx) {
os << "{ leader_id=" << ctx.leader_id()
<< ", term=" << ctx.term()
<< ", status=" << ctx.status()
<< "}";
return os;
}
class UserLog {
DISALLOW_COPY_AND_ASSIGN(UserLog);//关闭隐式构造
public:
// 构造函数
UserLog() {};
UserLog(int64_t log_index, const butil::IOBuf& log_data)
: _index(log_index)
, _data(log_data)
{};
// 返回日志索引号
int64_t log_index() const { return _index; }
// 返回日志内容
const butil::IOBuf& log_data() const { return _data; }
// 设置日志内容和索引
void set_log_index(const int64_t log_index) { _index = log_index; }
void set_log_data(const butil::IOBuf& log_data) { _data = log_data; }
// 清空
void reset() {
_index = 0;
_data.clear();
}
private:
int64_t _index;
butil::IOBuf _data;
};
// 重载输出符号
inline std::ostream& operator<<(std::ostream& os, const UserLog& user_log) {
os << "{user_log: index=" << user_log.log_index()
<< ", data size=" << user_log.log_data().size()
<< "}";
return os;
}
// peer的状态
struct PeerStatus {
//构造函数
PeerStatus()
: valid(false), installing_snapshot(false), next_index(0)
, last_rpc_send_timestamp(0), flying_append_entries_size(0)
, readonly_index(0), consecutive_error_times(0)
{}
bool valid;
bool installing_snapshot;
int64_t next_index;
int64_t last_rpc_send_timestamp;
int64_t flying_append_entries_size;
int64_t readonly_index;
int consecutive_error_times;
};
// 节点状态
struct NodeStatus {
typedef std::map<PeerId, PeerStatus> PeerStatusMap;
NodeStatus()
: state(STATE_END), readonly(false), term(0), committed_index(0), known_applied_index(0)
, pending_index(0), pending_queue_size(0), applying_index(0), first_index(0)
, last_index(-1), disk_index(0)
{}
State state;
PeerId peer_id;//节点的id
PeerId leader_id;//;leader的id
bool readonly;//是否只读
int64_t term;
// 两个索引,一个已提交索引,一个已应用索引
int64_t committed_index;
int64_t known_applied_index;
// 等待提交的日志的起始索引。
// 如果值为0,则表示没有待处理的日志。
//
// 警告:如果该值不为0,并且长时间保持该值不变,则表示发生了某些事情导致节点很有可能无法提交日志,因此用户应仔细检查以找出原因。
int64_t pending_index;
// 有多少日志等待提交。
//
// 警告:待处理的日志过多,意味着处理速度无法满足写入速度。 用户可以考虑降低写入速度,以避免资源耗尽。
int64_t pending_queue_size;
// 当前的应用索引。 如果值为0,则表示没有应用日志。
//
// 警告:如果该值不为0,并且长时间保持不变,则表示应用线程已挂起,用户应检查是否发生了死锁,或是否正在处理一些耗时的操作。
int64_t applying_index;
// 节点的第一个日志,包括内存和磁盘中的日志。
int64_t first_index;
// 节点的最后一个日志,包括内存和磁盘中的日志。
int64_t last_index;
// 磁盘中的最后一个日志。
int64_t disk_index;
// 稳定的跟随者是当前配置中的peers
// 如果该节点不是领导者,则此映射为空。
PeerStatusMap stable_followers;
// 不稳定的跟随者是不在当前配置中的peers
// 举例来说,如果添加了一个新的peer,但现在还未赶上leader,它在此映射中。
PeerStatusMap unstable_followers;
};
// 租赁状态。 以下是典型的租约状态更改图:
// =========================================================================
// event: become leader become follower
// ^ on leader start ^ on leader stop
// | ^ | ^
// time: ----------|-----------|-----------------|---|-------
// lease state: EXPIRED | NOT_READY | VALID | EXPIRED
// =========================================================================
enum LeaseState {
// Lease is disabled,只有在以下情况下才会返回此状态
// |raft_enable_leader_lease == false|.
LEASE_DISABLED = 1,
// Lease is expired, 此节点不再是领导者。
LEASE_EXPIRED = 2,
// 该节点是领导者,但我们不确定数据是最新的。
// 此状态一直持续到| on_leader_start |。 或领导下台。
LEASE_NOT_READY = 3,
// Lease is valid.
LEASE_VALID = 4,
};
// Status of a leader lease.
struct LeaderLeaseStatus {
// 构造函数
LeaderLeaseStatus()
: state(LEASE_DISABLED), term(0), lease_epoch(0)
{}
LeaseState state;
// 这些字段仅在| state == LEASE_VALID |时才有意义。
// The term of this lease
int64_t term;
// 当转移leader超时时,一个特定的TERM可能具有多个租赁。 在节点的生命周期中,保证租赁时期将单调增加。
int64_t lease_epoch;
};
struct NodeOptions {
// 如果在| election_timeout_ms |毫秒的时间里未收到来自领导者的任何消息,则追随者将成为候选人。
// 默认为: 1000 (1s)
int election_timeout_ms; //follower to candidate timeout
// 最大时钟浮动时间,它将用于维护领导者租赁的安全。
// 默认为: 1000 (1s)
int max_clock_drift_ms;
// 如果将其置为正数,每|snapshot_interval_s| 秒会触发一次快照保存。
// 如果 |snapshot_interval_s| <= 0, 基于时间保存快照的功能将被禁用。
// 默认为: 3600 (1 hour)
int snapshot_interval_s;
// 我们认为一个新增的peer节点已经追赶上当前进度,假如peer节点的last_log_index和
// leader节点之间的差值小于 |catchup_margin|
// 默认为: 1000
int catchup_margin;
// 如果节点从一个新环境启动(LogStorage和SnapshotStorage均为空)
// 那么它会使用| initial_conf | 作为组的配置,否则它将从现有环境中加载配置。
// 默认: A empty group
Configuration initial_conf;
// 在pthread而非bthread中运行user callbacks and user closures
//
// 默认: false
bool usercode_in_pthread;
// 特定的StateMachine实现了您的业务逻辑,该业务逻辑必须是有效实例。
StateMachine* fsm;
// 如果|node_owns_fsm|为true,那么当不再引用节点时,|fsm|将被销毁
// 默认: false
bool node_owns_fsm;
// 在业务层实现的特定LogStorage,应为有效实例,否则默认使用SegmentLogStorage。
//
// 默认: null
LogStorage* log_storage;
// 如果| node_owns_log_storage | 是true。 | log_storage | 当不再引用后备节点时,它将被销毁。
//
// 默认: true
bool node_owns_log_storage;
// 以${类型}://${参数}格式描述特定的LogStorage
// 当 |log_storage| 是 null 时有效
std::string log_uri;
// 以${类型}://${参数}格式描述特定的RaftMetaStorage
// 到目前为止提供了三种类型:
// 1. type=local
// FileBasedSingleMetaStorage(旧名是LocalRaftMetaStorage) 将被使用
// 它基于protobuf文件,并且仅管理一个节点的稳定meta
// 典型格式: local://${node_path}
// 2. type=local-merged
// KVBasedMergedMetaStorage 将被使用, 其底层基于KV存储,并在同一磁盘上管理一批节点。
// 当Multi-raft下拥有大量节点时,该解决方案旨在解决在领导者选举期间由非常多小的同步IO引起的性能问题。
// 典型格式: local-merged://${disk_path}
// 3. type=local-mixed
// MixedMetaStorage 将被使用, 当升级或降级后,这将使上述两种类型的元存储重复写入。
// 典型格式:
// local-mixed://merged_path=${disk_path}&&single_path=${node_path}
//
// 升级和降级步骤:
// upgrade from Single to Merged: local -> mixed -> merged
// downgrade from Merged to Single: merged -> mixed -> local
std::string raft_meta_uri;
// 以${类型}://${参数}格式描述特定的SnapshotStorage
std::string snapshot_uri;
// 如果启用,在从远程复制快照之前,将过滤掉重复的文件,以避免无用的传输。
// 仅当本地文件和远程文件中的两个文件具有相同的文件名
// 和相同的checksum(存储在文件meta中)时,才会开始复制
// 默认: false
bool filter_before_copy_remote;
// 如果 non-null, 我们将把这个snapshot_file_system_adaptor传递给SnapshotStorage
// 默认: NULL
scoped_refptr<FileSystemAdaptor>* snapshot_file_system_adaptor;
// 如果 non-null, 我们将把此snapshot_throttle传递给SnapshotExecutor
// 默认: NULL
scoped_refptr<SnapshotThrottle>* snapshot_throttle;
// 如果为true,将拒绝通过raft_cli进行的RPC。
// 默认:false
bool disable_cli;
// 构造一个默认实例
NodeOptions();
};
//构造函数的默认值
inline NodeOptions::NodeOptions()
: election_timeout_ms(1000)
, max_clock_drift_ms(1000)
, snapshot_interval_s(3600)
, catchup_margin(1000)
, usercode_in_pthread(false)
, fsm(NULL)
, node_owns_fsm(false)
, log_storage(NULL)
, node_owns_log_storage(true)
, filter_before_copy_remote(false)
, snapshot_file_system_adaptor(NULL)
, snapshot_throttle(NULL)
, disable_cli(false)
{}
class NodeImpl;//类声明
//节 点 类
class Node {
public:
//构造函数和析构函数
Node(const GroupId& group_id, const PeerId& peer_id);
virtual ~Node();
// get node id
NodeId node_id();
// get leader PeerId, 重定向
PeerId leader_id();
// 如果这是所属组的leader,则返回true
bool is_leader();
// 如果这是领导者且领导者租约有效,则返回true。
// | raft_enable_leader_lease == false |时,它始终为false。
// 在以下情况下,返回的true是不可信的:
// - 并非所有在raft组中的节点将 |raft_enable_leader_lease| 设置为 true,
// 正在转移leader或者正在投票;
// - 在raft组中,一个节点中的| election_timeout_ms |比另外一和节点中的|election_timeout_ms + max_clock_drift_ms| 大
bool is_leader_lease_valid();
// 获取领导者租赁状态以进行更复杂的检查
void get_leader_lease_status(LeaderLeaseStatus* status);
// 使用节点设置来初始化node
int init(const NodeOptions& options);
// 关闭本地副本。
// done 是用户定义的功能, 可能响应客户端或清理一些资源
// [注意] 应用后的代码无法完成访问资源
void shutdown(Closure* done);
// 阻塞线程,直到节点成功停止。
void join();
// [Thread-safe and wait-free]
// 将任务应用于复制状态机
//
// 关于所有权:
// |task.data|: 出于性能考虑,我们将删除内容。如果要保留内容,请在调用此函数之前将其复制
// |task.done|: 如果data已成功提交到raft组。我们将所有权传递给StateMachine::on_apply.
// 否则,我们将指定错误并调用它。
void apply(const Task& task);
// 列出raft组中的peer,只有leader返回ok
// [注意]当list_peers与add_peer / remove_peer并发时,返回可能会被阻塞
// 因为add_peer / remove_peer会立即修改内存中的配置
butil::Status list_peers(std::vector<PeerId>* peers);
// 在raft组中添加一个新的peer节点
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void add_peer(const PeerId& peer, Closure* done);
// 从raft组中移除一个peer节点
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void remove_peer(const PeerId& peer, Closure* done);
// 将raft组的配置优雅地更改为|new_peers|
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void change_peers(const Configuration& new_peers, Closure* done);
// 单独重置此节点的配置,在此节点成为领导者之前,无需和其他peers节点有任何复制
// 当大多数复制组都已失效,并且您希望在可用性方面恢复服务时,该功能应该被调用
//
// 请注意,在这种情况下,既不能保证一致性也不可以保证共识,在处理此方法时**请务必小心**。
butil::Status reset_peers(const Configuration& new_peers);
// 如果可能,立即启动快照。
// 快照完成时将调用done-> Run(),描述详细结果。
void snapshot(Closure* done);
// 用户触发投票
// 重设eletection_timeout,这样某些节点更有可能成为leader
butil::Status vote(int election_timeout);
// 对于该节点重置| election_timeout_ms | ,| max_clock_drift_ms | 也已调整
// 保持| election_timeout_ms |的总和 和| max_clock_drift_ms | 不变。
butil::Status reset_election_timeout_ms(int election_timeout_ms);
// 强制重置| election_timeout_ms | 和| max_clock_drift_ms |。 它可能会破坏领导者的租约安全性,应小心。
// 以下是安全更改 |election_timeout_ms| 的建议
// 1. 三步来升级|election_timeout_ms| 到更大数值
// - 在所有同级节点中放大| max_clock_drift_ms | 确保
|old election_timeout_ms + new max_clock_drift_ms|比
|new election_timeout_ms + old max_clock_drift_ms|更大.
// - 至少等待 |old election_timeout_ms + new max_clock_drift_ms| 的时间以确保以前
所有选举都完成。
// - 升级| election_timeout_ms | 到新的值,同时| max_clock_drift_ms | 可以重新设置
为旧值。
// 2. 三步来升级 |election_timeout_ms| 到小数值:
// - 同时调整| election_timeout_ms | 和| max_clock_drift_ms |,使
| election_timeout_ms + max_clock_drift_ms |的总和不变。
// - 至少等待 |election_timeout_ms + max_clock_drift_ms| 的时间以确保以前的
所有选举都完成。
// - 升级| max_clock_drift_ms | 回到旧值
void reset_election_timeout_ms(int election_timeout_ms, int max_clock_drift_ms);
// 尝试将leader位置转移给| peer |。
// 如果peer是ANY_PEER,下个term将选出合适的follower作为leader
// 成功返回0,否则返回-1。
int transfer_leadership_to(const PeerId& peer);
// 从给定索引中读取第一个提交的用户日志。
// 成功返回OK,并为user_log分配数据。
// 请注意,user_log可能不是给定索引处的确切日志,而是从给定索引到last_committed_index的第一个可用用户日志。
// 否则,将返回适当的错误:
// - 当日志被删除后返回ELOGDELETED;
// - 当我们即使到达了last_committed_index都无法获得用户日志时,返回ENOMOREUSERLOG。
// [注意]考虑到安全性,在代码实现中我们使用last_applied_index代替last_committed_index
butil::Status read_committed_user_log(const int64_t index, UserLog* user_log);
// 获取此节点的内部状态,信息与我们在网站上看到的信息基本相同。
void get_status(NodeStatus* status);
// 使该节点进入只读模式。
// 只读模式仅应在某些极端情况下用于保护系统。
// 例如,在存储系统中,太多的写请求涌入系统
// 不幸的,系统处于耗尽容量的危险中,没有足够的时间添加新计算机并等待容量平衡。一旦许多磁盘已满,raft组就会发生仲裁死亡。此示例中的一种选择是只读模式,该模式允许领导者拒绝新的写请求,但仍处理读请求和配置更改。
// 如果跟随节点变为只读,则leader停止将新日志复制到该节点。 在leader仍然可写的情况下,这可能导致数据远远落后于leader。 跟随节点退出只读模式后,leader将继续复制丢失的日志。
void enter_readonly_mode();
// 使该节点离开只读模式。
void leave_readonly_mode();
// 检查此节点是否为只读。
// 有两种情况,如果节点是只读的:
// - 通过调用enter_readonly_mode()将该节点标记为只读
// - 此节点是leader,并且组中可写节点的数量少于大多数。
bool readonly();
private:
NodeImpl* _impl;
};
//引导程序选项
struct BootstrapOptions {
// 包含此raft组的初始成员
// 默认: empty conf
Configuration group_conf;
// 转储快照中包含的最后一个索引
// 默认: 0
int64_t last_log_index;
// 将要转储第一个快照的特定StateMachine
// 如果last_log_index不为0,则fsm必须是有效实例。
// 默认: NULL
StateMachine* fsm;
// 如果| node_owns_fsm | 是true。 当不再引用后备节点时,| fsm | 将被销毁。
// 默认: false
bool node_owns_fsm;
// 在pthread而非bthread中运行user callbacks and user closures
// 默认: false
bool usercode_in_pthread;
// 以${类型}://${参数}格式描述特定的LogStorage
std::string log_uri;
// // 以${类型}://${参数}格式描述特定的RaftMetaStorage
std::string raft_meta_uri;
// 以${类型}://${参数}格式描述特定的SnapshotStorage
std::string snapshot_uri;
// 构造默认选项
BootstrapOptions();
};
// 引导一个非空的raft节点
int bootstrap(const BootstrapOptions& options);
// 将raft服务附加到| server |(自己的服务端),这使raft服务与用户服务共享相同的侦听地址。
// 注意:目前,我们仅允许使用特定的侦听地址启动备用服务器,如果要从一个范围端口启动该服务器,则行为是不确定的。
// 成功返回0,否则返回-1。
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* listen_ip_and_port);
// 回收
struct GCOptions {
// raft实例的Versioned-groupid。
// 版本是很重要的, 因为具有相同groupid的实例在销毁后可能很快会再次创建。
VersionedGroupId vgid;
std::string log_uri;
std::string raft_meta_uri;
std::string snapshot_uri;
};
// 如果磁盘被删除但是并没有从global_mss_manager释放,之后再次添加该怎么办? 看起来没问题,因为该磁盘上的所有实例在删除磁盘本身之前都会被销毁,因此不会有垃圾。
//
// 由于某种原因破坏实例时,对raft实例的数据进行回收。
//
// 成功返回0,否则返回-1
int gc_raft_data(const GCOptions& gc_options);
} // namespace braft
#endif //BRAFT_RAFT_H
#ifndef BRAFT_RAFT_STORAGE_H
#define BRAFT_RAFT_STORAGE_H
// 一堆头文件
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <butil/status.h>
#include <butil/class_name.h>
#include <brpc/extension.h>
#include <butil/strings/string_piece.h>
#include "braft/configuration.h"
#include "braft/configuration_manager.h"
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace braft {
// 使用的是谷歌的gflags,大致就是可以使用这三个参数做调参
DECLARE_bool(raft_sync);
DECLARE_bool(raft_sync_meta);
DECLARE_bool(raft_create_parent_directories);
struct LogEntry;//声明,log条目
// 个人感觉像是统计运行时长用的
struct IOMetric {
public:
IOMetric()
: start_time_us(butil::cpuwide_time_us())
, bthread_queue_time_us(0)
, open_segment_time_us(0)
, append_entry_time_us(0)
, sync_segment_time_us(0) {}
int64_t start_time_us;
int64_t bthread_queue_time_us;
int64_t open_segment_time_us;
int64_t append_entry_time_us;
int64_t sync_segment_time_us;
};
// 重载输出符号
inline std::ostream& operator<<(std::ostream& os, const IOMetric& m) {
return os << " bthread_queue_time_us: " << m.bthread_queue_time_us
<< " open_segment_time_us: " << m.open_segment_time_us
<< " append_entry_time_us: " << m.append_entry_time_us
<< " sync_segment_time_us: " << m.sync_segment_time_us;
}
//解析uri
inline butil::StringPiece parse_uri(butil::StringPiece* uri, std::string* parameter) {
// ${protocol}://${parameters}
size_t pos = uri->find("://");
if (pos == butil::StringPiece::npos) {
return butil::StringPiece();
}
butil::StringPiece protocol = uri->substr(0, pos);
uri->remove_prefix(pos + 3/* length of '://' */);
protocol.trim_spaces();
parameter->reserve(uri->size());
parameter->clear();
size_t removed_spaces = 0;
for (butil::StringPiece::const_iterator
iter = uri->begin(); iter != uri->end(); ++iter) {
if (!isspace(*iter)) {
parameter->push_back(*iter);
} else {
++removed_spaces;
}
}
LOG_IF(WARNING, removed_spaces) << "Removed " << removed_spaces
<< " spaces from `" << *uri << '\'';
return protocol;
}
// 回收资源
inline int gc_dir(const std::string& path) {
butil::File::Error e;
butil::FilePath target_path(path);
butil::FilePath tmp_path(path + ".tmp");
// delete tmp path firstly in case there is garbage
if (!butil::DeleteFile(tmp_path, true)) {
LOG(ERROR) << "Fail to delete tmp file, path: " << tmp_path.value();
return -1;
}
if (butil::PathExists(target_path)) {
const bool rc = butil::ReplaceFile(butil::FilePath(target_path),
butil::FilePath(tmp_path), &e);
if (!rc) {
LOG(ERROR) << "Fail to rename `" << target_path.value()
<< " to `" << tmp_path.value() << "' : " << e;
return -1;
}
if (!butil::DeleteFile(tmp_path, true)) {
LOG(ERROR) << "Fail to delete tmp file, path: " << tmp_path.value();
return -1;
}
} else {
LOG(INFO) << "Target path not exist, so no need to gc, path: "
<< target_path.value();
}
return 0;
}
// log存储结构
class LogStorage {
public:
virtual ~LogStorage() {}
// 初始化,检查一致性和完整性,纯虚函数,由子类实现
virtual int init(ConfigurationManager* configuration_manager) = 0;
// 日志中的第一个日志索引
virtual int64_t first_log_index() = 0;
// 日志中的最后一个日志索引
virtual int64_t last_log_index() = 0;
// 通过索引号获取日志条目
virtual LogEntry* get_entry(const int64_t index) = 0;
// 通过索引号获取日志条目的所属term
virtual int64_t get_term(const int64_t index) = 0;
// 将条目追加到日志
virtual int append_entry(const LogEntry* entry) = 0;
// 追加条目以记录和更新IOMetric,返回追加成功编号
virtual int append_entries(const std::vector<LogEntry*>& entries, IOMetric* metric) = 0;
// 从存储的头部删除日志,[first_log_index,first_index_kept)将被丢弃
virtual int truncate_prefix(const int64_t first_index_kept) = 0;
// 从存储的尾部删除未提交的日志,(last_index_kept,last_log_index]将被丢弃
virtual int truncate_suffix(const int64_t last_index_kept) = 0;
// 删除所有现有日志,然后将下一个日志索引重置为| next_log_index |。
// 在安装从leader节点传来的快照后调用此函数
virtual int reset(const int64_t next_log_index) = 0;
// 使用| uri |中编码的参数创建此类LogStorage的实例。
// 成功则返回引用到实例的指针,否则返回NULL。
virtual LogStorage* new_instance(const std::string& uri) const = 0;
static LogStorage* create(const std::string& uri);
// 回收使用| uri |参数的实例的资源
virtual butil::Status gc_instance(const std::string& uri) const {
CHECK(false) << butil::class_name_str(*this)
<< " didn't implement gc_instance interface while deleting"
" raft log in " << uri;
butil::Status status;
status.set_error(ENOSYS, "gc_instance interface is not implemented");
return status;
}
static butil::Status destroy(const std::string& uri);
};
// 第二大块存储结构,用来存放一些RAFT算法自身的状态数据, 比如term, vote_for等信息.
class RaftMetaStorage {
public:
virtual ~RaftMetaStorage() {}//构造函数
// 初始化
virtual butil::Status init() = 0;
// 设置term 和votedfor信息
virtual butil::Status set_term_and_votedfor(const int64_t term,
const PeerId& peer_id, const VersionedGroupId& group) = 0;
// 得到 term 和votedfor信息
virtual butil::Status get_term_and_votedfor(int64_t* term, PeerId* peer_id,
const VersionedGroupId& group) = 0;
// 使用| uri |中编码的参数创建此类RaftMetaStorage的实例。
// 成功则返回引用到实例的指针,否则返回NULL。
virtual RaftMetaStorage* new_instance(const std::string& uri) const = 0;
static RaftMetaStorage* create(const std::string& uri);
// 回收使用| uri |参数的实例的资源
virtual butil::Status gc_instance(const std::string& uri,
const VersionedGroupId& vgid) const {
CHECK(false) << butil::class_name_str(*this)
<< " didn't implement gc_instance interface while deleting"
" raft stable meta in " << uri;
butil::Status status;
status.set_error(ENOSYS, "gc_instance interface is not implemented");
return status;
}
static butil::Status destroy(const std::string& uri,
const VersionedGroupId& vgid);
};
// 快照
class Snapshot : public butil::Status {
public:
Snapshot() {}//
virtual ~Snapshot() {}
// 获取快照的路径
virtual std::string get_path() = 0;
// 列出当前快照中的所有现有文件
virtual void list_files(std::vector<std::string> *files) = 0;
// 获取实现定义的file_meta(implementation-defined file_meta)
virtual int get_file_meta(const std::string& filename,
::google::protobuf::Message* file_meta) {
(void)filename;
file_meta->Clear();
return 0;
}
};
// 快照写
class SnapshotWriter : public Snapshot {
public:
SnapshotWriter() {}
virtual ~SnapshotWriter() {}
// 保存被raft框架使用的快照的元信息。
virtual int save_meta(const SnapshotMeta& meta) = 0;
// 将文件添加到快照。
// |file_meta| 是一个实现定义的原型消息 (implmentation-defined protobuf message, 不会翻译)
// 所有实现都必须处理|file_meta| 为null的情况,确保不会引发错误
// 请注意,是否将文件创建到后备存储器是由实现定义(implmentation-defined)的。
virtual int add_file(const std::string& filename) {
return add_file(filename, NULL);
}
virtual int add_file(const std::string& filename,
const ::google::protobuf::Message* file_meta) = 0;
// 从快照中删除文件
// 请注意,是否从后备存储器中删除文件是由实现定义(implmentation-defined)的。
virtual int remove_file(const std::string& filename) = 0;
};
// 快照读
class SnapshotReader : public Snapshot {
public:
SnapshotReader() {}
virtual ~SnapshotReader() {}
// Load meta from
virtual int load_meta(SnapshotMeta* meta) = 0;
// 为其他peers生成uri以复制此快照。
// 如果发生某些错误,则返回一个空字符串
virtual std::string generate_uri_for_copy() = 0;
};
// 从给定资源复制快照
class SnapshotCopier : public butil::Status {
public:
virtual ~SnapshotCopier() {}
// 取消复制作业
virtual void cancel() = 0;
// 阻塞线程,直到此复制作业完成或发生某些错误。
virtual void join() = 0;
// 获取代表复制的快照的SnapshotReader
virtual SnapshotReader* get_reader() = 0;
};
// 类声明
class SnapshotHook;
class FileSystemAdaptor;
class SnapshotThrottle;
// 快照存储
class SnapshotStorage {
public:
virtual ~SnapshotStorage() {}
// 以下三种都有参数设定
virtual int set_filter_before_copy_remote() {
CHECK(false) << butil::class_name_str(*this)
<< " doesn't support filter before copy remote";
return -1;
}
virtual int set_file_system_adaptor(FileSystemAdaptor* fs) {
(void)fs;
CHECK(false) << butil::class_name_str(*this)
<< " doesn't support file system adaptor";
return -1;
}
virtual int set_snapshot_throttle(SnapshotThrottle* st) {
(void)st;
CHECK(false) << butil::class_name_str(*this)
<< " doesn't support snapshot throttle";
return -1;
}
// 初始化
virtual int init() = 0;
// 创建一个新的 快照编写器
virtual SnapshotWriter* create() = 0;
// 关闭快照编写器
virtual int close(SnapshotWriter* writer) = 0;
// 获取最新的快照阅读器
virtual SnapshotReader* open() = 0;
// 关闭快照阅读器
virtual int close(SnapshotReader* reader) = 0;
// 从uri复制快照并作为SnapshotReader打开
virtual SnapshotReader* copy_from(const std::string& uri) WARN_UNUSED_RESULT = 0;
virtual SnapshotCopier* start_to_copy_from(const std::string& uri) = 0;
virtual int close(SnapshotCopier* copier) = 0;
// 使用| uri |中编码的参数创建此类SnapshotStorage的实例。
// 成功则返回引用到实例的指针,否则返回NULL。
virtual SnapshotStorage* new_instance(const std::string& uri) const WARN_UNUSED_RESULT = 0;
static SnapshotStorage* create(const std::string& uri);
// 回收使用| uri |参数的 SnapshotStorage实例的资源
virtual butil::Status gc_instance(const std::string& uri) const {
CHECK(false) << butil::class_name_str(*this)
<< " didn't implement gc_instance interface while deleting"
" raft snapshot in " << uri;
butil::Status status;
status.set_error(ENOSYS, "gc_instance interface is not implemented");
return status;
}
static butil::Status destroy(const std::string& uri);
};
inline brpc::Extension<const LogStorage>* log_storage_extension() {
return brpc::Extension<const LogStorage>::instance();
}
inline brpc::Extension<const RaftMetaStorage>* meta_storage_extension() {
return brpc::Extension<const RaftMetaStorage>::instance();
}
inline brpc::Extension<const SnapshotStorage>* snapshot_storage_extension() {
return brpc::Extension<const SnapshotStorage>::instance();
}
}
#endif //~BRAFT_RAFT_STORAGE_H
#ifndef BRAFT_CLI_H
#define BRAFT_CLI_H
#include "braft/raft.h"
namespace braft {
namespace cli {
struct CliOptions {
int timeout_ms;
int max_retry;
CliOptions() : timeout_ms(-1), max_retry(3) {}
};
// 将一个新的peer节点添加到由| conf |组成的复制组中。
// 成功返回OK,否则返回错误信息。
butil::Status add_peer(const GroupId& group_id, const Configuration& conf,
const PeerId& peer_id, const CliOptions& options);
// 将一个peer节点从由| conf |组成的复制组中移除。
// 成功返回OK,否则返回错误信息。
butil::Status remove_peer(const GroupId& group_id, const Configuration& conf,
const PeerId& peer_id, const CliOptions& options);
// 优雅地更改复制组的peers节点
butil::Status change_peers(const GroupId& group_id, const Configuration& conf,
const Configuration& new_peers,
const CliOptions& options);
// 将该复制组的leader转移到目标peer节点
butil::Status transfer_leader(const GroupId& group_id, const Configuration& conf,
const PeerId& peer, const CliOptions& options);
// 将目标peer节点的peer set重置
butil::Status reset_peer(const GroupId& group_id, const PeerId& peer_id,
const Configuration& new_conf,
const CliOptions& options);
// 要求peer立即转储快照
butil::Status snapshot(const GroupId& group_id, const PeerId& peer_id,
const CliOptions& options);
} // namespace cli
} // namespace braft
#endif //BRAFT_CLI_H
#ifndef BRAFT_ROUTE_TABLE_H
#define BRAFT_ROUTE_TABLE_H
#include "braft/configuration.h" // Configuration
#include "braft/raft.h"
// Maintain routes to raft groups
namespace braft {
namespace rtb {
// 更新路由表中组的配置
int update_configuration(const GroupId& group, const Configuration& conf);
int update_configuration(const GroupId& group, const std::string& conf_str);
// 得到该组leader
// 返回:
// 0 : 成功
// 1 : 不确定leader是谁
// -1: 其他情况
int select_leader(const GroupId& group, PeerId* leader);
// 更新leader
int update_leader(const GroupId& group, const PeerId& leader);
int update_leader(const GroupId& group, const std::string& leader_str);
// 阻塞线程,直到query_leader完成
butil::Status refresh_leader(const GroupId& group, int timeout_ms);
// 从路由表中删除该组
int remove_group(const GroupId& group);
} // namespace rtb
} // namespace braft
#endif //BRAFT_ROUTE_TABLE_H