当前位置: 首页 > 工具软件 > Follower > 使用案例 >

【ZooKeeper】zookeeper源码8-Follower和Leader状态同步

昝浩阔
2023-12-01


ZooKeeper状态同步、服务启动、Session管理机制

QuorumPeer一台ZooKeeper服务器的抽象,其中有很多组件,网络环境、工作组件、成员变量、线程、数据库等。
1、ServerCnxnFactory是一个Runnable实现,默认实现是:NIOServerCnxnFactory,内部启动了一个线程,启动了一个NIO服务端,监听了2181端口,等待客户端发送链接请求过来,然后创建一个ServerCnxn,负责完成该客户端的读写请求。
2、QuorumCnxManager是一个选举过程中的链接管理器,内部启动了一个BIO服务端,它监听了3888选举端口,等待其他客户端发送选举链接请求过来建立选举选票交换。
3、ZKDataBase是ZK数据库,每个QuorumPeer代表一台ZK物理机,每个QuorumPeer内部都有一个ZKDatabase对象,意味着,每个ZK节点都保存了该集群的所有数据。
4、内部还保存了一些跟选举有关的信息,比如myid、vote、Election选举算法实现等也会保存,从zoo.cfg中解析得到的其他Server的信息。
选举、冷启动恢复都是在QuorumPeer对象中的工作机制。
QuorumPeer重要的事是,进入while循环,ZAB工作模式。通过while循环+switch case表示。
while循环核心逻辑,根据当前服务器状态决定调用哪个方法来执行。所有服务器一开始上线都为LOOKING状态,即执行setCurrentVote(makeLEStrategy().lookForLeader()),发起选举的入口;当这个方法结束后,推举得到一个leader,此leader没有得到承认,还需执行认同和状态同步,因此每台服务器变更自己状态:变成observer,follower,一个服务器变为leader。

ELECTION:开始选举,当前server进入找Leader状态:Vote currentVote = lookForLeader()
DISCORY:当选举得出了结果,开始进入发现认同阶段,当超过半数Follower认同该Leader,意味着选举真正结束
SYNCHRONIZATION:经过确认,有超过半数节点都同意刚推举出来的Leader节点
BROADCAST:当有超过半数的Follower完成了和Leader的状态同步进入消息广播状态,正常对外提供服务

ZooKeeper的Follower和Leader状态同步

启动入口QuorumPeerMain

QuorumPeerMain.java
-main.initilizeAndRun(args)
	-runFromConfig
		-quorumPeer.start();//线程方法启动
		//进入QuorumPeer.java方法
QuorumPeer.java
-run()
//run方法中核心while循环
	-leader.lead()
	//进入Leader.java方法
	-follower.followLeader()
	//进入Follower.java方法
Leader.java
-lead()
	-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
	//进入DISCOVERY状态
	-zk.loadData();
	-cnxAcceptor = new LearnerCnxAcceptor()
	//启动线程,此线程再启动其他线程,启动LeaderCnxAcceptorHandler线程
	//LeaderCnxAcceptorHandler线程启动BIO ServerSocket,绑定2888端口,阻塞再accept方法上,accept如果接收到一个follower请求的话,follower就会和leader去连接,启动LeaderHandler线程专门给Follower服务
	-newLeaderProposal.packet = new QuorumPacket(NEWLEADER,zk.getZxid(),null,null);
	//准备线程包,NEWLEADER
	-waitForEpochAck(self.getId(),leaderStateSummary)
	//等待DISCOVERY结束,相当于leader被选出来了,半数已投出
	-self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
	//进行状态同步
	-waitForNewLeaderAck(self.getId(),zk.getZxid());
	//等待状态退出

Follower.java

LearnerHandler是一个线程
Follower中会启动一个线程LeaderConnector,LeaderConnector会创建Socket BIO 客户端(即socket对象)去链接ServerSocket BIO服务端,Leader中会专门创建LearnerHandler线程为Follower服务。
Leader和Follower之间进行状态同步,10个步骤交互:(逻辑上)
1、Follower->LearnerHandler:FOLLOWERINFO(server.id,acceptedepoch)
2、LearnerHandler->Follower:LEADERINFO(new epoch)
3、Follower->LearnerHandler:ACKEPOCH(zxid,currentEpoch)
4、LearnerHandler->Follower:DIFF/SNAP/TRUNC
5、LearnerHandler->Follower:待同步的提案数据(Proposal)
6、LearnerHandler->Follower:COMMIT(提交)
7、LearnerHandler->Follower:NEWLEADER
8、Follower->LearnerHandler:ACK
9、LearnerHandler->Follower:UPTODATE(广播)
10、Follower->LearnerHandler:ACK

