当前位置: 首页 > 工具软件 > SOFAJRaft > 使用案例 >

Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制

洪昊然
2023-12-01

关于raft算法相关细节,可以全看之前的文章 分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色

这里我们说下阿里开源的sofa-jraft的实现。
首先说明下,在sofa-jraft有几个比较重要的角色

  • Node 代表的就是一个服务节点
  • Ballot 代表的是一次投票的相关信息
  • PeerId 代表的是一个复制组里面的一个参与角色
  • StateMachine 当数据提交到Node之后,会执行其onApply方法

另外Node中有几个比较重要的定时器:

  • electionTimer 选举定时器,如果当前leader挂了,会进行preVote
  • voteTimer 投票定时器,当投票超时后,会进行preVote
  • stepDownTimer leader使用,判断当前节点是否存活,且检察整个集群是否有节点下线并更新Leader节点的Timestamp

选主投票

JRaft的选举投票有两个步骤preVotevote,之所以要增加一个preVote的步骤,是为了解决系统中防止某个节点由于无法和leader同步,不断发起投票,抬升自己的Term,导致自己Term比Leader的Term还大,然后迫使Leader放弃Leader身份,开始新一轮的选举。
preVote则强调节点必须获得半数以上的投票才能开始发起新一轮的选举。

JRaft的选举是通过定时器超时开始的,在NodeImpl中(Node的具体实现类),当我们执行NodeImpl.init的时候,会开启electionTimer:

this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
            TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {
            protected void onTrigger() {
                handleElectionTimeout();
            }
            protected int adjustTimeout(final int timeoutMs) {
                return randomTimeout(timeoutMs);
            }
        };
  private void handleElectionTimeout() {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                return;
            }
            if (isCurrentLeaderValid()) {
                return;
            }
            resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
                this.leaderId));

            // Judge whether to launch a election.
            if (!allowLaunchElection()) {
                return;
            }

            doUnlock = false;
            preVote();

        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

handleElectionTimeout中主要就是进行了preVote操作,这里JRaft一次投票的主要几个操作如下:

preVote ===> handlePreVoteRequest ===> electSelf ===>handleRequestVoteRequest

我们首先看下preVote:

private void preVote() {
     .....
        final LogId lastLogId = this.logManager.getLastLogId(true);
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            // pre_vote need defense ABA after unlock&writeLock
            if (oldTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                return;
            }
            this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            for (final PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                if (!this.rpcService.connect(peer.getEndpoint())) {
                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                    continue;
                }
                final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
                done.request = RequestVoteRequest.newBuilder() //
                    .setPreVote(true) // it's a pre-vote request.
                    .setGroupId(this.groupId) //
                    .setServerId(this.serverId.toString()) //
                    .setPeerId(peer.toString()) //
                    .setTerm(this.currTerm + 1) // next term
                    .setLastLogIndex(lastLogId.getIndex()) //
                    .setLastLogTerm(lastLogId.getTerm()) //
                    .build();
                this.rpcService.preVote(peer.getEndpoint(), done.request, done);
            }
            this.prevVoteCtx.grant(this.serverId);
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

可以看到preVote中会对当前出自己以外的节点发送RequestVoteRequest请求,主要设置信息如下:

RequestVoteRequest.newBuilder() //
                    .setPreVote(true) // it's a pre-vote request.
                    .setGroupId(this.groupId) //
                    .setServerId(this.serverId.toString()) //
                    .setPeerId(peer.toString()) //
                    .setTerm(this.currTerm + 1) // next term
                    .setLastLogIndex(lastLogId.getIndex()) //
                    .setLastLogTerm(lastLogId.getTerm()) //
                    .build();

可以看到,这时候并没有将自己的currTerm设置为currTerm +1,只是在请求的时候发送了一个currTerm+1的值,这和实际选举的时候有差别,实际选举的时候首选会将currTerm++

我们看下其他节点收到这个请求是怎么处理的:

public Message handlePreVoteRequest(final RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
            }
            final PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.getServerId())) {
                LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                    request.getServerId());
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Parse candidateId failed: %s.", request.getServerId());
            }
            boolean granted = false;
            // noinspection ConstantConditions
            do {
                if (!this.conf.contains(candidateId)) {
                    LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                        request.getServerId(), this.conf);
                    break;
                }
                if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                    LOG.info(
                        "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                        getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                    break;
                }
                if (request.getTerm() < this.currTerm) {
                    LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    // A follower replicator may not be started when this node become leader, so we must check it.
                    checkReplicator(candidateId);
                    break;
                }
                // A follower replicator may not be started when this node become leader, so we must check it.
                // check replicator state
                checkReplicator(candidateId);

                doUnlock = false;
                this.writeLock.unlock();

                final LogId lastLogId = this.logManager.getLastLogId(true);

                doUnlock = true;
                this.writeLock.lock();
                final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
                granted = requestLastLogId.compareTo(lastLogId) >= 0;

                LOG.info(
                    "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                    lastLogId);
            } while (false);

            return RequestVoteResponse.newBuilder() //
                .setTerm(this.currTerm) //
                .setGranted(granted) //
                .build();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

这里其他节点收到PreVoteRequest的时候,会进行如下判断:

  1. 如果当前节点的Leader节点依然活着,直接返回本次投票granted=false
  2. 如果请求preVote的term比当前节点term小,直接返回本次投票granted=false
  3. 如果请求的Log信息(index和term比当前小),直接返回本次投票granted=false
  4. 如果上面都不满足,返回granted=true

这是其他节点收到PreVoteRequest的处理,我们再看发起preVote节点收到其他节点的响应是怎么处理的:

public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                    getNodeId(), peerId, this.state);
                return;
            }
            if (term != this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                    peerId, term, this.currTerm);
                return;
            }
            if (response.getTerm() > this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                    response.getTerm(), this.currTerm);
                stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                    "Raft node receives higher term pre_vote_response."));
                return;
            }
            LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
                response.getTerm(), response.getGranted());
            // check granted quorum?
            if (response.getGranted()) {
                this.prevVoteCtx.grant(peerId);
                if (this.prevVoteCtx.isGranted()) {
                    doUnlock = false;
                    electSelf();
                }
            }
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

这里参数中term是投票之前节点的term,peerId是当前节点发送preVote节点的PeerId信息,我们看下其判断逻辑:

  1. 如果当前term和投票前的term不相等,则表明发生了新一轮投票,当前响应作废直接返回
  2. 如果响应中term(响应节点的term)比当前节点term大,表明响应节点比当前节点的投票轮次更高,直接返回
  3. 如果响应允许这次投票,即response.getGranted=true,判断本轮发起的投票同意是否过半。

在NodeImpl中有两个Ballot,一个是支持preVote的prevVoteCtx,一个是支持vote,在发起preVote的时候,会对prevVoteCtx进行初始化:

public boolean init(final Configuration conf, final Configuration oldConf) {
        this.peers.clear();
        this.oldPeers.clear();
        this.quorum = this.oldQuorum = 0;
        int index = 0;
        if (conf != null) {
            for (final PeerId peer : conf) {
                this.peers.add(new UnfoundPeerId(peer, index++, false));
            }
        }

        this.quorum = this.peers.size() / 2 + 1;
        if (oldConf == null) {
            return true;
        }
        index = 0;
        for (final PeerId peer : oldConf) {
            this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
        }

        this.oldQuorum = this.oldPeers.size() / 2 + 1;
        return true;
    }

可以看到这里init对Ballot的法定人数quorum 设置是当前节点/2+1个。
这里我们在来看当preVote请求被同意的情况下是怎么判断是否需要发起选举,在handlePreVoteResponse的最后,会执行this.prevVoteCtx.grant(peerId);

public void grant(final PeerId peerId) {
        grant(peerId, new PosHint());
    }
public PosHint grant(final PeerId peerId, final PosHint hint) {
        UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
        if (peer != null) {
            if (!peer.found) {
                peer.found = true;
                this.quorum--;
            }
            hint.pos0 = peer.index;
        } else {
            hint.pos0 = -1;
        }
        if (this.oldPeers.isEmpty()) {
            hint.pos1 = -1;
            return hint;
        }
        peer = findPeer(peerId, this.oldPeers, hint.pos1);
        if (peer != null) {
            if (!peer.found) {
                peer.found = true;
                this.oldQuorum--;
            }
            hint.pos1 = peer.index;
        } else {
            hint.pos1 = -1;
        }

        return hint;
    }

这里的判断逻辑很简单,响应的节点如果同意了这次投票,那么对应的投票信息Ballot法定人数quorum–,同时这里为了防止一个节点多次响应,标记每个节点只能响应一次。然后判断本次preVote投票是否过半:

public boolean isGranted() {
        return this.quorum <= 0 && this.oldQuorum <= 0;
    }

如果过半,开始正式选举electSelf:

private void electSelf() {
        long oldTerm;
        try {
            LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
            if (!this.conf.contains(this.serverId)) {
                LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
                return;
            }
            if (this.state == State.STATE_FOLLOWER) {
                LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
                this.electionTimer.stop();
            }
            resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
                "A follower's leader_id is reset to NULL as it begins to request_vote."));
            this.state = State.STATE_CANDIDATE;
            this.currTerm++;
            this.votedId = this.serverId.copy();
            LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
            this.voteTimer.start();
            this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            oldTerm = this.currTerm;
        } finally {
            this.writeLock.unlock();
        }

        final LogId lastLogId = this.logManager.getLastLogId(true);

        this.writeLock.lock();
        try {
            // vote need defense ABA after unlock&writeLock
            if (oldTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
                return;
            }
            for (final PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                if (!this.rpcService.connect(peer.getEndpoint())) {
                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                    continue;
                }
                final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
                done.request = RequestVoteRequest.newBuilder() //
                    .setPreVote(false) // It's not a pre-vote request.
                    .setGroupId(this.groupId) //
                    .setServerId(this.serverId.toString()) //
                    .setPeerId(peer.toString()) //
                    .setTerm(this.currTerm) //
                    .setLastLogIndex(lastLogId.getIndex()) //
                    .setLastLogTerm(lastLogId.getTerm()) //
                    .build();
                this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
            }

            this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
            this.voteCtx.grant(this.serverId);
            if (this.voteCtx.isGranted()) {
                becomeLeader();
            }
        } finally {
            this.writeLock.unlock();
        }
    }

正式投票会进行如下操作:

  1. 当前currTerm++,voteTimer启动,voteCtx初始化
  2. 发送RequestVoteRequest请求,与preVote基本差不多,唯一区别PreVote=false

我们再看其他节点收到投票怎么处理的:

public Message handleRequestVoteRequest(final RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
            }
            final PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.getServerId())) {
                LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                    request.getServerId());
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Parse candidateId failed: %s.", request.getServerId());
            }

            // noinspection ConstantConditions
            do {
                // check term
                if (request.getTerm() >= this.currTerm) {
                    LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    // increase current term, change state to follower
                    if (request.getTerm() > this.currTerm) {
                        stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                            "Raft node receives higher term RequestVoteRequest."));
                    }
                } else {
                    // ignore older term
                    LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    break;
                }
                doUnlock = false;
                this.writeLock.unlock();

                final LogId lastLogId = this.logManager.getLastLogId(true);

                doUnlock = true;
                this.writeLock.lock();
                // vote need ABA check after unlock&writeLock
                if (request.getTerm() != this.currTerm) {
                    LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                    break;
                }

                final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                    .compareTo(lastLogId) >= 0;

                if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                        "Raft node votes for some candidate, step down to restart election_timer."));
                    this.votedId = candidateId.copy();
                    this.metaStorage.setVotedFor(candidateId);
                }
            } while (false);

            return RequestVoteResponse.newBuilder() //
                .setTerm(this.currTerm) //
                .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
                .build();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }

收到投票请求节点的处理与preVote请求的处理逻辑上差不多:

  1. 判断请求的term和当前term大小,比请求的term大,返回Granted=false
  2. 判读当前log的位置是否比请求位置小,如果小证明发起请求节点数据位置比当前节点新,如果当前节点没有投票给其他节点,那么设置当前节点term为请求节点的term,同时将当前节点的投票votedId设置为请求节点,表名当前节点将选票投给了请求节点,当前节点会执行stepDown操作,不会进行选举,节点变为STATE_FOLLOWER,同时开启electionTimer定时器
  3. 返回判断当前节点term是否等于请求term且当前节点的选票ID和请求节点是否一致,如果满足上面两个条件,表明当前节点将票投给了请求节点

接下来看请求节点收到响应节点的响应是如何处理的:

public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_CANDIDATE) {
                LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
                    getNodeId(), peerId, this.state);
                return;
            }
            // check stale term
            if (term != this.currTerm) {
                LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                    peerId, term, this.currTerm);
                return;
            }
            // check response term
            if (response.getTerm() > this.currTerm) {
                LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
                    peerId, response.getTerm(), this.currTerm);
                stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                    "Raft node receives higher term request_vote_response."));
                return;
            }
            // check granted quorum?
            if (response.getGranted()) {
                this.voteCtx.grant(peerId);
                if (this.voteCtx.isGranted()) {
                    becomeLeader();
                }
            }
        } finally {
            this.writeLock.unlock();
        }
    }

大致逻辑如下:

  1. 判断响应的reponse的term和当前节点发起投票前的term是否一致,如果不一致,直接返回
  2. 判断响应的reposen的term是否比当前节点发起投票前的term大,如果满足,直接返回
  3. 判断当前节点已经获取的选票是否过半,如果过半,将当前节点晋升为Leader节点,执行becomeLeader逻辑

becomeLeader使当前节点晋升为Leader节点,我们看看其实现:

private void becomeLeader() {
        Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
        LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
            this.conf.getConf(), this.conf.getOldConf());
        // cancel candidate vote timer
        stopVoteTimer();
        this.state = State.STATE_LEADER;
        this.leaderId = this.serverId.copy();
        this.replicatorGroup.resetTerm(this.currTerm);
        // Start follower's replicators
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
            if (!this.replicatorGroup.addReplicator(peer)) {
                LOG.error("Fail to add a replicator, peer={}.", peer);
            }
        }

        // Start learner's replicators
        for (final PeerId peer : this.conf.listLearners()) {
            LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
            if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
                LOG.error("Fail to add a learner replicator, peer={}.", peer);
            }
        }

        // init commit manager
        this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
        // Register _conf_ctx to reject configuration changing before the first log
        // is committed.
        if (this.confCtx.isBusy()) {
            throw new IllegalStateException();
        }
        this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
        this.stepDownTimer.start();
    }

主要逻辑为:

  1. 停止当前投票定时器
  2. 将所有非当前节点、角色为Follower的节点加入到复制组里面去
  3. 将所有角色为Learner的节点加入到复制组里面去
  4. 重置ballotBox(用来管理选票的)
  5. stepDownTimer启动

这样Leader节点就被选出来。

我们在看看如何写入数据的。

数据写入和复制

客户端通过RouteTable.getInstance().selectLeader(groupId)能够获取当前分组下的Leader节点信息,拼接待写入数据的Request对象,然后通过CliClientServiceImpl..getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {}能够向集群Leader节点进行数据写入。

服务端则通过对应的RpcProcessor来处理写入的请求,获取到请求后取出数据部分封装成Task,然后通过Node.apply将task写入到node中去。
而Node.apply只是将Task写入到了Disruptor的RingBuffer中去,如果对这块有疑问,可以看看这篇文章高性能队列Disruptor使用入门,原理和代码实现

数据写入的时候,首先会将task转换成LogEntryAndClosure,同时会将Task.done相关信息放入到BallotBox的pendingMetaQueue和closureQueue队列中去(当数据写入完成之后会通过这两个queue取出task对应的done执行),然后将一批LogEntryAndClosure通过logManagerj将数据持久化写入。
这里Node.apply传入的是一个Task类型:

public class Task implements Serializable {
    private static final long serialVersionUID = 2971309899898274575L;
    private ByteBuffer        data             = LogEntry.EMPTY_DATA;
    private Closure           done;
    private long              expectedTerm     = -1;
}

这里的data就是我们写入的数据,而Closure done则是一个回调接口,当数据被写入到集群1/2+1节点成功之后会调用Closure.run(final Status status)方法。

而NodeImpl中在写入Task是写入到了RingBuffer中,实际处理在LogEntryAndClosureHandler中:

public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
                                                                                                          throws Exception {
            if (event.shutdownLatch != null) {
                if (!this.tasks.isEmpty()) {
                    executeApplyingTasks(this.tasks);
                    reset();
                }
                final int num = GLOBAL_NUM_NODES.decrementAndGet();
                event.shutdownLatch.countDown();
                return;
            }
            this.tasks.add(event);
            if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
                executeApplyingTasks(this.tasks);
                reset();
            }
        }

最终在executeApplyingTasks进行实际写入,这块代码有点长,就不贴代码了,大概描述下:

  1. 首先判断当前节点是不是Leader节点,如果不是的话,Closure.run(错误状态)返回
  2. task的term和当前节点term不一致,同上,返回
  3. 调用BallotBoxappendPendingTask,这个逻辑需要注意下:
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
		// 每个写入Task都生成一个Ballot ,并放入pendingMetaQueue,后续其他
        final Ballot bl = new Ballot();
        if (!bl.init(conf, oldConf)) {
            LOG.error("Fail to init ballot.");
            return false;
        }
        final long stamp = this.stampedLock.writeLock();
        try {
            if (this.pendingIndex <= 0) {
                LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
                return false;
            }
            this.pendingMetaQueue.add(bl);
            this.closureQueue.appendPendingClosure(done);
            return true;
        } finally {
            this.stampedLock.unlockWrite(stamp);
        }
    }

这里为每个写入任务都生成了一个Ballot,还记得上面选主投票的时候,用的也是这个来标记选举是否过半,这里也是一样,后续其他节点复制Leader该Task的数据的时候,会对应更新Leader中对应该Task的Ballot 投票信息,通过该Ballot 能够判断集群是否有过半节点已经完成了该Task的写入。同时也将Task写入集群过半节点成功之后的回调入口Closure 保存在closureQueue中,当其他节点写入Task成功更新对应Task的Ballot 的时候,会判断是否过半节点写入成功,如果成功则会回调对应Task的Closure的run方法。
4. 调用logManager.appendEntries将数据写入
5. 本地节点写入完成之后,回调LeaderStableClosure接口,逻辑为:

public void run(final Status status) {
            if (status.isOk()) {
                NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
                    NodeImpl.this.serverId);
            } else {
                    this.firstLogIndex + this.nEntries - 1, status);
            }
        }

而ballotBox.commitAt的逻辑如下:

public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
        final long stamp = this.stampedLock.writeLock();
        long lastCommittedIndex = 0;
        try {
            if (this.pendingIndex == 0) {
                return false;
            }
            if (lastLogIndex < this.pendingIndex) {
                return true;
            }
            final long startAt = Math.max(this.pendingIndex, firstLogIndex);
            Ballot.PosHint hint = new Ballot.PosHint();
            for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
                final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
                hint = bl.grant(peer, hint);
                if (bl.isGranted()) {
                    lastCommittedIndex = logIndex;
                }
            }
            if (lastCommittedIndex == 0) {
                return true;
            }
   
            this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
            this.pendingIndex = lastCommittedIndex + 1;
            this.lastCommittedIndex = lastCommittedIndex;
        } finally {
            this.stampedLock.unlockWrite(stamp);
        }
        this.waiter.onCommitted(lastCommittedIndex);
        return true;
    }

这里可以看到,就是每个节点写入成功后,调用Leader节点的.ballotBox.commitAt,更新对应写入数据的投票信息,如果bl.isGranted,即完成了过半节点的写入,那么会调用this.waiter.onCommitted逻辑,这里最终会调用到StateMachineAdapter.onApply方法。
6. 在节点成为Leader的时候,会初始化日志复制组:this.replicatorGroup.addReplicator(peer),对于集群中的每个除当前节点的节点都会启动一个Replicator进行复制同时会开启心跳超时定时器,开始的时候首先会发送一个空的EmptyEntries给到Follower,获取Follower节点的最新日志位置,获取到Follower节点的最新日志位置之后,会再次发送需要同步的日志
7. 在Replicator中对Follower的响应进行处理onAppendEntriesReturned,如果Follower写入成功,会调用Node.BallotBox().commitAt 这里和步骤5处理一样

这样就完成了数据的写入和日志的复制。

可以看到jraft中的日志复制就是Leader向Follower节点发送数据然后Follower将发送的日志写入到本地。

 类似资料: