侵删,仅供参考学习
braft的官方文档
个人建议,由于braft的说明文档并没有那么友好,可以先看一遍jraft的文档,两个的大致思路和一些变量名称是一样的,这样对理解braft的帮助会大一些。
braft 本身并不提供server功能, 你可以将braft集成到包括brpc在内的任意编程框架中,本文主要是阐述如何在分布式Server中使用braft来构建高可用系统。具体业务如何实现一个Server,本文不在展开。
server-side code of Counter(服务端代码的例子)
braft需要运行在具体的brpc server里面你可以让braft和你的业务共享同样的端口, 也可以将braft启动到不同的端口中.
brpc允许一个端口上注册多个逻辑Service, 如果你的Service同样运行在brpc Server里面,你可以管理brpc Server并且调用以下任意一个接口将braft相关的Service加入到你的Server中。这样能让braft和你的业务跑在同样的端口里面, 降低运维的复杂度。如果对brpc Server的使用不是非常了解, 可以先查看wiki页面.
注意: 如果你提供的是对外网用户暴露的服务,不要让braft跑在相同的端口上。
//将raft服务附加到| server |(自己的服务端),这使raft服务与用户服务共享相同的侦听地址。
//注意:目前,我们仅允许使用特定的侦听地址启动备用服务器,如果要从一个范围端口启动该服务器,则行为是不确定的。
//成功返回0,否则返回-1。
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* const butil::EndPoint& listen_addr);
你需要继承braft::StateMachine并且实现里面的接口
//将raft服务附加到| server |(自己的服务端),这使raft服务与用户服务共享相同的侦听地址。
//注意:目前,我们仅允许使用特定的侦听地址启动备用服务器,如果要从一个范围端口启动该服务器,则行为是不确定的。
//成功返回0,否则返回-1。
int add_service(brpc::Server* server, const butil::EndPoint& listen_addr);
int add_service(brpc::Server* server, int port);
int add_service(brpc::Server* server, const char* const butil::EndPoint& listen_addr);
#include <braft/raft.h>
//注意:不需要所有接口都是线程安全的,它们是按顺序调用的,就是说每个方法都会阻塞之后的所有方法。
class YourStateMachineImple : public braft::StateMachine {
protected:
// on_apply是*必须*实现的
// on_apply会在一条或者多条日志被多数节点持久化之后调用, 通知用户将这些日志所表示的操作应用到业务状态机中.
// 通过iter, 可以从遍历所有未处理但是已经提交的日志, 如果你的状态机支持批量更新,可以一次性获取多
// 条日志提高状态机的吞吐.
//
void on_apply(braft::Iterator& iter) {
// 提交了一批任务,必须通过| iter |处理。
for (; iter.valid(); iter.next()) {
// 该保护措施有助于异步调用iter.done()->Run(),避免回调阻塞StateMachine。
braft::AsyncClosureGuard closure_guard(iter.done());
// 从iter.data()解析出操作语句,并执行
// op = parse(iter.data());
// result = process(op)
// 跟踪日志是为了帮助您了解此StateMachine的工作方式。
// 可以在性能敏感的服务器中删除这些日志。
LOG_IF(INFO, FLAGS_log_applied_task)
<< "Exeucted operation " << op
<< " and the result is " << result
<< " at log_index=" << iter.index();
}
}
// 当这个braft节点被shutdown之后, 当所有的操作都结束, 会调用on_shutdown, 来通知用户这个状态机不再被使用。
// 这时候你可以安全的释放一些资源了.
virtual void on_shutdown() {
// 清理您想要清理的资源
}
通过braft::iterator你可以遍历从所有有的任务
class Iterator {
// 移至下一个任务。
void next();
// 返回当前的唯一且单调递增的索引
// 任务:
// - 唯一性保证了,在不同的peers中具有相同索引的已提交的任务始终相同且保持不变
// - 单调性确保在小组中的所有peers中,对于任何索引对i,j(i < j),
// 索引为| i |的任务必须在索引| j |的任务之前应用
int64_t index() const;
// 返回任务应用时的leader任期
int64_t term() const;
// 返回与在leader节点中传递给Node::apply的内容相同的数据
const butil::IOBuf& data() const;
// 如果done()为非空,无论此操作成功或失败,您都必须在应用此任务后
// 调用done()->Run(),否则将泄漏相应的资源。
//
// 如果此节点在担任该组的负责人时提出此任务,并且领导位置在此之前没有改变,
// 那么done()正是传递给Node::apply的内容,
// 在使用给定任务更新StateMachine之后,done()可能代表某种延续(例如响应客户端)。
// 否则done()必须为NULL
Closure* done() const;
// 如果此迭代器当前引用的是有效任务,返回true,
// 否则返回false,说明迭代器到达了这批任务的结尾,或者说明发生了些错误
bool valid() const;
// 发生某些严重错误时调用。
// 并且我们认为最后的|ntail|任务(从上一个迭代的任务开始)没有被应用
//
// 如果| st | 不是NULL,它将描述错误的详细信息。
void set_error_and_rollback(size_t ntail = 1, const butil::Status* st = NULL);
};
一个Node代表了一个RAFT实例, Node的ID由两个部分组成:
Node(const GroupId& group_id, const PeerId& peer_id);
启动这个节点:
struct NodeOptions {
// 如果在| election_timeout_ms |毫秒的时间里未收到来自领导者的任何消息,则追随者将成为候选人。
// 默认为: 1000 (1s)
int election_timeout_ms;
// 如果将其置为正数,每|snapshot_interval_s| 秒会触发一次快照保存。
// 如果 |snapshot_interval_s| <= 0, 基于时间保存快照的功能将被禁用。
// 默认为: 3600 (1 hour)
int snapshot_interval_s;
// 我们认为一个新增的peer节点已经追赶上当前进度,假如peer节点的last_log_index和
// leader节点之间的差值小于 |catchup_margin|
// 默认为: 1000
int catchup_margin;
// 如果节点从一个新环境启动(LogStorage和SnapshotStorage均为空)
// 那么它会使用| initial_conf | 作为组的配置,否则它将从现有环境中加载配置。
// 默认: A empty group
Configuration initial_conf;
// 特定的StateMachine实现了您的业务逻辑,该业务逻辑必须是有效实例。
StateMachine* fsm;
// 如果|node_owns_fsm|为true,那么当不再引用节点时,|fsm|将被销毁
// 默认: false
bool node_owns_fsm;
// 以${类型}://${参数}格式描述特定的LogStorage
std::string log_uri;
// 以${类型}://${参数}格式描述特定的StableStorage
std::string raft_meta_uri;
// 以${类型}://${参数}格式描述特定的SnapshotStorage
std::string snapshot_uri;
// 如果启用,在从远程复制快照之前,将过滤掉重复的文件,以避免无用的传输。
// 仅当本地文件和远程文件中的两个文件具有相同的文件名
// 和相同的checksum(存储在文件meta中)时,才会开始复制
// 默认: false
bool filter_before_copy_remote;
// 如果为true,将拒绝通过raft_cli进行的RPC。
// 默认: false
bool disable_cli;
};
class Node {
int init(const NodeOptions& options);
};
你需要将你的操作序列化成IOBuf, 这是一个非连续零拷贝的缓存结构。构造一个Task, 并且向braft::Node提交
#include <braft/raft.h>
...
// op为操作语句
// callback为closure
void function(op, callback) {
butil::IOBuf data;
serialize(op, &data);
braft::Task task;
task.data = &data;
task.done = make_closure(callback);
task.expected_term = expected_term;
return _node->apply(task);
}
具体接口
struct Task {
Task() : data(NULL), done(NULL) {}
// data应用于StateMachine
base::IOBuf* data;
// 当data应用于StateMachine或发生错误时继续。
Closure* done;
// 如果|expected_term|不是-1,如果expected_term与该节点的当前term不匹配,则拒绝此任务
// 默认: -1
int64_t expected_term;
};
// 将任务应用于复制状态机
//
// 关于所有权:
// |task.data|: 出于性能考虑,我们将删除内容。如果要保留内容,请在调用此函数之前将其复制
// |task.done|: 如果data已成功提交到raft组。我们将所有权传递给StateMachine::on_apply.
// 否则,我们将指定错误并调用它。
void apply(const Task& task);
raft::Closure是一个特殊的protobuf::Closure的子类, 可以用了标记一次异步调用成功或者失败. 和protobuf::Closure一样, 你需要继承这个类,实现Run接口。 当一次异步调用真正结束之后, Run会被框架调用, 此时你可以通过status()来确认这次调用是否成功或者失败。
// Raft-specific closure包含一个base::Status以报告操作是否成功。
class Closure : public google::protobuf::Closure {
public:
base::Status& status() { return _st; }
const base::Status& status() const { return _st; }
};
StateMachine中还提供了一些接口, 实现这些接口能够监听Node的状态变化,你的系统可以针对这些状态变化实现一些特定的逻辑(比如转发消息给leader节点)
class StateMachine {
...
// 当raft节点关闭时调用一次,在这之后就可以安全清除相应的资源。
// 默认不执行任何操作
virtual void on_shutdown();
// 当所属节点在| term |任期成为组的leader时调用
// 默认不执行任何操作
virtual void on_leader_start(int64_t term);
// 当此节点不再是所属组的leader时调用
// |status| 描述有关原因的更多详细信息。
virtual void on_leader_stop(const butil::Status& status);
// 当发生一些严重错误并且此节点此后停止工作时调用。
virtual void on_error(const ::braft::Error& e);
// 在将配置提交给当前组后调用
virtual void on_configuration_committed(const ::braft::Configuration& conf);
// 当一个 follower 停止跟随leader时调用
// 情况包括:
// 1.选举超时时间已到。
// 2.从具有更高term的节点收到消息
virtual void on_stop_following(const ::braft::LeaderChangeContext& ctx);
// 当该节点开始跟随新leader节点时调用。
virtual void on_start_following(const ::braft::LeaderChangeContext& ctx);
...
};
在braft中,Snapshot被定义为在特定持久化存储中的文件集合, 用户将状态机序列化到一个或者多个文件中, 并且任何节点都能从这些文件中恢复状态机到当时的状态.
Snapshot有两个作用:
class Snapshot : public butil::Status {
public:
Snapshot() {}
virtual ~Snapshot() {}
// 获取快照的路径
virtual std::string get_path() = 0;
// 列出当前快照中的所有现有文件
virtual void list_files(std::vector<std::string> *files) = 0;
// 获取实现定义的file_meta(implementation-defined file_meta)
virtual int get_file_meta(const std::string& filename,
::google::protobuf::Message* file_meta) {
(void)filename;
file_meta->Clear();
return 0;
}
};
class SnapshotWriter : public Snapshot {
public:
SnapshotWriter() {}
virtual ~SnapshotWriter() {}
// 保存被raft框架使用的快照的元信息。
virtual int save_meta(const SnapshotMeta& meta) = 0;
// 将文件添加到快照。
// |file_meta| 是一个实现定义的原型消息 (implmentation-defined protobuf message 不会翻译)
// 所有实现都必须处理|file_meta| 为null的情况,确保不会引发错误
// 请注意,是否将文件创建到后备存储器是由实现定义(implmentation-defined)的。
virtual int add_file(const std::string& filename) {
return add_file(filename, NULL);
}
virtual int add_file(const std::string& filename,
const ::google::protobuf::Message* file_meta) = 0;
// 从快照中删除文件
// 请注意,是否从后备存储器中删除文件是由实现定义(implmentation-defined)的。
virtual int remove_file(const std::string& filename) = 0;
};
class SnapshotReader : public Snapshot {
public:
SnapshotReader() {}
virtual ~SnapshotReader() {}
// 加载元信息 from
virtual int load_meta(SnapshotMeta* meta) = 0;
// 为其他peers生成uri以复制此快照。
// 如果发生某些错误,则返回一个空字符串
virtual std::string generate_uri_for_copy() = 0;
};
不同业务的Snapshot千差万别,因为SnapshotStorage并没有抽象具体读写Snapshot的接口,而是抽象出SnapshotReader和SnapshotWriter,交由用户扩展具体的snapshot创建和加载逻辑。
Snapshot创建流程:
Snapshot读取流程:
libraft内提供了基于文件列表的LocalSnapshotWriter和LocalSnapshotReader默认实现,具体使用方式为:
实际情况下,用户业务状态机数据的snapshot有下面几种实现方式:
对于业界一些newsql系统,它们大都使用类rocksdb的lsm tree的存储引擎,支持MVCC。在进行raft snapshot的时候,使用上面的方案1,先创建一个db的snapshot,然后创建一个iterator,遍历并持久化数据。tidb、cockroachdb都是类似的解决方案。
braft::Node可以通过调用api控制也可以通过braft_cli来控制, 本章主要说明如何使用api.
在分布式系统中,机器故障,扩容,副本均衡是管理平面需要解决的基本问题,braft提供了几种方式:
// 在raft组中添加一个新的peer节点
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void add_peer(const PeerId& peer, Closure* done);
// 从raft组中移除一个peer节点
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void remove_peer(const PeerId& peer, Closure* done);
// 将raft组的配置优雅地更改为|new_peers|
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void change_peers(const Configuration& new_peers, Closure* done);
节点变更分为几个阶段:
当考虑节点删除的时候, 情况会变得有些复杂, 由于判断成功提交的节点数量变少, 可能会出现在前面的日志没有成功提交的情况下, 后面的日志已经被判断已经提交。 这时候为了状态机的操作有序性, 即使之前的日志还未提交, 我们也会强制判断为成功.
举个例子:
当多数节点故障的时候,是不能通过add_peer/remove_peer/change_peers进行节点变更的,这个时候安全的做法是等待多数节点恢复,能够保证数据安全。如果业务追求服务的可用性,放弃数据安全性的话,可以使用reset_peers飞线设置复制组Configuration。
// 单独重置此节点的配置,在此节点成为领导者之前,无需和其他peers节点有任何复制
// 当大多数复制组都已失效,并且您希望在可用性方面恢复服务时,该功能应该被调用
//
// 请注意,在这种情况下,既不能保证一致性也不可以保证共识,在处理此方法时**请务必小心**。
butil::Status reset_peers(const Configuration& new_peers);
reset_peer之后,新的Configuration的节点会开始重新选主,当新的leader选主成功之后,会写一条新Configuration的Log,这条Log写成功之后,reset_peer才算成功。如果中间又发生了失败的话,外部需要重新选取peers并发起reset_peers。
不建议使用reset_peers,reset_peers会破坏raft对数据一致性的保证,而且可能会造成脑裂。例如,{A B C D E}组成的复制组G,其中{C D E}故障,将{A B} set_peer成功恢复复制组G’,{C D E}又重新启动它们也会形成一个复制组G’’,这样复制组G中会存在两个Leader,且{A B}这两个复制组中都存在,其中的follower会接收两个leader的AppendEntries,当前只检测term和index,可能会导致其上数据错乱。
// 当目前的配置和|old_peers|匹配时,添加一个新peer节点到raft组
// 此操作完成后,将调用done-> Run(),描述结果详细信息。
void add_peer(const std::vector<PeerId>& old_peers, const PeerId& peer, Closure* done);
// 尝试将leader位置转移给| peer |。
// 如果peer是ANY_PEER,下个term将选出合适的follower作为leader
// 成功返回0,否则返回-1。
int transfer_leadership_to(const PeerId& peer);
在一些场景中,我们会需要外部强制将leader切换到另外的节点, 比如:
braft实现了主迁移算法, 这个算法包含如下步骤:
braft中在Node启动之后,会在http://${your_server_endpoint}/raft_stat中列出当前这个进程上Node的列表,及其每个Node的内部状态。
其中包括:
字段 | 说明 |
---|---|
state | 节点状态,包括LEADER/FOLLOWER/CANDIDATE |
term | 当前term |
conf_index | 上一个Configuration产生的log index |
peers | 当前Configuration中节点列表 |
leader | 当前Configuration中Leader节点 |
election_timer | 选主定时器,FOLLOWER状态下启用 |
vote_timer | 投票定时器,CANDIDATE状态下启用 |
stepdown_timer | 主切从定时器,LEADER状态下启用 |
snapshot_timer | 快照定时器 |
storage | log storage中first log index和last log index |
disk_index | 持久化的最后一个log index |
known_applied_index | fsm已经apply的最后一个log index |
last_log_id | 最后一条内存log信息(log先写内存再批量刷disk) |
state_machine | fsm状态,包括IDLE/COMMITTED/SNAPSHOT_SAVE/SNAPSHOT_LOAD/LEADER_STOP/ERROR |
last_committed_index | 已经committed的最大log index |
last_snapshot_index | 上一次snapshot中包含的最后一条log index |
last_snapshot_term | 上一次snapshot中包含的最后一条log的term |
snapshot_status | snapshot状态,包括:LOADING/DOWNLOADING/SAVING/IDLE,其中LOADING和DOWNLOADING会显示snapshot uri和snapshot meta |
raft中有很多flags配置项,运行中可以通过http://endpoint/flags查看,具体如下:
flags名 | 说明 |
---|---|
raft_sync | 是否开启sync |
raft_max_append_buffer_size | log manager中内存缓存大小 |
raft_leader_batch | log manager中最大batch合并 |
raft_max_entries_size | AppendEntries包含entries最大数量 |
raft_max_body_size | AppendEntris最大body大小 |
raft_max_segment_size | 单个logsegment大小 |
raft_max_byte_count_per_rpc | snapshot每次rpc下载大小 |
raft_apply_batch | apply的时候最大batch数量 |
raft_election_heartbeat_factor | election超时与heartbeat超时的比例 |