当前位置: 首页 > 工具软件 > Follower > 使用案例 >

Zookeeper源码解析-Follower节点处理客户端请求全过程分析

林昱
2023-12-01

前言:

前两篇文章分别分析了Leader处理客户端非事务请求、事务请求的处理过程。最后我们来分析下Follower节点处理客户端请求的不同之处。

过程与Leader处理的过程基本差不多,所以相似的地方笔者就简略带过,重点分析不同之处。

1.FollowerZookeeperServer请求处理链

同样的,我们先从其构造上来分析下其处理链

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }
}

所以,其处理链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

同时还有一个SyncRequestProcessor响应leader的proposal,后续我们详细分析

2.FollowerRequestProcessor

public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
    
    // 从客户端获取的请求全部存入queuedRequests,后续通过run()方法调用执行
	public void processRequest(Request request) {
        if (!finished) {
            queuedRequests.add(request);
        }
    }
    
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take();
                ...
                // 请求交由下一个processor(commitProcessor)处理    
                nextProcessor.processRequest(request);
                
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request);
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.delete:
                case OpCode.setData:
                case OpCode.setACL:
                case OpCode.createSession:
                case OpCode.closeSession:
                case OpCode.multi:
                    // 有关于事务类型请求,直接交由leader处理,具体见2.1     
                    zks.getFollower().request(request);
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("FollowerRequestProcessor exited loop!");
    }
}

2.1 follower转发事务请求到leader

public class Learner {  
	void request(Request request) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId);
        oa.writeInt(request.cxid);
        oa.writeInt(request.type);
        if (request.request != null) {
            request.request.rewind();
            int len = request.request.remaining();
            byte b[] = new byte[len];
            request.request.get(b);
            request.request.rewind();
            oa.write(b);
        }
        oa.close();
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
                .toByteArray(), request.authInfo);
        writePacket(qp, true);
    }
}

FollowerRequestProcessor将事务请求交由leader处理;同时继续将请求交由commitProcessor来处理;

我们来回忆下前文中Leader节点处理事务请求的过程:针对事务请求,leader生成proposal信息到各follower,follower处理完成后返回ack,leader接收到足够的ack后再次向各follower发送commit信息。

那么这个过程中,我们的follower相关处理在哪里呢?这时候又要回到FollowerServer启动的时候了,如下。

2.2 Follower.followLeader() follower启动

public class Follower extends Learner{
    void followLeader() throws InterruptedException {
        ...
        try {
            QuorumServer leaderServer = findLeader();            
            try {
                // 创建与leader连接
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // 将当前节点信息注册到leader上
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                // 与leader进行数据同步
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    // 接收leader请求包,并进行处理
                    readPacket(qp);
                    processPacket(qp);
                }
            } ...
    }
        
    protected void processPacket(QuorumPacket qp) throws IOException{
        switch (qp.getType()) {
        // 接收到leader发送过来的proposal        
        case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            // 进行事务日志处理,具体见2.2.1
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            // 当leader收集到足够的ack后,向各follower发送commit,具体见2.2.2    
            fzk.commit(qp.getZxid());
            break;
        ...
        }
    }    

}

2.2.1 FollowerZooKeeperServer.logRequest() 创建事务请求日志

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
	public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        // 直接交由syncProcessor处理
        syncProcessor.processRequest(request);
    }
}

SyncProcessor的作用我们都知道,就是将当前请求进行事务日志保存。

而事务日志保存完成后,则直接交由SendAckRequestProcessor来处理

2.2.2 SendAckRequestProcessor 返回leader ack响应

public class SendAckRequestProcessor implements RequestProcessor, Flushable {
    public void processRequest(Request si) {
        if(si.type != OpCode.sync){
            // 直接返回leader ack响应包
            QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
            try {
                learner.writePacket(qp, false);
            } catch (IOException e) {
               ...
            }
        }
    }
}

2.2.3 FollowerZooKeeperServer.commit() 提交事务proposal

public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
	public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }
        Request request = pendingTxns.remove();
        // 最终交由commitProcessor处理,详见3.1
        commitProcessor.commit(request);
    }
}

总结:关于这块的处理,读者可以对照着Leader处理事务请求的过程来比对着看。

Follower关于事务请求还是分为两部分:

    接收leader proposal请求,记录事务日志后,返回ack响应;

    接收leader commit请求,将请求交由CommitProcessor处理;

3.CommitProcessor

3.1 CommitProcessor.commit() 提交leader事务proposal

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
	synchronized public void commit(Request request) {
        if (!finished) {
            if (request == null) {
                LOG.warn("Committed a null!",
                         new Exception("committing a null! "));
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing request:: " + request);
            }
            // 很简单,直接将请求放入committedRequests
            committedRequests.add(request);
            notifyAll();
        }
    }
}

follower提交事务proposal的方式很简单,就是将请求放入committedRequests集合中,依据我们之前Leader节点对CommitProcessor的分析,在如下

3.2 CommitProcessor.run() 处理请求

public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
 
    // leader获取的请求集合
    LinkedList<Request> queuedRequests = new LinkedList<Request>();
    // 已经被follower 提交的请求集合
    LinkedList<Request> committedRequests = new LinkedList<Request>();
    
    public void run() {
        try {
            Request nextPending = null;            
            while (!finished) {
                int len = toProcess.size();
                for (int i = 0; i < len; i++) {
                    // 5.请求proposal已完成,交由下个processor处理即可
                    nextProcessor.processRequest(toProcess.get(i));
                }
                toProcess.clear();
                synchronized (this) {
                    // 2.若没有收到请求且没有收到leader的commit请求,则等待
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() == 0) {
                        wait();
                        continue;
                    }
                    // 3.committedRequests不为空,说明当前follower已经接受到leader的commit请求
                    if ((queuedRequests.size() == 0 || nextPending != null)
                            && committedRequests.size() > 0) {
                        Request r = committedRequests.remove();
                        if (nextPending != null
                                && nextPending.sessionId == r.sessionId
                                && nextPending.cxid == r.cxid) {
                            nextPending.hdr = r.hdr;
                            nextPending.txn = r.txn;
                            nextPending.zxid = r.zxid;
                            // 4.本次请求可以提交给下个processor处理
                            toProcess.add(nextPending);
                            nextPending = null;
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                            toProcess.add(r);
                        }
                    }
                }

                // We haven't matched the pending requests, so go back to
                // waiting
                if (nextPending != null) {
                    continue;
                }

                // 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到
                synchronized (this) {
                    // Process the next requests in the queuedRequests
                    while (nextPending == null && queuedRequests.size() > 0) {
                        Request request = queuedRequests.remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;
                        case OpCode.sync:
                            if (matchSyncs) {
                                nextPending = request;
                            } else {
                                toProcess.add(request);
                            }
                            break;
                        default:
                            toProcess.add(request);
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted exception while waiting", e);
        } catch (Throwable e) {
            LOG.error("Unexpected exception causing CommitProcessor to exit", e);
        }
        LOG.info("CommitProcessor exited loop!");
    }
}

与leader中CommitProcessor的处理类似,读者可以按照上面数字排序来分析整个过程

4.FinalRequestProcessor

最终都是交由FinalRequestProcessor来处理,这块我们已经分析过很多次了,不再赘述。

总结:

这里通过分析Follower节点处理请求(事务请求)的过程,可以了解到:Follower本身并不处理事务请求,而是直接转发给leader来处理;

但是follower会配合leader进行proposal的处理,最终将节点信息添加到当前ZKDatabase。

 类似资料: