QuorumPeer一台ZooKeeper服务器的抽象,其中有很多组件,网络环境、工作组件、成员变量、线程、数据库等。
1、ServerCnxnFactory是一个Runnable实现,默认实现是:NIOServerCnxnFactory,内部启动了一个线程,启动了一个NIO服务端,监听了2181端口,等待客户端发送链接请求过来,然后创建一个ServerCnxn,负责完成该客户端的读写请求。
2、QuorumCnxManager是一个选举过程中的链接管理器,内部启动了一个BIO服务端,它监听了3888选举端口,等待其他客户端发送选举链接请求过来建立选举选票交换。
3、ZKDataBase是ZK数据库,每个QuorumPeer代表一台ZK物理机,每个QuorumPeer内部都有一个ZKDatabase对象,意味着,每个ZK节点都保存了该集群的所有数据。
4、内部还保存了一些跟选举有关的信息,比如myid、vote、Election选举算法实现等也会保存,从zoo.cfg中解析得到的其他Server的信息。
选举、冷启动恢复都是在QuorumPeer对象中的工作机制。
QuorumPeer重要的事是,进入while循环,ZAB工作模式。通过while循环+switch case表示。
while循环核心逻辑,根据当前服务器状态决定调用哪个方法来执行。所有服务器一开始上线都为LOOKING状态,即执行setCurrentVote(makeLEStrategy().lookForLeader()),发起选举的入口;当这个方法结束后,推举得到一个leader,此leader没有得到承认,还需执行认同和状态同步,因此每台服务器变更自己状态:变成observer,follower,一个服务器变为leader。
ELECTION:开始选举,当前server进入找Leader状态:Vote currentVote = lookForLeader()
DISCORY:当选举得出了结果,开始进入发现认同阶段,当超过半数Follower认同该Leader,意味着选举真正结束
SYNCHRONIZATION:经过确认,有超过半数节点都同意刚推举出来的Leader节点
BROADCAST:当有超过半数的Follower完成了和Leader的状态同步进入消息广播状态,正常对外提供服务
启动入口QuorumPeerMain
QuorumPeerMain.java
-main.initilizeAndRun(args)
-runFromConfig
-quorumPeer.start();//线程方法启动
//进入QuorumPeer.java方法
QuorumPeer.java
-run()
//run方法中核心while循环
-leader.lead()
//进入Leader.java方法
-follower.followLeader()
//进入Follower.java方法
Leader.java
-lead()
-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
//进入DISCOVERY状态
-zk.loadData();
-cnxAcceptor = new LearnerCnxAcceptor()
//启动线程,此线程再启动其他线程,启动LeaderCnxAcceptorHandler线程
//LeaderCnxAcceptorHandler线程启动BIO ServerSocket,绑定2888端口,阻塞再accept方法上,accept如果接收到一个follower请求的话,follower就会和leader去连接,启动LeaderHandler线程专门给Follower服务
-newLeaderProposal.packet = new QuorumPacket(NEWLEADER,zk.getZxid(),null,null);
//准备线程包,NEWLEADER
-waitForEpochAck(self.getId(),leaderStateSummary)
//等待DISCOVERY结束,相当于leader被选出来了,半数已投出
-self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
//进行状态同步
-waitForNewLeaderAck(self.getId(),zk.getZxid());
//等待状态退出
Follower.java
LearnerHandler是一个线程
Follower中会启动一个线程LeaderConnector,LeaderConnector会创建Socket BIO 客户端(即socket对象)去链接ServerSocket BIO服务端,Leader中会专门创建LearnerHandler线程为Follower服务。
Leader和Follower之间进行状态同步,10个步骤交互:(逻辑上)
1、Follower->LearnerHandler:FOLLOWERINFO(server.id,acceptedepoch)
2、LearnerHandler->Follower:LEADERINFO(new epoch)
3、Follower->LearnerHandler:ACKEPOCH(zxid,currentEpoch)
4、LearnerHandler->Follower:DIFF/SNAP/TRUNC
5、LearnerHandler->Follower:待同步的提案数据(Proposal)
6、LearnerHandler->Follower:COMMIT(提交)
7、LearnerHandler->Follower:NEWLEADER
8、Follower->LearnerHandler:ACK
9、LearnerHandler->Follower:UPTODATE(广播)
10、Follower->LearnerHandler:ACK
当知道Leader后,所有Follower会作为客户端主动去连接Leader。
第一:1和2,用于计算选举新的epoch值,然后Leader通知该值给Follower
第二:3和4,Leader在收到超过半数的ACKEPOCH之后,开始给每个Follower,计算出何种的同步方式
第三:4和5和6,Follower根据Leader计算出来的同步方式,完成和Leader的数据同步
第四:7,Leader表示已完成和该Follower的同步
第五:8,Follower表示接收到Leader发送过来的NEWLEADER了
第六:9,Leader在确认集群中有超过半数Follower完成同步之后,就会给完成同步的Follower发送UPTODATE消息,表示可以对外提供服务了
第七:10,Follower收到Leader发送过来的UPTODATE消息,表示已经接收到这个通知了,开始对外提供服务了
关于同步过程中两种方式:
快照同步:Leader直接把最新的快照文件(这个快照文件必然包含了Leader的所有数据)直接通过网络发送给Follower
差异化同步:在确认同步方式的时候,如果得到的结果不是快照同步,则同时把要同步的数据,变成(Proposal+Commit消息放到队列中),队列中的数据形态:DIFF+Proposal+Commit+Proposal+Commit…+Proposal+Commit+NEWLEADER,当Follower接收到NEWLEADER消息的时候,意味着Follower已经接收到了需要同步的所有数据。
标准10步骤:
1.Follower发送FOLLOWERINFO消息给Leader,信息中包含Follower的AcceptedEpoch
2.Leader在接收到Follower的Follower的FOLLOWERINFO消息的时候,返回一个LEADERINFO消息给Follower,信息中包含Leader的AcceptedEpoch
3.Follower给Leader返回一个ACKEPOCH消息,表示已经接收到Leader的AcceptedEpoch了。Leader需要等待有超过半数的Follower发送回来ACKEPOCH消息,表示集群中,有超过半数节点在追随相同的Leader节点,则选举结束。开始进入同步阶段
4.Leader根据Follower发送过来的epoch信息给Follower计算同步方式,同步方式有可能是DIFF,SNAP,TRUNC的其中之一。计算得到的同步方式消息放入到LearnerHandler的queuedPackets队列中,跟后面计算出来的待同步的分布式事务日志一起执行发送。
5.如果同步方式是DIFF,则获取到需要同步的分布式事务的PROPOSAL和COMMIT日志,也放入LearnerHandler的queuedPackets队列中,如果同步方式是SNAP,则先写入一个SNAP消息给Follower,然后把快照文件发送过给Follower进行快照同步
6.通过startSendingPackets()方法启动一个匿名线程执行LearnerHandler的queuedPackets队列中的数据包发送
7.当该同步的数据(queuedPackets队列中的PROPOSAL和COMMIT日志,或者SNAP方式的快照文件)都发送完毕之后,Leader给Follower发送一个NEWLEADER消息表示所有待同步数据已经发送完毕
8.Follower在接收到Leader发送过来的NEWLEADER消息,就必然得知要和Leader进行同步的日志数据,都已经发送过来了,Follower也执行成功了,则可以给Leader发送过一个ACK反馈
9.Leader需要等待集群中,有超过半数的节点发送ACK反馈过来,如此,集群启动成功,Leader给发送过了ACK的Follower Server发送一个UPTODATE的消息表示集群已经启动成功。Leader和Follower都启动各自的一些必备基础服务可以开始对外提供服务了。
10.Follower接收到Leader的UPTODATE消息,即表示集群启动正常,Follower可以正常对外提供服务,Follower再给Leader返回一个ACK
注意:
Leader和Follower之间是有心跳的,如果维持心跳的节点数不超过集群半数节点了,则集群不能正常对外提供服务了,全部进入LOOKING状态。
Leader通过syncFollower()方法来计算和Follower的同步方式。
Leader和Follower再集群正常启动成功之后,需要启动一些基础服务,比如SessionTracker和RequestProcessor等
Leader.java
-lead()
-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
-ZabState
//进入QuorumPeer.java
-zk.loadData();
//进入ZooKeeperServer.java
-cnxAcceptor = new LearnerCnxAcceptor();
-run()
-serverSockets.forEach(serverSocket->executor.submit(new LearnerCnxAcceptorHandler(serverSocket,latch)));
-run()//LearnerCnxAcceptorHandler线程的
-while(!stop.get())
-acceptConnections();
//这个方法内部有一个是阻塞的,serverSocket.accept();
-socket = serverSocket.accept();
//当Follower节点作为Socket客户端发送连接请求给Leader的时,这句代码返回,意味着Follower和Leader建立了连接
//阻塞在这里服务端,然后去看客户端Follower.java
-LearnerHandler fh = new LearnerHandler(socket,is,Leader.this);
//此线程专门为Follower提供服务
//进入LearnHandler.java
-run()
-waitForEpochAck
-QuorumVerifier verifier = self.getQuorumVerifier();
//少数服从多数的算法
if(electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electionFollowers.notifyAll();
}
//self.getId()为发送ACKEPOCH消息过来的那个Follower
//1.这个follower是否有选举权
//2.得到的ackepoch是否数量超过集群总节点数的半数
LearnerHandler.java
-run()
-QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp,"packet");
//创建数据包,将数据读到数据包中,qp就是FOLLOWERINFO
-long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
-long zxid = qp.getZxid();
-long newEpoch = learnerMaster.getEpochToPropose(this.getSid(),lastAcceptedEpoch);
-long newLeaderZxid = ZxidUtils.makeZxid(newEpoch,0);
//准备leader给follower发送的数据
-QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,newLeaderZxid,ver,null);
//leader发给follower的LEADERINFO信息
-oa.writeRecord(newEpochPacket,"packet");
//leader写数据,follower那边就是读数据
-learnerMaster.waitForEpochAck(this.getSid(),ss);
//线程会阻塞这里
//把对方服务器的信息ss,加入leaderMaster来判断集群是否超过半数
//进入Leader.java
-boolean needSnap = syncFollower(peerLastZxid,learnerMaster);
//1.计算同步方式
-if(forceSnapSync)
//默认为fasle
-else if(lastProcessedZxid == peerLastZxid)
//如果follower的zxid和leader的一样,则发送DIFF
//lastProcessedZxid是leader的最大zxid
//peerLastZxid是follower的zxid
-else if(peerLastZxid > maxCommittedLog && !isPeerEpochZxid)
//如果follower的zxid大于leader的最大zxid,则发送trunc
-else if(maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid))
//如果follower的zxid在minCommittedLog和maxCommittedLog之间,发送DIFF进行日志同步
-else if(peerLastZxid < minCommittedLog && txnLogSyncEnabled)
//如果follower的zxid比minCommittedLog还小,则同步日志
-return needSnap;
//重点是needOpPacket的赋值
//返回为true,表示通过快照方式同步
//返回为false,表示通过DIFF或TRUNC方式执行同步
-if(needSnap){}
//2.执行快照方式同步,则queuePackets队列必然为空
//3.启动一个线程来消费queuedPackets,执行队列中的消息真正发送(发送给Follower)
-oa.writeRecord(new QuorumPacket(Leader.SNAP,zxidToSend,null,null),"packet");
//发送SNAP消息给Follower
-learnerMaster.getZKDatabase().serializeSnapshot(oa);
//序列化zkdatabase到follower
//进入ZKDatabase.java
-startSendingPackets();
//启动一个线程,开始queuePackets队列中的消息消费
-sendPackets();
-p = queuePackets.poll();
-oa.writeRecord(p,"packet");
-ia.readRecord(qp,"packet");
//读取第8步,follower返回给leader的ack信息
-learnerMaster.waitForNewLeaderAck(getSid(),qp.getZxid());
//等待Follower返回ack消息同步完后,进行反馈
-queuedPackets.add(new QuorumPacket(Leader.UPTODATE,-1,null,null));
//当Leader确认集群启动成功之后,给所有发送了ACK的Follower发回一个UPTODATE消息
//只要queuePackets队列中添加了UPTODATE消息,马上发给follower
-while(true)
//用来接收follower发送过来的消息
-case Leaer.REQUEST:
//Follower转发过来的事务请求
-if(type == OpCode.sync)
//同步请求,ZK不保证客户端一定能拿到最新数据,所以客户端调用一个sync操作,保持最新数据
//ZooKeeper.java中的sync方法可以保证最新数据
-si = new Request(null,sessionId,cxid,type,bb,qp.getAuthinfo());
//事务请求
ZKDatabase.java
-serializeSnapshot
-SerializeUtils.serializedSnapshot(getDataTree(),oa,getSessionWithTimeOuts());
//进入SerializeUtils.java
SerializeUtils.java
-serializeSnapshot
//遍历datatree中的数据进行序列化
ZooKeeperServer.java
-loadData()
-if(zkDb.isInitialized())
//如果初始化了,直接获取最大zxid
-setZxid(zkDb.getDataTreeLastProcessedZxid());
-else
//如果没有初始化,先去初始化
-setZxid(zkDb.loadDatabase());
//start:进入空间换时间,降低复杂度环节
List<Long> deadSessions = new ArrayList<>();
for(Long session:zkDb.getSessions()) {
if(zkDb.getSessionWithTimeOuts().get(session)==null) {
deadSessions.add(session);
}
}
for(long session: deadSessions) {
killSession,zkDb.getDataTreeLastProcessedZxid());
}
//end:获取超时session,并杀死
-takeSnapShot();
//1、save 执行快照
//2、restore 反序列化
-txnLogFactory.save(zkDb.getDataTree(),zkDb.getSessionWithTimeOuts(),syncSnap);
//进入FileTxnSnapLog.java
FileTxnSnapLog.java
-save
-File snapshotFile = new File(snapDir,Util.makeSnapshotName(lastZxid));
//生成快照文件名,snapshot.b0000000002
-snapLog.serialize(dataTree,sessionWithTimeouts,snapshotFile,syncSnap);
//拍摄快照,进入FileSnap.java
FileSnap.java
-serialize
-FileHeader header = new FileHeader(SNAP_MAGIC,VERSION,dbId);
//构建快照文件头
-serialize(dt,sessions,oa,header);
-header.serialize(oa,"fileheader");
-SerializeUtils.serializeSnapshot(dt,oa,sessions);
//序列化session,进入SerializeUtils.java
SerializeUtils.java
-serializeSnapshot
-oa.writeInt(sessSnap.size(),"count");
-for (Entry<Long,Integer> entry : sessSnap.entrySet()) {
oa.writeLong(entry.getKey().longValue(),"id");
oa.writeLong(etnry.getValue().intValut(),"timeout");
}
-dt.serialize(oa,"tree");
//进入DataTree.java
DataTree.java
-serialize
-serializeNodes(oa);
-serializeNode(oa,new StringBuilder());
//序列化一个节点
-serializeNodeData(oa,pathString,nodeCopy);
-oa.writeString(path,"path");
-oa.writeRecord(node,"mode");
QuorumPeer.java
-ZabState
-ELECTION
-DISCOVERY
-SYNCHRONIZATION
-BROADCAST
Follower.java
-followerLeader()
-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
//首先切换状态
-QuorumServer leaderServer = findLeader();
//进入Learner.java
-connectToLeader(leaderServer.addr,leaderServer.hostname);
//连接leader,进入Learner.java
-long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
//进入Learner.java,执行1,2,3步骤
-syncWithLeader(newEpochZxid);
//进入Learner.java
Learner.java
-findLeader()
-Vote current = self.getCurrentVote();
//拿到推举结果
-connectToLeader
-address.stream().map(address->new LeaderConnector(address,socket,latch)).forEach(executor::submit);
-run()
-Socket sock = connectToLeader();
-Socket sock = createSocket();
//创建socket客户端
-sockConnect(sock,address,Math.min(connectTimeout,remainingTimeout));
//BIO客户端发起请求给BIO服务端
-sock.connect(addr,timeout);
//Follower执行该代码时,Leader的socket = serverSocket.accept()返回
-registerWithLeader(int pktType)
-QuorumPacket qp = new QuorumPacket();
//构建数据包
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(),0));
-LearnerInfo li = new LearnerInfo(self.getId(),0x10000,self.getQuorumVerifier().getVersion());
-writePacket(qp,true);
//写FOLLOWERINFO信息给Leader
-readPacket(qp);
//当leader写数据时,此处读取
-QuorumPacket ackNewPacket = new QuorumPacket(Leader.ACKEPOCH,lastLoggedZxid,epochBytes,null);
//follower给leader返回ACKEPOCH信息
-syncWithLeader
//Leader主动把要同步的数据,顺序发送给Follower
-readPacket(qp);
//读取同步方式:DIFF TRUNC SNAP
-if(qp.getType() == Leader.DIFF)
//并没有进行同步
-else if(qp.getType() == Leader.SNAP)
//通过快照方式
-zk.getZKDatabase().deserializeSnapshot(leaderIs);
//进行反序列化
-else if(qp.getType() == Leader.TRUNC)
-boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
//删除一部分数据
-outerLoop:
//不停接收Leader发送过来的消息
-case Leader.PROPOSAL:
//部分事务数据
-packetsNotCommitted.add(pif);
//处理完毕之后,加入未提交的packets队列
//放在内存集合里面
-case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
-zk.processTxn(pif.hdr,pif.rec);
packetsNotCommitted.remove()
//处理事务成功后,从packetsNotCommitted中移除
//需要proposal+commit一起
-case Leader.NEWLEADER:
-zk.takerSnapshot(syncSnapshot);
//拍摄快照
-zk.startupWithoutServing();
//启动ZooKeeper中各种服务,follower的服务启动
//进入ZooKeeperServer.java
-writePacket(new QuorumPacket(Leader.ACK,newLeaderuZxid,null,null),true);
//8.返回一个acke消息给Leader
-case Leader.UPTODATE:
//当follower端接收到leader的UPTODATE消息,说明完成同步
-break outerLoop;
//退出while循环
-writePacket(ack,true);
//在同步完成之后,给leader返回ACK确认消息,第10步
ZooKeeperServer.java
-startupWithoutServing()
-startupWithServerState(State.INITIAL);
在Leader的lead()方法最后,也就是Leader完成了和集群过半Follower的同步之后,就会调用startZkServer()来启动必要的服务,主要包括:
SessionTracker
RequestProcessor
更新Leader ZooKeeperServer的状态
Leader.java
-lead()
-cnxAcceptor = new LearnerCnxAcceptor();
//启动BIO服务端
-waitForEpoch(self.getId(),leaderStateSummary);
//等待半数节点认同leader
-waitForNewLeaderAck(self.getId(),zk.getZxid());
//等待半数节点完成状态同步
-startZkServer();
-zk.startup();
//进入LeaderZooKeeperServer.java
LeaderZooKeeperServer.java
//父类ZooKeeperServer.java
-startup()
-super.startup();
//进入ZooKeeperServer.java
ZooKeeperServer.java
-startup()
-startupWithServerState(State.RUNNING);
//此方法Leader使用,Follower也使用,启动基础服务
LearnHandler.java
-run()
-while(true)
Follower.java
-follower()
-QuorumPackets qp = new QuorumPackets();
while(this.isRunning) {
readPackets(qp);
processPackets(qp);
}
//严格串行执行,qp在while外进行复用