当知道Leader后,所有Follower会作为客户端主动去连接Leader。
第一:1和2,用于计算选举新的epoch值,然后Leader通知该值给Follower
第二:3和4,Leader在收到超过半数的ACKEPOCH之后,开始给每个Follower,计算出何种的同步方式
第三:4和5和6,Follower根据Leader计算出来的同步方式,完成和Leader的数据同步
第四:7,Leader表示已完成和该Follower的同步
第五:8,Follower表示接收到Leader发送过来的NEWLEADER了
第六:9,Leader在确认集群中有超过半数Follower完成同步之后,就会给完成同步的Follower发送UPTODATE消息,表示可以对外提供服务了
第七:10,Follower收到Leader发送过来的UPTODATE消息,表示已经接收到这个通知了,开始对外提供服务了

关于同步过程中两种方式:
快照同步:Leader直接把最新的快照文件(这个快照文件必然包含了Leader的所有数据)直接通过网络发送给Follower
差异化同步:在确认同步方式的时候,如果得到的结果不是快照同步,则同时把要同步的数据,变成(Proposal+Commit消息放到队列中),队列中的数据形态:DIFF+Proposal+Commit+Proposal+Commit…+Proposal+Commit+NEWLEADER,当Follower接收到NEWLEADER消息的时候,意味着Follower已经接收到了需要同步的所有数据。

标准10步骤:
1.Follower发送FOLLOWERINFO消息给Leader,信息中包含Follower的AcceptedEpoch
2.Leader在接收到Follower的Follower的FOLLOWERINFO消息的时候,返回一个LEADERINFO消息给Follower,信息中包含Leader的AcceptedEpoch
3.Follower给Leader返回一个ACKEPOCH消息,表示已经接收到Leader的AcceptedEpoch了。Leader需要等待有超过半数的Follower发送回来ACKEPOCH消息,表示集群中,有超过半数节点在追随相同的Leader节点,则选举结束。开始进入同步阶段
4.Leader根据Follower发送过来的epoch信息给Follower计算同步方式,同步方式有可能是DIFF,SNAP,TRUNC的其中之一。计算得到的同步方式消息放入到LearnerHandler的queuedPackets队列中,跟后面计算出来的待同步的分布式事务日志一起执行发送。
5.如果同步方式是DIFF,则获取到需要同步的分布式事务的PROPOSAL和COMMIT日志,也放入LearnerHandler的queuedPackets队列中,如果同步方式是SNAP,则先写入一个SNAP消息给Follower,然后把快照文件发送过给Follower进行快照同步
6.通过startSendingPackets()方法启动一个匿名线程执行LearnerHandler的queuedPackets队列中的数据包发送
7.当该同步的数据(queuedPackets队列中的PROPOSAL和COMMIT日志,或者SNAP方式的快照文件)都发送完毕之后,Leader给Follower发送一个NEWLEADER消息表示所有待同步数据已经发送完毕
8.Follower在接收到Leader发送过来的NEWLEADER消息,就必然得知要和Leader进行同步的日志数据,都已经发送过来了,Follower也执行成功了,则可以给Leader发送过一个ACK反馈
9.Leader需要等待集群中,有超过半数的节点发送ACK反馈过来,如此,集群启动成功,Leader给发送过了ACK的Follower Server发送一个UPTODATE的消息表示集群已经启动成功。Leader和Follower都启动各自的一些必备基础服务可以开始对外提供服务了。
10.Follower接收到Leader的UPTODATE消息,即表示集群启动正常,Follower可以正常对外提供服务,Follower再给Leader返回一个ACK

注意:
Leader和Follower之间是有心跳的,如果维持心跳的节点数不超过集群半数节点了,则集群不能正常对外提供服务了,全部进入LOOKING状态。
Leader通过syncFollower()方法来计算和Follower的同步方式。
Leader和Follower再集群正常启动成功之后,需要启动一些基础服务,比如SessionTracker和RequestProcessor等

Zookeeper的Follower和Leader状态同步源码

Leader.java
-lead()
	-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
		-ZabState
		//进入QuorumPeer.java
	-zk.loadData();
	//进入ZooKeeperServer.java
	-cnxAcceptor = new LearnerCnxAcceptor();
		-run()
			-serverSockets.forEach(serverSocket->executor.submit(new LearnerCnxAcceptorHandler(serverSocket,latch)));
				-run()//LearnerCnxAcceptorHandler线程的
					-while(!stop.get())
						-acceptConnections();
						//这个方法内部有一个是阻塞的,serverSocket.accept();
							-socket = serverSocket.accept();
							//当Follower节点作为Socket客户端发送连接请求给Leader的时,这句代码返回,意味着Follower和Leader建立了连接
							//阻塞在这里服务端,然后去看客户端Follower.java
							-LearnerHandler fh = new LearnerHandler(socket,is,Leader.this);
							//此线程专门为Follower提供服务
							//进入LearnHandler.java
								-run()
-waitForEpochAck
	-QuorumVerifier verifier = self.getQuorumVerifier();
	//少数服从多数的算法
	 if(electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
	 	electionFinished = true;
	 	electionFollowers.notifyAll();
 	}
 	//self.getId()为发送ACKEPOCH消息过来的那个Follower
 	//1.这个follower是否有选举权
 	//2.得到的ackepoch是否数量超过集群总节点数的半数

LearnerHandler.java
-run()
	-QuorumPacket qp = new QuorumPacket();
	 ia.readRecord(qp,"packet");
	//创建数据包,将数据读到数据包中,qp就是FOLLOWERINFO
	-long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
	-long zxid = qp.getZxid();
	-long newEpoch = learnerMaster.getEpochToPropose(this.getSid(),lastAcceptedEpoch);
	-long newLeaderZxid = ZxidUtils.makeZxid(newEpoch,0);
	//准备leader给follower发送的数据
	-QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,newLeaderZxid,ver,null);
	//leader发给follower的LEADERINFO信息
	-oa.writeRecord(newEpochPacket,"packet");
	//leader写数据,follower那边就是读数据
	-learnerMaster.waitForEpochAck(this.getSid(),ss);
	//线程会阻塞这里
	//把对方服务器的信息ss,加入leaderMaster来判断集群是否超过半数
	//进入Leader.java
	-boolean needSnap = syncFollower(peerLastZxid,learnerMaster);
	//1.计算同步方式
		-if(forceSnapSync)
		//默认为fasle
		-else if(lastProcessedZxid == peerLastZxid)
		//如果follower的zxid和leader的一样,则发送DIFF
		//lastProcessedZxid是leader的最大zxid
		//peerLastZxid是follower的zxid
		-else if(peerLastZxid > maxCommittedLog && !isPeerEpochZxid)
		//如果follower的zxid大于leader的最大zxid,则发送trunc
		-else if(maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid))
		//如果follower的zxid在minCommittedLog和maxCommittedLog之间,发送DIFF进行日志同步
		-else if(peerLastZxid < minCommittedLog && txnLogSyncEnabled)
		//如果follower的zxid比minCommittedLog还小,则同步日志
		-return needSnap;
		//重点是needOpPacket的赋值
		//返回为true,表示通过快照方式同步
		//返回为false,表示通过DIFF或TRUNC方式执行同步
		
	-if(needSnap){}
	//2.执行快照方式同步,则queuePackets队列必然为空
	//3.启动一个线程来消费queuedPackets,执行队列中的消息真正发送(发送给Follower)
		-oa.writeRecord(new QuorumPacket(Leader.SNAP,zxidToSend,null,null),"packet");
		//发送SNAP消息给Follower
		-learnerMaster.getZKDatabase().serializeSnapshot(oa);
		//序列化zkdatabase到follower
		//进入ZKDatabase.java
	-startSendingPackets();
	//启动一个线程,开始queuePackets队列中的消息消费
		-sendPackets();
			-p = queuePackets.poll();
			-oa.writeRecord(p,"packet");
	-ia.readRecord(qp,"packet");
	//读取第8步,follower返回给leader的ack信息
	-learnerMaster.waitForNewLeaderAck(getSid(),qp.getZxid());
	//等待Follower返回ack消息同步完后,进行反馈
	-queuedPackets.add(new QuorumPacket(Leader.UPTODATE,-1,null,null));
	//当Leader确认集群启动成功之后,给所有发送了ACK的Follower发回一个UPTODATE消息
	//只要queuePackets队列中添加了UPTODATE消息,马上发给follower
	-while(true)
	//用来接收follower发送过来的消息
		-case Leaer.REQUEST:
		//Follower转发过来的事务请求
			-if(type == OpCode.sync)
			//同步请求,ZK不保证客户端一定能拿到最新数据,所以客户端调用一个sync操作,保持最新数据
			//ZooKeeper.java中的sync方法可以保证最新数据
			-si = new Request(null,sessionId,cxid,type,bb,qp.getAuthinfo());
			//事务请求
	

ZKDatabase.java
-serializeSnapshot
	-SerializeUtils.serializedSnapshot(getDataTree(),oa,getSessionWithTimeOuts());
	//进入SerializeUtils.java

SerializeUtils.java
-serializeSnapshot
//遍历datatree中的数据进行序列化
	
ZooKeeperServer.java
-loadData()
	-if(zkDb.isInitialized())
	//如果初始化了,直接获取最大zxid
		-setZxid(zkDb.getDataTreeLastProcessedZxid());
	-else
	//如果没有初始化,先去初始化
		-setZxid(zkDb.loadDatabase());
	//start:进入空间换时间,降低复杂度环节
	List<Long> deadSessions = new ArrayList<>();
	for(Long session:zkDb.getSessions()) {
		if(zkDb.getSessionWithTimeOuts().get(session)==null) {
			deadSessions.add(session);
		}
	}
	for(long session: deadSessions) {
		killSession,zkDb.getDataTreeLastProcessedZxid());
	}
	//end:获取超时session,并杀死

	-takeSnapShot();
	//1、save 执行快照
	//2、restore	反序列化
		-txnLogFactory.save(zkDb.getDataTree(),zkDb.getSessionWithTimeOuts(),syncSnap);
		//进入FileTxnSnapLog.java

FileTxnSnapLog.java
-save
	-File snapshotFile = new File(snapDir,Util.makeSnapshotName(lastZxid));
	//生成快照文件名,snapshot.b0000000002
	-snapLog.serialize(dataTree,sessionWithTimeouts,snapshotFile,syncSnap);
	//拍摄快照,进入FileSnap.java

FileSnap.java
-serialize
	-FileHeader header = new FileHeader(SNAP_MAGIC,VERSION,dbId);
	//构建快照文件头
	-serialize(dt,sessions,oa,header);
		-header.serialize(oa,"fileheader");
		-SerializeUtils.serializeSnapshot(dt,oa,sessions);
		//序列化session,进入SerializeUtils.java

SerializeUtils.java
-serializeSnapshot
	-oa.writeInt(sessSnap.size(),"count");
	-for (Entry<Long,Integer> entry : sessSnap.entrySet()) {
		oa.writeLong(entry.getKey().longValue(),"id");
		oa.writeLong(etnry.getValue().intValut(),"timeout");
	 }
	-dt.serialize(oa,"tree");
	//进入DataTree.java

DataTree.java
-serialize
	-serializeNodes(oa);
		-serializeNode(oa,new StringBuilder());
		//序列化一个节点
			-serializeNodeData(oa,pathString,nodeCopy);
				-oa.writeString(path,"path");
				-oa.writeRecord(node,"mode");
	

QuorumPeer.java
-ZabState
	-ELECTION
	-DISCOVERY
	-SYNCHRONIZATION
	-BROADCAST
Follower.java
-followerLeader()
	-self.setZabState(QuorumPeer.ZabState.DISCOVERY);
	//首先切换状态
	-QuorumServer leaderServer = findLeader();
	//进入Learner.java
	-connectToLeader(leaderServer.addr,leaderServer.hostname);
	//连接leader,进入Learner.java
	-long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
	//进入Learner.java,执行1,2,3步骤
	-syncWithLeader(newEpochZxid);
	//进入Learner.java
	
Learner.java
-findLeader()
	-Vote current = self.getCurrentVote();
	//拿到推举结果
-connectToLeader
	-address.stream().map(address->new LeaderConnector(address,socket,latch)).forEach(executor::submit);
		-run()
			-Socket sock = connectToLeader();
				-Socket sock = createSocket();
				//创建socket客户端
				-sockConnect(sock,address,Math.min(connectTimeout,remainingTimeout));
				//BIO客户端发起请求给BIO服务端
					-sock.connect(addr,timeout);
					//Follower执行该代码时,Leader的socket = serverSocket.accept()返回
-registerWithLeader(int pktType)
	-QuorumPacket qp = new QuorumPacket();
	//构建数据包
	 qp.setType(pktType);
	 qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(),0));
	-LearnerInfo li = new LearnerInfo(self.getId(),0x10000,self.getQuorumVerifier().getVersion());
	-writePacket(qp,true);
	//写FOLLOWERINFO信息给Leader
	-readPacket(qp);
	//当leader写数据时,此处读取
	-QuorumPacket ackNewPacket = new QuorumPacket(Leader.ACKEPOCH,lastLoggedZxid,epochBytes,null);
	//follower给leader返回ACKEPOCH信息

-syncWithLeader
//Leader主动把要同步的数据,顺序发送给Follower
	-readPacket(qp);
	//读取同步方式:DIFF TRUNC SNAP
	-if(qp.getType() == Leader.DIFF)
	//并没有进行同步
	-else if(qp.getType() == Leader.SNAP)
	//通过快照方式
		-zk.getZKDatabase().deserializeSnapshot(leaderIs);
		//进行反序列化
	-else if(qp.getType() == Leader.TRUNC)
		-boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
		//删除一部分数据
	-outerLoop:
	//不停接收Leader发送过来的消息
		-case Leader.PROPOSAL:
		//部分事务数据
			-packetsNotCommitted.add(pif);
			//处理完毕之后,加入未提交的packets队列
			//放在内存集合里面
		-case Leader.COMMIT:
		 case Leader.COMMITANDACTIVATE:
		 	-zk.processTxn(pif.hdr,pif.rec);
		 	 packetsNotCommitted.remove()
		 	//处理事务成功后,从packetsNotCommitted中移除
		 	//需要proposal+commit一起
		-case Leader.NEWLEADER:
			-zk.takerSnapshot(syncSnapshot);
			//拍摄快照
			-zk.startupWithoutServing();
			//启动ZooKeeper中各种服务,follower的服务启动
			//进入ZooKeeperServer.java
			-writePacket(new QuorumPacket(Leader.ACK,newLeaderuZxid,null,null),true);
			//8.返回一个acke消息给Leader
		-case Leader.UPTODATE:
		//当follower端接收到leader的UPTODATE消息,说明完成同步
			-break outerLoop;
			//退出while循环
		-writePacket(ack,true);
		//在同步完成之后,给leader返回ACK确认消息,第10步

ZooKeeperServer.java
-startupWithoutServing()
	-startupWithServerState(State.INITIAL);
		

ZooKeeper服务启动

在Leader的lead()方法最后,也就是Leader完成了和集群过半Follower的同步之后,就会调用startZkServer()来启动必要的服务,主要包括:
SessionTracker
RequestProcessor
更新Leader ZooKeeperServer的状态

Leader.java
-lead()
	-cnxAcceptor = new LearnerCnxAcceptor();
	//启动BIO服务端
	-waitForEpoch(self.getId(),leaderStateSummary);
	//等待半数节点认同leader
	-waitForNewLeaderAck(self.getId(),zk.getZxid());
	//等待半数节点完成状态同步
	-startZkServer();
		-zk.startup();
		//进入LeaderZooKeeperServer.java

LeaderZooKeeperServer.java
//父类ZooKeeperServer.java
-startup()
	-super.startup();
	//进入ZooKeeperServer.java

ZooKeeperServer.java
-startup()
	-startupWithServerState(State.RUNNING);
	//此方法Leader使用,Follower也使用,启动基础服务
	
LearnHandler.java
-run()
	-while(true)
Follower.java
-follower()
	-QuorumPackets qp = new QuorumPackets();
	 while(this.isRunning) {
	 	readPackets(qp);
	 	processPackets(qp);
 	 }
 	 //严格串行执行,qp在while外进行复用
 	 
 类似资料: