当前位置: 首页 > 工具软件 > BFS-Baidu > 使用案例 >

百度的bfs中raft协议实现(1)

邵璞
2023-12-01

raft具体原理就不说了,在这里只分析下百度bfs中的代码实现。

为了使用不同的同步协议,bfs中把同步抽象了接口,实现方面一个是传统的主从(带快照,类似mysql dump数据+GTID同步方式),一个就是raft了。

具体抽象接口如下:

class RaftImpl : public Sync {
public:
    RaftImpl();
    ~RaftImpl();
    void Init(SyncCallbacks callbacks);//初始化一系列callback,如log具体怎么落地,生成快照方式
    bool IsLeader(std::string* leader_addr = NULL);//自己是否leader,传出真正leader
    bool Log(const std::string& entry, int timeout_ms = 10000);//log调用入口,等待timeout_ms的时间
    void Log(const std::string& entry, std::function<void (bool)> callback);//log入口,成功后回调
    void SwitchToLeader() {}
    std::string GetStatus();//返回状态,leader/follower
public:
    google::protobuf::Service* GetService(); // 获取服务提供者
private:
    RaftNodeImpl* raft_node_;//真正实现者
};
然后看看实现者,代码本身已经带了一些注释了:
enum NodeState {
    kFollower = 0,
    kCandidate = 1,
    kLeader = 2,
};

class RaftNodeImpl : public RaftNode {
public:
    RaftNodeImpl(const std::string& raft_nodes, int node_index,
                 int election_timeout, const std::string& db_path);// 初始化节点信息,选举时间,log存储目录
    ~RaftNodeImpl();
    void Vote(::google::protobuf::RpcController* controller,
              const ::baidu::bfs::VoteRequest* request,
              ::baidu::bfs::VoteResponse* response,
              ::google::protobuf::Closure* done);// 要求其他节点给我投票
    void AppendEntries(::google::protobuf::RpcController* controller,
                       const ::baidu::bfs::AppendEntriesRequest* request,
                       ::baidu::bfs::AppendEntriesResponse* response,
                       ::google::protobuf::Closure* done);// 主动给其他节点复制日志(包含心跳)
public:
    bool GetLeader(std::string* leader);// 获取leader ip,返回自己是否leader
    void AppendLog(const std::string& log, std::function<void (bool)> callback);//抽象接口功能实现,同上描述
    bool AppendLog(const std::string& log, int timeout_ms = 10000);// 同上
    void Init(std::function<void (const std::string& log)> callback, // 同上
              std::function<void (int32_t, std::string*)> snapshot_callback);
private:
    bool StoreContext(const std::string& context, int64_t value); //存储meta信息
    bool StoreContext(const std::string& context, const std::string& value);//同上

    std::string Index2Logkey(int64_t index);//atoi
    void LoadStorage(const std::string& db_path);//初始化meta
    bool CancelElection();//放弃选举
    void ResetElection();//重置选举
    void ReplicateLogForNode(uint32_t id);//leader复制日志
    void ReplicateLogWorker(uint32_t id);//复制日志的线程函数
    void Election();//发起选举
    bool CheckTerm(int64_t term);//比较term和本地term
    void ElectionCallback(const VoteRequest* request,
                          VoteResponse* response,
                          bool failed,
                          int error,
                          const std::string& node_addr);// 选举的cb
    bool StoreLog(int64_t term, int64_t index, const std::string& log, LogType type = kUserLog);//存储log
    void ApplyLog();//应用commitlog

    std::string LoadVoteFor();// 以下函数都是和状态相关,已废弃
    void SetVeteFor(const std::string& votefor); 
    int64_t LoadCurrentTerm();
    void SetCurrentTerm(int64_t);
    void SetLastApplied(int64_t index);
    int64_t GetLastApplied(int64_t index);
private:
    std::vector<std::string> nodes_;
    std::string self_;

    int64_t current_term_;      /// 当前term
    std::string voted_for_;     /// 当前term下投的票
    LogDB* log_db_;             /// log持久存储
    int64_t log_index_;         /// 上一条log的index
    int64_t log_term_;          /// 上一条log的term

    int64_t commit_index_;      /// 提交的log的index
    int64_t last_applied_;      /// 应用到状态机的index
    bool applying_;             /// 正在提交到状态机

    bool node_stop_;//节点停止,用于日志同步标志位
    struct FollowerContext {//follower同步状态
        int64_t next_index;//下一个日志序号
        int64_t match_index;//已经match的序号
        common::ThreadPool worker;
        common::CondVar condition;
        FollowerContext(Mutex* mu) : next_index(0), match_index(0), worker(1), condition(mu) {}
    };
    std::vector<FollowerContext*> follower_context_;

    Mutex mu_;
    common::ThreadPool*  thread_pool_;
    RpcClient*   rpc_client_;
    std::set<std::string> voted_;   /// 谁投我了
    std::string leader_;
    int64_t election_taskid_;//选举定时任务id,可以用于取消
    int32_t election_timeout_;//选举超时

    std::function<void (const std::string& log)> log_callback_;
    std::map<int64_t, std::function<void (bool)> > callback_map_;//日志被复制后对应的cb
    NodeState node_state_;
};
先看看构造:
/*具体代码不贴,基本是初始化raft节点,状态恢复(LoadStorage),重置选举(ResetElection)等。
LoadStorage中,恢复了4个变量
last_applied_,current_term_,voted_for_,log_index_。然后对每个非自身节点初始化其FollowerContext,next_index = log_index_+1,并初始化工作线程为如下函数:*/
void RaftNodeImpl::ReplicateLogWorker(uint32_t id) {
    FollowerContext* follower = follower_context_[id];
    while (true) {
        MutexLock lock(&mu_);
        while (node_state_ != kLeader && !node_stop_) {//如果不是leader且没stop,那就等着leader通知
            follower->condition.Wait();
        }
        if (node_stop_) {//因退出而唤醒的,退出循环
            return;
        }
        int64_t s = common::timer::get_micros();//记录开始时间
        ReplicateLogForNode(id);//具体复制日志过程(顺带心跳)
        int64_t d = election_timeout_ / 2 - (common::timer::get_micros() - s) / 1000;
        if (d > 0) {//d=选举一趟时间-复制时间;如果d还有剩余则变相sleep,除非leader主动通知(比如有新日志了,leader状态变了之类),这样做的结果也就是控制心跳时间,使其至少要>选举一趟时间
            follower->condition.TimeWait(d);
        }
    }
}
/*这个函数对非leader状态的节点,会一直阻塞在wait。如果是leader状态,则会定期调到ReplicateLogForNode,有日志(match_index < log_index_)则是复制,没有则是心跳作用(也就是心跳间隔>选举单趟时间)。*/
void RaftNodeImpl::ResetElection() {
    mu_.AssertHeld();
    if (election_taskid_ != -1) {
        CancelElection();
    }
    election_taskid_ =
        thread_pool_->DelayTask(election_timeout_ + rand() % election_timeout_,
                                std::bind(&RaftNodeImpl::Election, this));
    //LOG(INFO, "Reset election %ld", election_taskid_);
}
/*功能很简单,就是取消线程池中的定时器任务,并重启。这里和raft标准类似的[t,t * 2]随机时间,如果没有取消定时任务就会开始一轮新选举任务。*/
接下来是最长的函数了,具体的Leader日志复制逻辑:

void RaftNodeImpl::ReplicateLogForNode(uint32_t id) {
    mu_.AssertHeld();

    FollowerContext* follower = follower_context_[id];
    int64_t next_index = follower->next_index;//该follower的下一个日志序号
    int64_t match_index = follower->match_index;//已经match的序号

    mu_.Unlock();
    int64_t max_index = 0;
    int64_t max_term = -1;
    std::unique_ptr<AppendEntriesRequest> request(new AppendEntriesRequest);
    std::unique_ptr<AppendEntriesResponse> response(new AppendEntriesResponse);
    request->set_term(current_term_);
    request->set_leader(self_);
    request->set_leader_commit(commit_index_);//构建req
    LOG(INFO, "M %ld N %ld I %ld", match_index, next_index, log_index_);
    if (match_index < log_index_) {//follower的match点<当前log点,说明有新日志
        assert(match_index <= next_index);
        int64_t prev_index = 0;
        int64_t prev_term = 0;
        std::string prev_log;
        StatusCode s = log_db_->Read(next_index - 1, &prev_log);
        if (s == kOK) {// 获取上一次提交的最后一条日志的index和term
            LogEntry prev_entry;
            bool ret = prev_entry.ParseFromString(prev_log);
            assert(ret);
            prev_index = prev_entry.index();
            prev_term = prev_entry.term();
        }
        request->set_prev_log_index(prev_index);
        request->set_prev_log_term(prev_term);
        for (int64_t i = next_index; i <= log_index_; i++) {//把新产生的日志放req里
            std::string log;
            s = log_db_->Read(i, &log);
            if (s != kOK) {
                LOG(FATAL, "Data lost: %ld", i);
                break;
            }
            LOG(INFO, "Add %ld to request", i);
            LogEntry* entry = request->add_entries();
            bool ret = entry->ParseFromString(log);
            assert(ret);
            max_index = entry->index();//记录最后一个log的index和term,等下更新用
            max_term = entry->term();
            if (request->ByteSize() >= 1024*1024) {//限制req大小,如果发太多但失败了,重发就恶心了
                break;
            }
        }
    }
    RaftNode_Stub* node;
    rpc_client_->GetStub(nodes_[id], &node);//给follower发送日志,这里使用阻塞rpc,等返回结果,follower的响应函数为AppendEntries。先直接看下面leader对结果的处理
    bool ret = rpc_client_->SendRequest(node, &RaftNode_Stub::AppendEntries,
                                        request.get(), response.get(), 1, 1);
    LOG(INFO, "Replicate %d entrys to %s return %d",
        request->entries_size(), nodes_[id].c_str(), ret);
    delete node;

    mu_.Lock();
    if (ret) {
        int64_t term = response->term();
        if (CheckTerm(term)) {//如果term匹配的(term<=current_term_)
            if (response->success()) {//这里表示日志匹配了(空日志也可以)
                if (max_index && max_term == current_term_) {//有日志且是当前term的日志
                    follower->match_index = max_index;//更新follower的match,next状态
                    follower->next_index = max_index + 1;
                    LOG(INFO, "Replicate to %s success match %ld next %ld",
                        nodes_[id].c_str(), max_index, max_index + 1);
                    std::vector<int64_t> match_index;//log被接收后搜集所有follower的match_index
                    for (uint32_t i = 0; i < nodes_.size(); i++) {
                        if (nodes_[i] == self_) {
                            match_index.push_back(1LL<<60);//如果是自身,一定match了,当做最大
                        } else {
                            match_index.push_back(follower_context_[i]->match_index);
                        }
                    }
                    std::sort(match_index.begin(), match_index.end());
                    int mid_pos = (nodes_.size() - 1) / 2;//排序,获取多数派的match_index
                    int64_t commit_index = match_index[mid_pos];
                    if (commit_index > commit_index_) {// commit_index更新了,本地commit
                        LOG(INFO, "Update commit_index from %ld to %ld",
                            commit_index_, commit_index);
                        commit_index_ = commit_index;
                        while (last_applied_ < commit_index) {//依次本地应用日志
                            last_applied_ ++;
                            LOG(INFO, "[Raft] Apply %ld to leader", last_applied_);
                            std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
                                callback_map_.find(last_applied_);//日志的callback
                            if (cb_it != callback_map_.end()) {
                                std::function<void (bool)> callback = cb_it->second;
                                callback_map_.erase(cb_it);
                                mu_.Unlock();
                                LOG(INFO, "[Raft] AppendLog callback %ld", last_applied_);
                                callback(true);//处理callback
                                mu_.Lock();
                            } else {
                                LOG(INFO, "[Raft] no callback for %ld", last_applied_);
                            }
                        }
                        if (last_applied_ == commit_index) {//如果处理完才持久化last_applied
                            StoreContext("last_applied", last_applied_);
                        }
                    }
                }// end of response->success(),没日志,也就是心跳不处理任何事
            } else {// 日志不匹配,这里是回退一个log_index_,等待下一次复制(Q:差距比较大会不会导致效率低?)
                if (follower->next_index > follower->match_index
                    && follower->next_index > 1) {
                    --follower->next_index;
                }
            }
        }
    }
}
这里是日志复制follower的处理逻辑:

