A Brief Analysis of the ZooKeeper Source Code
1. Basic architecture
2. The ZAB protocol
ZooKeeper does not fully adopt the Paxos algorithm. Instead, it uses a protocol called ZooKeeper Atomic Broadcast (ZAB) as the core algorithm for data consistency.
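2.1 A Leader is elected only when more than half of the servers vote for it. In addition, more than half of the machines in the cluster must have completed state synchronization (data synchronization) with that Leader before the cluster can start serving requests.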
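2.2 All transaction requests must be coordinated by a single, globally unique server, called the Leader; the remaining servers are called Followers. The Leader converts each client transaction request into a transaction Proposal and distributes that Proposal to all Followers in the cluster. The Leader then waits for feedback from the Followers. Once more than half of the Followers have acknowledged the Proposal correctly, the Leader sends a Commit message to all Followers, asking them to commit that Proposal.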
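The proposal, acknowledgement, and commit sequence described in 2.2 can be illustrated with a minimal sketch. The class `ProposalSketch` and its methods are hypothetical names invented for this illustration and are not part of ZooKeeper. The sketch also assumes that "more than half" is measured against the whole voting ensemble, with the Leader's own acknowledgement counted as one of the votes.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only: tracks ACKs for one proposal until a majority is reached. */
class ProposalSketch {
    private final long zxid;
    private final int ensembleSize;                  // number of voting servers (assumed to include the leader)
    private final Set<Long> ackedServerIds = new HashSet<>();
    private boolean committed = false;

    ProposalSketch(long zxid, int ensembleSize) {
        this.zxid = zxid;
        this.ensembleSize = ensembleSize;
    }

    /**
     * Records an ACK from the given server. Duplicate ACKs from the same server
     * are counted once. Returns true exactly once: on the ACK that first brings
     * the count above half of the ensemble, which is when a Commit would be sent.
     */
    boolean ack(long serverId) {
        ackedServerIds.add(serverId);
        if (!committed && ackedServerIds.size() > ensembleSize / 2) {
            committed = true;
            return true;
        }
        return false;
    }

    boolean isCommitted() {
        return committed;
    }

    long getZxid() {
        return zxid;
    }

    public static void main(String[] args) {
        ProposalSketch proposal = new ProposalSketch(1L, 5);
        for (long serverId = 1; serverId <= 4; serverId++) {
            boolean sendCommit = proposal.ack(serverId);
            System.out.println("ACK from server " + serverId + ", send Commit now: " + sendCommit);
        }
    }
}
```

With five voting servers, the third distinct acknowledgement is the one that triggers the commit. Later acknowledgements are still recorded but do not trigger a second commit.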
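3. Leader and Follower startup process
4. Request processing
4.1 Request processing chains
4.1.1 Leader request processing chain
4.1.2 Follower request processing chain
4.2 Processing flow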
Taking a create request whose receiving server is the leader as an example, the flow is as follows:
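The processing flow of FollowerZooKeeperServer differs from that of LeaderZooKeeperServer in two ways: FollowerRequestProcessor forwards transaction requests to the leader, and SendAckRequestProcessor returns to the leader an acknowledgement that the transaction proposal was handled correctly. The rest of the processing chain is the same. The difference between SendAckRequestProcessor and AckRequestProcessor is that AckRequestProcessor is a local call on the leader. The code with which FollowerRequestProcessor handles transaction requests is as follows: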
Java code
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.delete:
            case OpCode.setData:
            case OpCode.setACL:
            case OpCode.createSession:
            case OpCode.closeSession:
            case OpCode.multi:
                zks.getFollower().request(request);
                break;
            }
        }
    } catch (Exception e) {
        LOG.error("Unexpected exception causing exit", e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}
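The ordering in the loop above (hand the request to the next processor first, then forward it to the leader if it is one of the listed operations) can be shown with a simplified, self-contained sketch. Everything in it is a stand-in invented for illustration: requests are reduced to plain operation names, `ForwardingProcessor` and `FinalProcessor` are hypothetical classes, and a list takes the place of the network call to the leader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only: a two-stage chain that mimics the follower's forwarding rule. */
class ProcessorChainSketch {
    /** Simplified stand-in for a processor in the chain; a request is just an operation name. */
    interface RequestProcessor {
        void processRequest(String opName);
    }

    /** Operation names matching the case labels in the switch statement above. */
    static final Set<String> FORWARDED_OPS = new HashSet<>(Arrays.asList(
            "sync", "create", "delete", "setData", "setACL",
            "createSession", "closeSession", "multi"));

    /** Passes every request down the chain, then forwards the listed operations to the leader. */
    static class ForwardingProcessor implements RequestProcessor {
        private final RequestProcessor next;
        private final List<String> sentToLeader;   // stand-in for the connection to the leader

        ForwardingProcessor(RequestProcessor next, List<String> sentToLeader) {
            this.next = next;
            this.sentToLeader = sentToLeader;
        }

        @Override
        public void processRequest(String opName) {
            // Queue locally first so the follower is ready when the leader responds.
            next.processRequest(opName);
            if (FORWARDED_OPS.contains(opName)) {
                sentToLeader.add(opName);
            }
        }
    }

    /** Last stage of the chain: records what reached it. */
    static class FinalProcessor implements RequestProcessor {
        final List<String> handled = new ArrayList<>();

        @Override
        public void processRequest(String opName) {
            handled.add(opName);
        }
    }

    public static void main(String[] args) {
        List<String> sentToLeader = new ArrayList<>();
        FinalProcessor last = new FinalProcessor();
        RequestProcessor chain = new ForwardingProcessor(last, sentToLeader);

        chain.processRequest("getData");   // read: stays on the follower
        chain.processRequest("create");    // write: also forwarded to the leader

        System.out.println("handled locally: " + last.handled);
        System.out.println("sent to leader:  " + sentToLeader);
    }
}
```

The sketch leaves out the queue, the worker thread, and the pendingSyncs bookkeeping of the real processor. It only shows which requests leave the follower and in what order the two steps happen.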
5. Data synchronization
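ZooKeeper cluster data synchronization falls into four types: direct differential synchronization (DIFF), rollback followed by differential synchronization (TRUNC+DIFF), rollback-only synchronization (TRUNC), and full synchronization (SNAP). Before synchronizing, the leader initializes three values: peerLastZxid (the last ZXID processed by the learner being synchronized), minCommittedLog (the smallest ZXID in the leader's proposal cache queue committedLog), and maxCommittedLog (the largest ZXID in committedLog). It then uses these three ZXID values to decide the synchronization type and performs the synchronization. The relevant code is in the run method of LearnerHandler.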
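As a rough guide to how the three ZXID values could map onto the four synchronization types, here is a simplified sketch. It is not the LearnerHandler logic: `SyncTypeSketch`, the `SyncType` enum, and `choose` are hypothetical names, the committedLog is modelled as a sorted list of ZXIDs, and an empty committedLog is simply treated as requiring SNAP, which is an assumption made to keep the example short.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration only: picks a sync type from peerLastZxid and the committed log. */
class SyncTypeSketch {
    enum SyncType { DIFF, TRUNC_DIFF, TRUNC, SNAP }

    /**
     * @param peerLastZxid last ZXID processed by the learner
     * @param committedLog ZXIDs cached by the leader, in ascending order
     */
    static SyncType choose(long peerLastZxid, List<Long> committedLog) {
        if (committedLog.isEmpty()) {
            return SyncType.SNAP;                    // simplification: nothing cached to diff against
        }
        long minCommittedLog = committedLog.get(0);
        long maxCommittedLog = committedLog.get(committedLog.size() - 1);

        if (peerLastZxid > maxCommittedLog) {
            return SyncType.TRUNC;                   // learner is ahead: roll it back
        }
        if (peerLastZxid < minCommittedLog) {
            return SyncType.SNAP;                    // learner is too far behind: full copy
        }
        // Inside the cached range: a plain diff works only if the leader knows this ZXID.
        return committedLog.contains(peerLastZxid) ? SyncType.DIFF : SyncType.TRUNC_DIFF;
    }

    public static void main(String[] args) {
        List<Long> committedLog = Arrays.asList(10L, 11L, 13L);
        for (long peerLastZxid : new long[] {11L, 12L, 20L, 5L}) {
            System.out.println("peerLastZxid=" + peerLastZxid + " -> "
                    + choose(peerLastZxid, committedLog));
        }
        System.out.println("empty log -> " + choose(5L, Collections.<Long>emptyList()));
    }
}
```

The ordering of the checks matters: the two out-of-range cases are ruled out first, so the final branch only has to distinguish a ZXID the leader has cached from one it has never seen.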
Java code
.....
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

if (this.getVersion() < 0x10000) {
    // we are going to have to extrapolate the epoch information
    long epoch = ZxidUtils.getEpochFromZxid(zxid);
    ss = new StateSummary(epoch, zxid);
    // fake the message
    leader.waitForEpochAck(this.getSid(), ss);
} else {
    byte ver[] = new byte[4];
    ByteBuffer.wrap(ver).putInt(0x10000);
    QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO,
            ZxidUtils.makeZxid(newEpoch, 0), ver, null);
    oa.writeRecord(newEpochPacket, "packet");
    bufferedOutput.flush();
    QuorumPacket ackEpochPacket = new QuorumPacket();
    ia.readRecord(ackEpochPacket, "packet");
    if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
        LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
        return;
    }
    ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
    ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
    leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;

/* we are sending the diff check if we have proposals in memory to be able to
 * send a diff to the
 */
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
    rl.lock();
    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
    LOG.info("Synchronizing with Follower sid: " + sid
            + " maxCommittedLog=0x" + Long.toHexString(maxCommittedLog)
            + " minCommittedLog=0x" + Long.toHexString(minCommittedLog)
            + " peerLastZxid=0x" + Long.toHexString(peerLastZxid));

    LinkedList