前两篇文章分别分析了Leader处理客户端非事务请求、事务请求的处理过程。最后我们来分析下Follower节点处理客户端请求的不同之处。
过程与Leader处理的过程基本差不多,所以相似的地方笔者就简略带过,重点分析不同之处。
同样的,我们先从其构造上来分析下其处理链
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
}
所以,其处理链为FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
同时还有一个SyncRequestProcessor响应leader的proposal,后续我们详细分析
public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
// 从客户端获取的请求全部存入queuedRequests,后续通过run()方法调用执行
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request);
}
}
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
...
// 请求交由下一个processor(commitProcessor)处理
nextProcessor.processRequest(request);
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
// 有关于事务类型请求,直接交由leader处理,具体见2.1
zks.getFollower().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
}
public class Learner {
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo);
writePacket(qp, true);
}
}
FollowerRequestProcessor将事务请求交由leader处理;同时继续将请求交由commitProcessor来处理;
我们来回忆下前文中Leader节点处理事务请求的过程:针对事务请求,leader生成proposal信息到各follower,follower处理完成后返回ack,leader接收到足够的ack后再次向各follower发送commit信息。
那么这个过程中,我们的follower相关处理在哪里呢?这时候又要回到FollowerServer启动的时候了,如下。
public class Follower extends Learner{
void followLeader() throws InterruptedException {
...
try {
QuorumServer leaderServer = findLeader();
try {
// 创建与leader连接
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 将当前节点信息注册到leader上
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
// 与leader进行数据同步
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 接收leader请求包,并进行处理
readPacket(qp);
processPacket(qp);
}
} ...
}
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
// 接收到leader发送过来的proposal
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
// 进行事务日志处理,具体见2.2.1
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
// 当leader收集到足够的ack后,向各follower发送commit,具体见2.2.2
fzk.commit(qp.getZxid());
break;
...
}
}
}
2.2.1 FollowerZooKeeperServer.logRequest() 创建事务请求日志
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null, null);
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
// 直接交由syncProcessor处理
syncProcessor.processRequest(request);
}
}
SyncProcessor的作用我们都知道,就是将当前请求进行事务日志保存。
而事务日志保存完成后,则直接交由SendAckRequestProcessor来处理
2.2.2 SendAckRequestProcessor 返回leader ack响应
public class SendAckRequestProcessor implements RequestProcessor, Flushable {
public void processRequest(Request si) {
if(si.type != OpCode.sync){
// 直接返回leader ack响应包
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
...
}
}
}
}
2.2.3 FollowerZooKeeperServer.commit() 提交事务proposal
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
// 最终交由commitProcessor处理,详见3.1
commitProcessor.commit(request);
}
}
总结:关于这块的处理,读者可以对照着Leader处理事务请求的过程来比对着看。
Follower关于事务请求还是分为两部分:
接收leader proposal请求,记录事务日志后,返回ack响应;
接收leader commit请求,将请求交由CommitProcessor处理;
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
synchronized public void commit(Request request) {
if (!finished) {
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
// 很简单,直接将请求放入committedRequests
committedRequests.add(request);
notifyAll();
}
}
}
follower提交事务proposal的方式很简单,就是将请求放入committedRequests集合中,依据我们之前Leader节点对CommitProcessor的分析,在如下
public class CommitProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// leader获取的请求集合
LinkedList<Request> queuedRequests = new LinkedList<Request>();
// 已经被follower 提交的请求集合
LinkedList<Request> committedRequests = new LinkedList<Request>();
public void run() {
try {
Request nextPending = null;
while (!finished) {
int len = toProcess.size();
for (int i = 0; i < len; i++) {
// 5.请求proposal已完成,交由下个processor处理即可
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
// 2.若没有收到请求且没有收到leader的commit请求,则等待
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait();
continue;
}
// 3.committedRequests不为空,说明当前follower已经接受到leader的commit请求
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
// 4.本次请求可以提交给下个processor处理
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
// 1.请求达到时,nextPending被设置为当前request,下次循环时会使用到
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
}
与leader中CommitProcessor的处理类似,读者可以按照上面数字排序来分析整个过程
最终都是交由FinalRequestProcessor来处理,这块我们已经分析过很多次了,不再赘述。
这里通过分析Follower节点处理请求(事务请求)的过程,可以了解到:Follower本身并不处理事务请求,而是直接转发给leader来处理;
但是follower会配合leader进行proposal的处理,最终将节点信息添加到当前ZKDatabase。