void RaftNodeImpl::AppendEntries(::google::protobuf::RpcController* controller,
                   const ::baidu::bfs::AppendEntriesRequest* request,
                   ::baidu::bfs::AppendEntriesResponse* response,
                   ::google::protobuf::Closure* done) {
    MutexLock lock(&mu_);
    int64_t term = request->term();// leader的term
    if (term < current_term_) {//leader term还没自己大?可能是延迟到达,但我已经当选leader。也可能是发生分区,旧leader已经不是最新leader了
        LOG(INFO, "AppendEntries old term %ld / %ld", term, current_term_);
        response->set_success(false);
        done->Run();//不知道干什么的?回包?
        return;
    }

    CheckTerm(term);//比较term,可能转变为follower
    if (term == current_term_ && node_state_ == kCandidate) {//对方已经开始发送日志,term相等,但我还是候选状态,说明对方已经当选leader了
        node_state_ = kFollower;
    }
    leader_ = request->leader();
    ResetElection();//日志(心跳)到了,重置选举超时时间
    int64_t prev_log_term = request->prev_log_term();
    int64_t prev_log_index = request->prev_log_index();//上次一次同步的最后一个log_index
    if (prev_log_index > 0) {   // check prev term,检查日志连续性
        std::string log;
        StatusCode s = log_db_->Read(prev_log_index, &log);
        LogEntry entry;
        if (s == kOK && !entry.ParseFromString(log)) {
            LOG(FATAL, "Paser logdb value fail:%ld", prev_log_index);
        }
        if (s == kNsNotFound || entry.term() != prev_log_term) {//日志不连续,让leader调整next_index
            LOG(INFO, "[Raft] Last index %ld term %ld / %ld mismatch",
                prev_log_index, prev_log_term, entry.term());
            response->set_success(false);
            done->Run();
            return;
        }
        LOG(INFO, "[Raft] Last index %ld term %ld match", prev_log_index, prev_log_term);
    }

    /// log match...
    int64_t leader_commit = request->leader_commit();
    int entry_count = request->entries_size();
    if (entry_count > 0) {//有日志要处理
        LOG(INFO, "AppendEntries from %s %ld \"%s\" %ld",
            request->leader().c_str(), term,
            request->entries(0).log_data().c_str(), leader_commit);
        for (int i = 0; i < entry_count; i++) {
            int64_t index = request->entries(i).index();
            if (log_index_ >= index) {//有重复的日志
                LOG(INFO, "[raft] Ignore duplicate entry %ld, last log_index: %ld",
                    index, log_index_);
                continue;
            } else if (log_index_ + 1 != index && log_index_ != -1) {//预期index是连续的(Q:不连续怎么办?)
                LOG(FATAL, "[Raft] Wrong index %ld in AppendEntries, last log_index: %ld",
                    index, log_index_);
            }
            bool ret = StoreLog(current_term_, index,
                                request->entries(i).log_data());//存储日志
            log_index_ = index;//对follower来说log_index_是接受的最后一个日志的index
            if (!ret) {
                response->set_success(false);
                done->Run();
                return;
            }
        }
    }

    response->set_term(current_term_);
    response->set_success(true);
    done->Run();
    //到这里日志复制都成功了,更新本地commit_index_
    if (leader_commit > commit_index_) {
        commit_index_ = leader_commit;
    }
    if (commit_index_ > last_applied_) {//有新日志可以应用,异步处理
        thread_pool_->AddTask(std::bind(&RaftNodeImpl::ApplyLog, this));
    }
}
接着看看应用日志,代码很简单,读取[last_applied_+1, commit_index_]区间的日志,依次调用callback,并更新last_applied_落地。不过这里值得注意的是,只有log_type为kUserLog的才应用,log还有一种kRaftCmd类型,这个类型日志后面说明。

日志复制大概就是以上流程,上面提到,每次如果没有收到日志复制(心跳)RPC,则有一个随机时间的delay_task触发选举,选举主体:

void RaftNodeImpl::Election() {
    MutexLock lock(&mu_);
    if (node_state_ == kLeader) {//已经是leader,取消本次操作(Q:因为其他节点回应慢了,所以多触发一次?)
        election_taskid_ = -1;
        return;
    }

    voted_.clear();//重新统计投票信息
    current_term_ ++;
    node_state_ = kCandidate;//候选状态
    voted_for_ = self_;//先投自己(标识投过谁,每个term只投一次,不重复)
    if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
        LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
    }
    voted_.insert(self_);//谁投过我,多数派判断的地方
    LOG(INFO, "Start Election: CT%ld LI%ld LT%ld", current_term_, log_index_, log_term_);
    for (uint32_t i = 0; i < nodes_.size(); i++) {//对每个非自己的node,要求他们投我
        if (nodes_[i] == self_) {
            continue;
        }
        LOG(INFO, "Send VoteRequest to %s", nodes_[i].c_str());
        VoteRequest* request = new VoteRequest;
        request->set_term(current_term_);
        request->set_candidate(self_);
        request->set_last_log_index(log_index_);
        request->set_last_log_term(log_term_);//拿出自己最后log的term,index,当前选举term.(Q:log_term_好像并没有用到,一直都是0?)
        VoteResponse* response = new VoteResponse;
        RaftNode_Stub* raft_node;
        rpc_client_->GetStub(nodes_[i], &raft_node);
        std::function<void (const VoteRequest*, VoteResponse*, bool, int)> callback
                = std::bind(&RaftNodeImpl::ElectionCallback, this,
                            std::placeholders::_1, std::placeholders::_2,
                            std::placeholders::_3, std::placeholders::_4, nodes_[i]);
        rpc_client_->AsyncRequest(raft_node, &RaftNode_Stub::Vote, request, response, callback, 60, 1);//这里和日志复制不同了,没有使用同步RPC,可能是节点多了,用不着等待,也没有promise/future并行模式等待N-1个。所以入口需要判断是否已经是leader
        delete raft_node;
    }
    election_taskid_ =
        thread_pool_->DelayTask(election_timeout_ + rand() % election_timeout_,
                                std::bind(&RaftNodeImpl::Election, this));//计划下一次选举
}
看看对选举的回应:

void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
                    const ::baidu::bfs::VoteRequest* request,
                    ::baidu::bfs::VoteResponse* response,
                    ::google::protobuf::Closure* done) {
    int64_t term = request->term();
    const std::string& candidate = request->candidate();
    int64_t last_log_index = request->last_log_index();
    int64_t last_log_term = request->last_log_term();
    LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
        candidate.c_str(), term, last_log_term,
        voted_for_.c_str(), current_term_, log_term_);
    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term >= current_term_//给对方投票的条件: 1.对方term>=当前选举term
        && (voted_for_ == "" || voted_for_ == candidate)//2.没投过 或 已经给对方投过
        && (last_log_term > log_term_ ||//3.对方日志term较大 或 日志相等,但log序号更大
        (last_log_term == log_term_ && last_log_index >= log_index_))) {
        voted_for_ = candidate;//记录本次term投过的
        if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
            LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
        } else {
            LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
        }
        response->set_vote_granted(true);
        response->set_term(term);
        done->Run();
        return;
    }
    response->set_vote_granted(false);//否则拒绝本次投票
    response->set_term(current_term_);
    done->Run();
}
最后是投票结果的处理:
void RaftNodeImpl::ElectionCallback(const VoteRequest* request,
                                    VoteResponse* response,
                                    bool failed,
                                    int error,
                                    const std::string& node_addr) {
    std::unique_ptr<const VoteRequest> req(request);
    std::unique_ptr<VoteResponse> res(response);
    if (failed) {
        return;
    }

    int64_t term = response->term();
    bool granted = response->vote_granted();
    assert(term >= request->term() && (term == request->term() || !granted));//对结果的预期;1.投了则resp的term和req的相等。2.没投说明对方resp的term较大 或term相等,但log较新。

    LOG(INFO, "ElectionCallback %s by %s %ld / %ld",
        granted ? "granted" : "reject",
        node_addr.c_str(), term, current_term_);

    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term != current_term_ || !granted || node_state_ == kLeader) {//异步回调,状态都要判断
        return;
    }

    voted_.insert(node_addr);//每次有人投我,加入被投列表(上面过滤了非current_term_的cb)
    if (voted_.size() >= (nodes_.size() / 2) + 1) {//拿到多数投票
        leader_ = self_;
        node_state_ = kLeader;
        CancelElection();//取消选举超时
        LOG(INFO, "Change state to Leader, term %ld index %ld commit %ld applied %ld",
            current_term_, log_index_, commit_index_, last_applied_);
        StoreLog(current_term_, ++log_index_, "", kRaftCmd);//立即插入一个kRaftCmd空内容的日志,在日志应用里是被忽略的。但这一步很重要,从根本上来说强化了node的leader合法地位。这里举2个例子:[1].有ABC三节点,A和C分区,但和都和B连通。假设A是靠term自增和一个较快的超时当选的,大家的log_index相同,还没有用户日志,如果没这一步,C觉得超时了,要重新选举,C是有可能根据term+较快超时当选的,于是AC轮流当选。有这一步了会使得B的log和A同步,故B不会投票给C,可以减少这样的情况(当然,C投票要求先到则还是会出现)   [2].有ABCDE5个节点,a.A是leader,log_index=1,已经复制到B,AB挂掉。b.假设C当选,C接受了一条用户日志,log_index=1(term不同),C挂掉 c.A恢复,A复制D,这次已经复制到多数集可以提交了,AB再次挂 d.C当选,然后用自己的日志覆盖了AB的log_index=1(这是不允许的)。 有了这一步后在c步这里A会加一条log_index=2,必须先提交了,才能提交前面的log1。
        for (uint32_t i = 0;i < follower_context_.size(); i++) {//更新follower状态
            if (nodes_[i] != self_) {
                follower_context_[i]->match_index = 0;
                follower_context_[i]->next_index = log_index_ + 1;
                follower_context_[i]->condition.Signal();//触发和各个follower的日志复制(心跳),之前阻塞在condtion.Wait()处
            }
        }
    }
}
以上就是大致流程,代码实现看起来并不麻烦,主要是对比raft的细节和正确性

 类似资料: