Follower是 follower节点启动的和leader进行同步的功能类
主要逻辑如下:
1.和leader建立链接
2.向leader发送自己的epoch和zxid.
void observeLeader() throws InterruptedException { zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean); try { InetSocketAddress addr = findLeader(); LOG.info("Observing " + addr); try {
//建立连接 connectToLeader(addr);
//向leader发送自己的epoch和zxid,通过发送类型为Leader.OBSERVERINFO的packet,并阻塞到收到 long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
//和leader执行日志同步,并接收UPTODATE命令开始接收client的连接请求 syncWithLeader(newLeaderZxid); QuorumPacket qp = new QuorumPacket(); while (this.isRunning()) { readPacket(qp);
//读取Packet并处理 processPacket(qp); } } catch (Exception e) { LOG.warn("Exception when observing the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); } // clear pending revalidations pendingRevalidations.clear(); } } finally { zk.unregisterJMX(this); } }
处理来自leader的数据包
protected void processPacket(QuorumPacket qp) throws IOException{ switch (qp.getType()) { case Leader.PING: //心跳 ping(qp); break; case Leader.PROPOSAL: //写入提议 TxnHeader hdr = new TxnHeader(); Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr); if (hdr.getZxid() != lastQueued + 1) { LOG.warn("Got zxid 0x" + Long.toHexString(hdr.getZxid()) + " expected 0x" + Long.toHexString(lastQueued + 1)); } lastQueued = hdr.getZxid(); fzk.logRequest(hdr, txn); break; case Leader.COMMIT: //提交提议 fzk.commit(qp.getZxid()); break; case Leader.UPTODATE: //此时follower已经在上面的syncWithLeader执行过了,所以忽略. LOG.error("Received an UPTODATE message after Follower started"); break; case Leader.REVALIDATE: revalidate(qp); break; case Leader.SYNC: fzk.sync(); break; } }