zk源码—6.Leader选举的实现原理

大纲

1.zk是如何实现数据一致性的

(1)数据一致性分析

(2)实现数据一致性的广播模式

(3)实现数据一致性的恢复模式

2.zk是如何进行Leader选举的

(1)服务器启动时的Leader选举

(2)服务器运行时的Leader选举

(3)Leader选举的算法设计

(4)Leader选举的实现细节

 

1.zk是如何实现数据一致性的

(1)数据一致性分析

(2)实现数据一致性的广播模式

(3)实现数据一致性的恢复模式

 

zk集群中的服务器分为Leader服务器、Follower服务器及Observer服务器。Leader选举是一个过程,在这个过程中主要做了两项工作:

工作一:选举出Leader服务器

工作二:进行数据同步

 

zk中实现的一致性不是强一致性,而是最终一致性。即集群中各个服务器上的数据并不是每时每刻都保持一致的,而是即经过一段时间后,集群服务器上的数据才最终保持一致。

 

Leader服务器主要负责处理事务请求,当Leader服务器接收到客户端的事务请求时,会先向集群中的各机器针对该请求的提议发起投票询问。

 

(1)数据一致性分析

zk在集群中采取的是多数原则的方式来保证数据一致性。即当一个事务请求导致服务器上的数据发生改变时,只要保证多数机器的数据都正确变更了,就可保证系统数据一致性。

 

因为每个Follower服务器都可以看作是Leader服务器的数据副本,所以只要保证集群中大多数机器数据是一致的,那么在集群中个别机器出现故障时,zk集群依然能保证稳定运行。

 

(2)实现数据一致性的广播模式

一.首先Leader启动时会创建网络连接管理器LearnerCnxAcceptor等待Learner的连接

LearnerCnxAcceptor监听到Learner发起的连接后,会新建一个LearnerHandler实例专门负责Leader和该Learner之间的连接。启动LearnerHandler时,又会开启一个线程专门负责发送消息给Learner。如果Learner发生故障,那么Leader中为该Learner维护的LearnerHandler的ping()方法会检测到然后关闭相关线程和实例。

public class Leader {     ...     private final ServerSocket ss;          Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {         ...         //创建BIO的ServerSocket,监听端口,等待客户端发起连接         ss = new ServerSocket();         ss.bind(self.getQuorumAddress());         ...     }          void lead() throws IOException, InterruptedException {         ...         cnxAcceptor = new LearnerCnxAcceptor();//网络连接器         cnxAcceptor.start();         ...         while (true) {             ...             for (LearnerHandler f : getLearners()) {                 //Leader向Learner发出心跳检测                 f.ping();             }             ...         }     }          class LearnerCnxAcceptor extends ZooKeeperCriticalThread {         ...         public void run() {             ...             while (!stop) {                 //监听到客户端发起的连接,新建一个线程LearnerHandler专门进行处理                 Socket s = ss.accept();                 s.setSoTimeout(self.tickTime * self.initLimit);                 s.setTcpNoDelay(nodelay);                 BufferedInputStream is = new BufferedInputStream(s.getInputStream());                 LearnerHandler fh = new LearnerHandler(s, is, Leader.this);                 fh.start();             }             ...         }         ...     }     ... }  public class LearnerHandler extends ZooKeeperThread {     ...     public void run() {         //先进行数据同步         ...         //开启线程发送信息给Learner         startSendingPackets();         ...         //处理Learner发过来的消息,比如投票响应ACK消息、心跳响应PING消息等         while (true) {             qp = new QuorumPacket();             ia.readRecord(qp, "packet");             ...         }     }          protected void startSendingPackets() {         if (!sendingThreadStarted) {             new Thread() {                 public void run() {                     Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());                     try {                         sendPackets();                     } catch (InterruptedException e) {                         LOG.warn("Unexpected interruption " + e.getMessage());                     }                 }             }.start();             sendingThreadStarted = true;         } else {             LOG.error("Attempting to start sending thread after it already started");         }     }          private void sendPackets() throws InterruptedException {         long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;         while (true) {             try {                 //从queuedPackets队列中提取消息出来发送给Learner                 QuorumPacket p = queuedPackets.poll();                 if (p == proposalOfDeath) {                     // Packet of death!                     break;                 }                 oa.writeRecord(p, "packet");             } catch (IOException e) {                 //假如Leader在这里向Learner发送消息时,Learner故障了,那么就会在这里报错                 //此时,这里的报错并不影响对应的LearnerHandler实例和Leader实例                 if (!sock.isClosed()) {                     LOG.warn("Unexpected exception at " + this, e);                     try {                         sock.close();                     } catch(IOException ie) {                         LOG.warn("Error closing socket for handler " + this, ie);                     }                 }                 break;             }         }     }          public void ping() {         if (!sendingThreadStarted) {             return;         }         long id;         if (syncLimitCheck.check(System.nanoTime())) {             synchronized(leader) {                 id = leader.lastProposed;             }             QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);             queuePacket(ping);         } else {             LOG.warn("Closing connection to peer due to transaction timeout.");             //Learner故障,那么就关闭当前Learner实例             shutdown();         }     }          public void shutdown() {         // Send the packet of death         try {             queuedPackets.put(proposalOfDeath);         } catch (InterruptedException e) {             LOG.warn("Ignoring unexpected exception", e);         }         try {             if (sock != null && !sock.isClosed()) {                 sock.close();             }         } catch (IOException e) {             LOG.warn("Ignoring unexpected exception during socket close", e);         }         this.interrupt();         leader.removeLearnerHandler(this);     }     ... }

二.然后Leader处理Learner的事务投票响应后进行事务提交

Leader有一个HashSet为forwardingFollowers,用来管理Follower服务器。当Leader对一个事务请求发起Proposal提议的投票并发现投票通过后,也就是调用如下方法时:

Leader的processAck()方法 ->  Leader的tryToCommit()方法 -> Leader的commit()方法 ->  Leader的sendPacket()方法

会在Leader的sendPacket()方法中遍历forwardingFollowers里的LearnerHandler实例,将Commit请求交给Learner和Leader建立连接时生成的LearnerHandler,最后由Leader的每个LearnerHandler实例广播给对应的Learner进行事务提交。

//1.Leader通过Leader.propose方法对事务请求生成Proposal提议并进行广播给所有Follower public class Leader {     private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();     ...     public Proposal propose(Request request) throws XidRolloverException {         ...         byte[] data = SerializeUtils.serializeRequest(request);         proposalStats.setLastBufferSize(data.length);         QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);         //生成Proposal提议         Proposal p = new Proposal();         p.packet = pp;         p.request = request;                 synchronized(this) {             p.addQuorumVerifier(self.getQuorumVerifier());             if (request.getHdr().getType() == OpCode.reconfig) {                 self.setLastSeenQuorumVerifier(request.qv, true);                                    }             if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {                 p.addQuorumVerifier(self.getLastSeenQuorumVerifier());             }             lastProposed = p.packet.getZxid();             //将发送的Proposal提议放入outstandingProposals队列中             outstandingProposals.put(lastProposed, p);             //发送Proposal提议,其实就是把Proposal提议交给LearnerHandler处理             sendPacket(pp);         }         return p;     }          void sendPacket(QuorumPacket qp) {         synchronized (forwardingFollowers) {             for (LearnerHandler f : forwardingFollowers) {                 //LearnerHandler会将提议放入其发送队列里                 f.queuePacket(qp);             }         }     }     ... }  //2.Leader完成事务日志记录后,便会通过Leader.processAck方法记录Leader已对Proposal提议完成投票 //SyncRequestProcessor的nextProcessor就是AckRequestProcessor class AckRequestProcessor implements RequestProcessor {     ...     public void processRequest(Request request) {         ...         //Leader也作为参与Proposal投票的一份子进行ACK响应         //将LeaderSID添加到Proposal提议的投票收集器里 + 检查Proposal提议的投票收集器是否有过半ACK才提交         leader.processAck(self.getId(), request.zxid, null);         ...     } }  //3.Follower收到提议的投票请求后返回ACK响应给Leader //Leader接收到FollowerACK响应后,便会通过Leader.processAck方法记录该Follower已对提议完成投票 public class LearnerHandler extends ZooKeeperThread {     //The packets to be sent to the learner     final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();     ...     @Override     public void run() {         ...         startSendingPackets();//开启一个线程发送queuedPackets里的PacketLearner         ...         while (true) {             qp = new QuorumPacket();             ia.readRecord(qp, "packet");//读取Learner的响应             ...             switch (qp.getType()) {                 case Leader.ACK:                     ...                     //如果Leader收到Follower对某Proposal提议请求返回的ACK响应                     //那么就将FollowerSID添加到该Proposal提议的投票收集器里                     leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());                     break;                 ...             }         }     }          protected void startSendingPackets() {         ...         new Thread() {             public void run() {                 Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());                 sendPackets();             }         }.start();         ...     }          private void sendPackets() throws InterruptedException {         while (true) {             ...             QuorumPacket p = queuedPackets.poll();             oa.writeRecord(p, "packet");             ...         }     }     ... }  public class Leader {     private final HashSet<LearnerHandler> forwardingFollowers = new HashSet<LearnerHandler>();     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();     ...     synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {                 ...         //检查请求的ZXID,需要比上次已提交的请求的ZXID也就是lastCommitted要大         if (lastCommitted >= zxid) {             if (LOG.isDebugEnabled()) {                 LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));             }             // The proposal has already been committed             return;         }         Proposal p = outstandingProposals.get(zxid);         //将LeaderSID添加到Proposal提议的投票收集器里         p.addAck(sid);         //尝试提交,即检查Proposal提议的投票收集器中是否有过半ACK响应         boolean hasCommitted = tryToCommit(p, zxid, followerAddr);         ...     }          synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {                //如果提议队列中存在该提议的前一个提议,说明该提议的前一个提议还没提交,那么就返回false         if (outstandingProposals.containsKey(zxid - 1)) return false;         //getting a quorum from all necessary configurations.         //Proposal提议的投票收集器是否已过半         if (!p.hasAllQuorums()) {             return false;                          }         ...         outstandingProposals.remove(zxid);         if (p.request != null) {             toBeApplied.add(p);         }         ...         //一旦提议通过,马上就要在Leader中标记lastCommitted为最新的提交ZXID         commit(zxid);//给Follower广播commit消息         inform(p);//给Observer发送commit消息         ...         //调用CommitProcessor处理器的commit方法提交请求         zk.commitProcessor.commit(p.request);//让Leader执行commit消息         //下面处理的是Learner发起的同步请求         if (pendingSyncs.containsKey(zxid)) {             for (LearnerSyncRequest r: pendingSyncs.remove(zxid)) {                 sendSync(r);             }                        }          return  true;        }          //广播commit消息     public void commit(long zxid) {         synchronized(this) {             //标记lastCommitted为最新的提交ZXID             lastCommitted = zxid;         }         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);         sendPacket(qp);     }          void sendPacket(QuorumPacket qp) {         synchronized (forwardingFollowers) {             for (LearnerHandler f : forwardingFollowers) {                 //调用LearnerHandlerqueuePacket方法添加Packet到发送队列                 f.queuePacket(qp);             }         }     }          public void inform(Proposal proposal) {         QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);         sendObserverPacket(qp);     }     ...     static public class Proposal extends SyncedLearnerTracker {         public QuorumPacket packet;         public Request request;         ...     } }  public class SyncedLearnerTracker {     protected ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();     ...     //添加到投票收集器     public boolean addAck(Long sid) {         boolean change = false;         for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {             if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {                 qvAckset.getAckset().add(sid);                 change = true;             }         }         return change;     }          //判断投票收集器是否过半     public boolean hasAllQuorums() {         for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {             if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))                 return false;         }         return true;     }     ... }

(3)实现数据一致性的恢复模式

当Leader故障时,Follower服务器会发生如下操作:首先Follower的followLeader()方法里的while循环会被中断运行,然后在QuorumPeer线程中就会触发执行Follower的shutdown()方法,接着执行QuorumPeer的updateServerState()方法更改节点的状态为LOOKING,之后Follower服务器在QuorumPeer线程中会重新进行Leader选举。

 

重新选举Leader需要经历一段时间,此时集群会短暂没有Leader服务器,而且重新选举Leader期间,Follower也会被关闭。

 

注意:Leader故障时,ZooKeeperServer的shutdown()方法会关闭firstProcessor线程。所以恢复模式下的选举过程中,发送到Learner的请求会进入firstProcessor,但是这些请求都会先被queuedRequests存起来,暂时不处理。

public class QuorumPeerMain {     protected QuorumPeer quorumPeer;          public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {         ...         quorumPeer.start();         quorumPeer.join();         ...     }     ... }  public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {     ...     public synchronized void start() {         loadDataBase();         startServerCnxnFactory();         adminServer.start();         //初始化Leader选举(初始化当前投票+监听选举端口+启动选举守护线程)         startLeaderElection();         startJvmPauseMonitor();         super.start();     }          @Override     public void run() {         ...         while (running) {             switch (getPeerState()) {                 case LOOKING:                     ...                     if (shuttingDownLE) {                         shuttingDownLE = false;                         startLeaderElection();                     }                     //调用QuorumPeer.electionAlg的lookForLeader(),也就是FastLeaderElection.lookForLeader()开启一轮选举                     setCurrentVote(makeLEStrategy().lookForLeader());                     ...                 case FOLLOWING:                     try {                         LOG.info("FOLLOWING");                         setFollower(makeFollower(logFactory));                         //Leader故障,那么就会中断follower.followLeader()里的的while循环                         follower.followLeader();                     } catch (Exception e) {                         LOG.warn("Unexpected exception",e);                     } finally {                         //Leader故障,就会执行这里的方法                         follower.shutdown();                         setFollower(null);                         updateServerState();                     }                     break;                 case LEADING:                     LOG.info("LEADING");                     try {                         setLeader(makeLeader(logFactory));                         leader.lead();                         setLeader(null);                     } catch (Exception e) {                         LOG.warn("Unexpected exception",e);                     } finally {                         if (leader != null) {                             leader.shutdown("Forcing shutdown");                             setLeader(null);                         }                         updateServerState();                     }                     break;             }             ...         }         ...     }          private synchronized void updateServerState() {         //reconfigFlag初始化就为false         if (!reconfigFlag) {             setPeerState(ServerState.LOOKING);             LOG.warn("PeerState set to LOOKING");             return;         }         ...         //updateServerStatez方法被执行后又会重置为false         reconfigFlag = false;     }          //开始Leader选举     //创建选举服务端QuorumCnxManager并启动监听 + 创建选举算法FastLeaderElection并启动     //将FastLeaderElection实例赋值给QuorumPeer.electionAlg     synchronized public void startLeaderElection() {         if (getPeerState() == ServerState.LOOKING) {             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());         }         ...         this.electionAlg = createElectionAlgorithm(electionType);     }     ... }  public class Follower extends Learner{     ...     public void shutdown() {             super.shutdown();     }          void followLeader() throws InterruptedException {         ...         QuorumServer leaderServer = findLeader();         try {             //Follower启动时向Leader发起连接             connectToLeader(leaderServer.addr, leaderServer.hostname);             long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);             ...             syncWithLeader(newEpochZxid);             ...             QuorumPacket qp = new QuorumPacket();             while (this.isRunning()) {                 //读取BIO输入流                 readPacket(qp);                 processPacket(qp);             }           } catch (Exception e) {             //Leader故障,那么这里就会报异常,从而中断上面的while循环             LOG.warn("Exception when following the leader", e);             closeSocket();             pendingRevalidations.clear();         }         ...     }     ... }  public class Learner {            ...     public void shutdown() {         self.setZooKeeperServer(null);         self.closeAllConnections();         self.adminServer.setZooKeeperServer(null);         closeSocket();         //关闭比如FollowerZooKeeperServer         if (zk != null) {             //关闭sessionTracker的超时检查线程 + 设置firstProcessor为null + 清空zkDb             zk.shutdown(self.getSyncMode().equals(QuorumPeer.SyncMode.SNAP));         }     }          protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {         //创建Learner当前BIO的客户端Socket         this.sock = createSocket();         int initLimitTime = self.tickTime * self.initLimit;         int remainingInitLimitTime = initLimitTime;         long startNanoTime = nanoTime();         //尝试重连最多5for (int tries = 0; tries < 5; tries++) {             try {                 remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);                 if (remainingInitLimitTime <= 0) {                     LOG.error("initLimit exceeded on retries.");                     throw new IOException("initLimit exceeded on retries.");                 }                 //向Leader发起连接请求                 sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));                 if (self.isSslQuorum())  {                     ((SSLSocket) sock).startHandshake();                 }                 sock.setTcpNoDelay(nodelay);                 break;             } catch (IOException e) {                 ...             }             Thread.sleep(1000);         }         self.authLearner.authenticate(sock, hostname);         //初始化输入输出流         leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));         bufferedOutput = new BufferedOutputStream(sock.getOutputStream());         leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);     }          private Socket createSocket() throws X509Exception, IOException {         //创建客户端Socket         Socket sock;         if (self.isSslQuorum()) {             sock = self.getX509Util().createSSLSocket();         } else {             sock = new Socket();         }         sock.setSoTimeout(self.tickTime * self.initLimit);         return sock;     }          protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) throws IOException {         //向BIO的服务端发起连接请求         sock.connect(addr, timeout);     }          void readPacket(QuorumPacket pp) throws IOException {         synchronized (leaderIs) {             leaderIs.readRecord(pp, "packet");         }     }     ... }  public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {     ...     public synchronized void shutdown(boolean fullyShutDown) {         ...         //关闭会话的超时检查线程         if (sessionTracker != null) {             sessionTracker.shutdown();         }         //关闭firstProcessor线程         //所以Leader故障时,发送到Learner的请求会在firstProcessor的queuedRequests中存起来,暂时不处理         if (firstProcessor != null) {             firstProcessor.shutdown();         }         if (zkDb != null) {             if (fullyShutDown) {                 zkDb.clear();             } else {                 ...             }         }         unregisterJMX();     }     ... }

 

2.zk是如何进行Leader选举的

(1)服务器启动时的Leader选举流程概述

(2)服务器运行时的Leader选举流程概述

(3)Leader选举的规则

(4)Leader选举的实现细节

(5)Leader选举算法的实现流程

 

(1)服务器启动时的Leader选举流程概述

一.向其他服务器发出一个投自己的投票

二.接收来自其他服务器的投票

三.PK投票

四.统计投票

五.改变服务器状态

 

一个zk服务要想满足集群运行方式,至少需要三台服务器。下面以3台机器组成的服务器集群为例。当只有一台服务器启动时,是无法进行Leader选举的。当有两台服务器启动,两台机器可以相互通信时,每台机器都会试图找到一个Leader,于是便进入了Leader选举流程。

 

一.向其他服务器发出一个投自己的投票

每个服务器刚启动时,都会将自己作为Leader服务器来生成投票。投票包括的信息是:服务器ID(SID)、事务ID(ZXID),可记为(SID, ZXID)。该投票信息会发给集群中的其他所有机器。

 

二.接收来自其他服务器的投票

每个服务器接收到投票后,首先会检查该投票的有效性,包括检查是否是本轮投票、是否来自LOOKING状态的服务器等。

 

三.PK投票

每个服务器接收到投票并检查有效后,会PK自己的投票和收到的投票。

 

PK规则一:ZXID比较大的优先作为Leader

PK规则二:ZXID相同则SID较大的为Leader

 

PK出的Leader不是服务器自己,则更新自己的投票并重新把投票发出去。

 

四.统计投票

每次投票后,都会统计所有投票,判断是否已有过半机器收到相同投票。

 

五.改变服务器状态

一旦确定了Leader,每个服务器都会更新自己的状态。如果是Leader,那么服务器状态就变为LEADING。如果是Follower,那么服务器状态就变为FOLLOWING。

 

(2)服务器运行时的Leader选举流程概述

zk集群一旦选出一个Leader,所有服务器的集群角色一般不会再发生变化。如果有非Leader挂了或新机器加入,此时是不会影响Leader的。如果Leader挂了,那么整个集群将暂时无法服务,进入新一轮Leader选举。服务器运行期间的Leader选举和启动时的Leader选举过程是一致的。

 

一.变更状态

当Leader挂了,Follower服务器都会将其服务器状态变更为LOOKING。变更为LOOKING状态后,Follower服务器便开始进入Leader选举流程。

二.向其他服务器发出一个投自己的投票

三.接收来自其他服务器的投票

四.PK投票

五.统计投票

六.改变服务器状态

 

(3)Leader选举的规则

一.集群进入Leader选举的情况

二.一台机器进入Leader选举的情况

三.变更投票的规则

四.确定Leader的规则

 

一.集群进入Leader选举的情况

情况一:集群一开始启动时没有Leader

情况二:集群运行期间Leader挂了

 

二.一台机器进入Leader选举的情况

情况一:集群中本来就已经存在一个Leader了,即该机器是加入集群的。这种情况通常是集群中的某一台机器启动比较晚,在它启动前集群已工作。对于这种情况,当该机器试图去选举Leader时,会被告知当前的Leader。于是该机器只需要和Leader建立起连接,并进行数据同步即可。

 

情况二:集群中确实不存在Leader。

 

三.变更投票的规则

集群中的每台机器在发出自己的投票后,都会开始收到其他机器的投票。每台机器都会根据如下规则来PK收到的投票,并以此决定是否变更投票。每次PK投票,都是对比(vote_sid, vote_zxid)和(self_sid, self_zxid)的过程。

 

规则一:如果vote_zxid大于self_zxid,那么就认可收到的投票(vote_sid, vote_zxid),并再次将该投票发送出去。

 

规则二:如果vote_zxid小于self_zxid,那么就坚持自己的投票(self_sid, self_zxid),不做任何变更。

 

规则三:如果vote_zxid等于self_zxid,且vote_sid大于self_sid,那么就认可收到的投票(vote_sid, vote_zxid),并再次将该投票发送出去。

 

规则四:如果vote_zxid等于self_zxid,且vote_sid小于self_sid,那么就坚持自己的投票(self_sid, self_zxid),不做任何变更。

 

四.确定Leader的规则

如果一台机器收到了过半相同投票,那么这个投票对应的SID就是Leader。哪台服务器上的数据越新,ZXID越大,那么就越有可能成为Leader。

 

(4)Leader选举的实现细节

一.服务器状态

二.投票数据结构

三.网络连接管理器QuorumCnxManager

四.建立连接和消息接收与发送

五.FastLeaderElection的选票管理

 

一.服务器状态

QuorumPeer的ServerState枚举类列举了4种服务器状态。

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {     ...     public enum ServerState {         //寻找Leader状态         LOOKING,//当服务器处于该状态时,认为集群中没有Leader,因此会进入Leader选举流程         //跟随者状态         FOLLOWING,//表明当前服务器的集群角色是Follower         //领导者状态         LEADING,//表明当前服务器的集群角色是Leader         //观察者状态         OBSERVING;//表明当前服务器的集群角色是Observer     }     ... }

二.投票数据结构

public class Vote {     final private long id;//被选举的Leader的SID     final private long zxid;//被选举的Leader的ZXID     final private long electionEpoch;//选举轮次,每次进入新一轮的投票后,都会对该值加1     final private long peerEpoch;//被选举的Leader的epoch     final private ServerState state;//当前服务器的状态     ... }

三.网络连接管理器QuorumCnxManager

ClientCnxn是zk客户端用于处理客户端请求的网络连接管理器; ServerCnxnFactory是zk服务端用于处理客户端请求的网络连接工厂; LearnerCnxAcceptor是Leader用来处理Learner连接的网络连接管理器; LearnerHandler是Leader用来处理Learner请求的网络处理器; QuorumCnxManager是QurumPeer处理Leader选举的网络连接管理器;

每个服务器启动时,都会启动一个QuorumCnxManager。QuorumCnxManager会负责Leader选举过程中服务器间的网络通信。

 

QuorumCnxManager的核心数据结构:

消息接收队列:recvQueue 各服务器的消息发送队列集合:queueSendMap 各服务器的发送器集合:senderWorkerMap 各服务器的最近发送消息集合:lastMessageSent
public class QuorumCnxManager {     //消息接收队列,用于存放从其他服务器接收到的消息     public final ArrayBlockingQueue<Message> recvQueue;     //各服务器对应的消息发送队列集合,用于保存那些待发送的消息,按SID分组,保证各台机器间的消息发送互不影响     final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;     //各服务器对应的发送器集合,按SID分组,每一台服务器都对应一个SendWorker发送器负责消息的发送     final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;     //各服务器对应的最近发送消息集合,在这个集合中会为每个SID保留最近发送过的一个消息     final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;     ... }

四.建立连接和消息接收与发送

为了能够相互投票,zk集群中的所有机器都需要两两建立网络连接。QuorumCnxManager启动时,会创建一个ServerSocket来监听3888端口。开启监听后,服务器就能接收到其他服务器发起的创建连接请求。在QuorumPeer启动时,会通过Election的lookForLeader()方法来发起连接。

 

服务器在收到其他服务器的连接请求时,会由QuorumCnxManager的receiveConnection()方法处理。为了避免两台机器重复创建TCP连接,zk设计了一套建立TCP连接的规则:只允许SID大的服务器主动和其他服务器建立连接,否则断开当前连接。

 

在QuorumCnxManager的receiveConnection()方法中,服务器会通过对比自己和远程服务器的SID值来判断是否接受连接请求。如果当前服务器发现自己的SID值更大,那么会断开当前连接,然后自己主动去和远程服务器建立连接。

 

一旦建立起连接,就会根据远程服务器的SID,创建并启动相应的消息发送器SendWorker和消息接收器RecvWorker。

 

消息的接收过程是由消息接收器RecvWorker负责的,zk服务器会为每个远程服务器单独分配一个消息接收器RecvWorker。每个RecvWorker只需不断从TCP连接中读取消息保存到recvQueue队列中。

 

消息的发送过程是由消息发送器SendWorker负责的,zk服务器会为每个远程服务器单独分配一个消息发送器SendWorker。每个SendWorker只需不断从对应的消息发送队列获取消息进行发送即可。

 

一旦zk服务器发现针对当前远程服务器的消息发送队列为空,那么就从lastMessageSent中取出一个最近发送的消息进行再次发送,以此解决上次发送的消息没有被接收到和没有被正确处理的问题。

 

建立连接的代码如下:

public class QuorumPeerMain {     protected QuorumPeer quorumPeer;          public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {         ...         quorumPeer.start();         quorumPeer.join();         ...     }     ... }  public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {     ...     public synchronized void start() {         loadDataBase();         startServerCnxnFactory();         adminServer.start();         //初始化Leader选举(初始化当前投票+监听选举端口+启动选举守护线程)         startLeaderElection();         startJvmPauseMonitor();         super.start();     }          @Override     public void run() {         ...         while (running) {             switch (getPeerState()) {                 case LOOKING:                     ...                     if (shuttingDownLE) {                         shuttingDownLE = false;                         startLeaderElection();                     }                     //调用QuorumPeer.electionAlg的lookForLeader(),也就是FastLeaderElection.lookForLeader()开启一轮选举                     setCurrentVote(makeLEStrategy().lookForLeader());                 ...                       }             ...         }         ...     }          //开始Leader选举     //创建选举服务端QuorumCnxManager并启动监听 + 创建选举算法FastLeaderElection并启动     //将FastLeaderElection实例赋值给QuorumPeer.electionAlg     synchronized public void startLeaderElection() {         if (getPeerState() == ServerState.LOOKING) {             //设置当前投票             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());         }         ...         this.electionAlg = createElectionAlgorithm(electionType);     }          protected Election createElectionAlgorithm(int electionAlgorithm){         Election le=null;         ...         //创建网络连接管理器QuorumCnxManager         QuorumCnxManager qcm = createCnxnManager();         ...         QuorumCnxManager.Listener listener = qcm.listener;         //启动服务器并监听3888端口,等待其他服务器过来建立连接         listener.start();         FastLeaderElection fle = new FastLeaderElection(this, qcm);         fle.start();         le = fle;         return le;     }          public QuorumCnxManager createCnxnManager() {         return new QuorumCnxManager(...);     }     ... }  public class QuorumCnxManager {     //消息接收队列,用于存放从其他服务器接收到的消息     public final ArrayBlockingQueue<Message> recvQueue;     //各服务器对应的消息发送队列集合,用于保存那些待发送的消息,按SID分组,保证各台机器间的消息发送互不影响     final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;     //各服务器对应的发送器集合,按SID分组,每一台服务器都对应一个SendWorker发送器负责消息的发送     final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;     //各服务器对应的最近发送消息集合,在这个集合中会为每个SID保留最近发送过的一个消息     final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;     ...     public class Listener extends ZooKeeperThread {         volatile ServerSocket ss = null;         ...         public void run() {             ...             InetSocketAddress addr;             Socket client = null;             while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {                 ...                 ss = new ServerSocket();                 ss.setReuseAddress(true);                 addr = new InetSocketAddress(port);                 ss.bind(addr);                 while (!shutdown) {                     client = ss.accept();                     ...                     //处理其他服务发送过来的建立连接请求                     receiveConnection(client);                     ...                 }             }             ...         }         ...     }          public void receiveConnection(final Socket sock) {         DataInputStream din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));         handleConnection(sock, din);     }          private void handleConnection(Socket sock, DataInputStream din) throws IOException {         ...         //通过对比当前服务器自己的SID和远程服务器的SID,来判断是否接受连接请求         if (sid < self.getId()) {             //当前服务器自己的SID更大,则断开连接             SendWorker sw = senderWorkerMap.get(sid);             if (sw != null) {//先关闭消息发送器                 sw.finish();             }             //断开当前连接             closeSocket(sock);             //当前服务器重新主动去和远程服务器建立连接             if (electionAddr != null) {                 connectOne(sid, electionAddr);             } else {                 connectOne(sid);             }         } else if (sid == self.getId()) {             ...         } else {             //当前服务器自己的SID小             //则创建并启动相应的消息发送器SendWorker和消息接收器RecvWorker             SendWorker sw = new SendWorker(sock, sid);             RecvWorker rw = new RecvWorker(sock, din, sid, sw);             sw.setRecv(rw);//将消息接收器传入消息发送器中             SendWorker vsw = senderWorkerMap.get(sid);             if (vsw != null) {                 vsw.finish();             }             senderWorkerMap.put(sid, sw);             queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));             sw.start();             rw.start();         }     }          private void closeSocket(Socket sock) {         ...         sock.close();     } }

消息接收与发送的代码如下:

public class QuorumCnxManager {     //消息接收队列,用于存放从其他服务器接收到的消息     public final ArrayBlockingQueue<Message> recvQueue;     //各服务器对应的消息发送队列集合,用于保存那些待发送的消息,按SID分组,保证各台机器间的消息发送互不影响     final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;     //各服务器对应的发送器集合,按SID分组,每一台服务器都对应一个SendWorker发送器负责消息的发送     final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;     //各服务器对应的最近发送消息集合,在这个集合中会为每个SID保留最近发送过的一个消息     final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;     ...     //消息发送器     class SendWorker extends ZooKeeperThread {         Long sid;         Socket sock;         RecvWorker recvWorker;         ...         SendWorker(Socket sock, Long sid) {             super("SendWorker:" + sid);             this.sid = sid;             this.sock = sock;             ...         }                  synchronized void setRecv(RecvWorker recvWorker) {             this.recvWorker = recvWorker;         }         ...         public void run() {             ...             //一旦zk服务器发现针对sid服务器的消息发送队列为空,             //那么就从lastMessageSent中取出一个最近发送的消息进行再次发送,             //以此解决上次发送的消息没有被接收到和没有被正确处理的问题;             ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);             if (bq == null || isSendQueueEmpty(bq)) {                 ByteBuffer b = lastMessageSent.get(sid);                 if (b != null) {                     LOG.debug("Attempting to send lastMessage to sid=" + sid);                     send(b);                 }             }             while (running && !shutdown && sock != null) {                 ByteBuffer b = null;                 //取出要发送给sid机器的消息队列                 ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);                 if (bq != null) {                     //取出要发送的消息                     b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);                 } else {                     LOG.error("No queue of incoming messages for " + "server " + sid);                     break;                 }                 if (b != null) {                     //设置这台sid机器最近一次发送的消息                     lastMessageSent.put(sid, b);                     send(b);                 }                 ...             }             //关闭针对sid机器的消息发送线程             this.finish();         }                  //关闭针对sid机器的消息发送线程         synchronized boolean finish() {             LOG.debug("Calling SendWorker.finish for {}", sid);             if (!running) return running;             running = false;             closeSocket(sock);             this.interrupt();             if (recvWorker != null) recvWorker.finish();             LOG.debug("Removing entry from senderWorkerMap sid=" + sid);             senderWorkerMap.remove(sid, this);             threadCnt.decrementAndGet();             return running;         }     }     ...     //消息接收器     class RecvWorker extends ZooKeeperThread {         Long sid;         Socket sock;         final SendWorker sw;         final DataInputStream din;         ...         RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {             super("RecvWorker:" + sid);             this.sid = sid;             this.sock = sock;             this.sw = sw;             this.din = din;             ...         }                  public void run() {             ...             while (running && !shutdown && sock != null) {                 int length = din.readInt();                 ...                 byte[] msgArray = new byte[length];                 din.readFully(msgArray, 0, length);                 ByteBuffer message = ByteBuffer.wrap(msgArray);                 //将消息添加到消息接收队列中                 addToRecvQueue(new Message(message.duplicate(), sid));             }             sw.finish();             closeSocket(sock);         }                  synchronized boolean finish() {             LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);             if (!running) return running;             running = false;             this.interrupt();             threadCnt.decrementAndGet();             return running;         }     }     ...     //将消息添加到消息接收队列中     public void addToRecvQueue(Message msg) {         synchronized(recvQLock) {             if (recvQueue.remainingCapacity() == 0) {                 recvQueue.remove();             }             recvQueue.add(msg);         }     } }

五.FastLeaderElection的选票管理

FastLeaderElection的核心数据结构:

选票发送队列:sendqueue

选票接收队列:recvqueue

选票管理器:messenger

选票接收器:WorkerReceiver

选票发送器:WorkerSender

 

选票接收器WorkerReceiver会不断从QuorumCnxManager中,获取其他服务器发来的选举投票消息并转换成一个选票,然后保存到recvqueue选票接收队列中。在此过程中,如果发现其他服务器发来的投票的选举轮次小于当前服务器,那么就直接忽略这个其他服务器发来的投票,同时立即发出自己的投票。

 

选票发送器WorkerSender会不断从sendqueue队列中获取待发送的选票,并将其传递给QuorumCnxManager中进行发送。

public class FastLeaderElection implements Election {     LinkedBlockingQueue<ToSend> sendqueue;//选票发送队列,用于保存待发送的选票     LinkedBlockingQueue<Notification> recvqueue;//选票接收队列,用于保存接收到的外部选票     Messenger messenger;//选票管理器     QuorumCnxManager manager;//Leader选举时的网络连接管理器     ...     //通过构造方法传入Leader选举时的网络连接管理器QuorumCnxManager     public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {         this.stop = false;         this.manager = manager;         starter(self, manager);     }          private void starter(QuorumPeer self, QuorumCnxManager manager) {         ...         sendqueue = new LinkedBlockingQueue<ToSend>();//初始化选票发送器         recvqueue = new LinkedBlockingQueue<Notification>();//初始化选票接收器         this.messenger = new Messenger(manager);//创建选票管理器     }          public void start() {         this.messenger.start();//启动选票管理器     }          //选票管理器     protected class Messenger {         WorkerReceiver wr;//选票接收器         WorkerSender ws;//选票发送器         Thread wsThread = null;         Thread wrThread = null;                  Messenger(QuorumCnxManager manager) {             this.ws = new WorkerSender(manager);             this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");             this.wsThread.setDaemon(true);             this.wr = new WorkerReceiver(manager);             this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");             this.wrThread.setDaemon(true);         }                void start(){             this.wsThread.start();             this.wrThread.start();         }         ...         //选票接收器         class WorkerReceiver extends ZooKeeperThread {             ...             public void run() {                 Message response;                 while (!stop) {                     //从QuorumCnxManager中获取其他服务器发来的投票消息                     response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);                     if (response == null) continue;                     ...                     //将获取到的其他服务器发来的投票消息转换成一个选票Notification                     Notification n = new Notification();                     ...                     if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {                         //将选票n保存到recvqueue选票接收队列中                         recvqueue.offer(n);                         //如果其他服务器发来的投票的选举轮次小于当前服务器                         if ((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){                             //发出自己的投票                             Vote v = getVote();                             QuorumVerifier qv = self.getQuorumVerifier();                             ToSend notmsg = new ToSend(...);                             sendqueue.offer(notmsg);                         }                         ...                     }                     ...                 }             }             ...         }                  //选票发送器         class WorkerSender extends ZooKeeperThread {             volatile boolean stop;             QuorumCnxManager manager;                          WorkerSender(QuorumCnxManager manager) {                 super("WorkerSender");                 this.stop = false;                 this.manager = manager;             }                          public void run() {                 while (!stop) {                     //选票发送器WorkerSender会不断从sendqueue队列中获取待发送的选票                     ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);                     if (m == null) continue;                     //将待发送的选票传递给QuorumCnxManager中进行发送                     process(m);                 }                 LOG.info("WorkerSender is down");             }                          void process(ToSend m) {                 ByteBuffer requestBuffer = buildMsg(...);                 manager.toSend(m.sid, requestBuffer);             }         }     }     ... }  public class QuorumCnxManager {     //消息接收队列,用于存放从其他服务器接收到的消息     public final ArrayBlockingQueue<Message> recvQueue;     ...     public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {         return recvQueue.poll(timeout, unit);     }          public void toSend(Long sid, ByteBuffer b) {         //If sending message to myself, then simply enqueue it (loopback).         if (this.mySid == sid) {             b.position(0);             addToRecvQueue(new Message(b.duplicate(), sid));             //Otherwise send to the corresponding thread to send.         } else {             //Start a new connection if doesn't have one already.             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);             if (oldq != null) {                 addToSendQueue(oldq, b);             } else {                 addToSendQueue(bq, b);             }             connectOne(sid);         }     }     ... }

如下是选票管理各组件间的协作图:

zk源码—6.Leader选举的实现原理

(5)Leader选举算法的实现流程

当zk服务器检测到当前服务器状态为LOOKING时,就会触发Leader选举,也就是调用FastLeaderElection的lookForLeader()方法来进行Leader选举。Leader选举算法的具体流程如下:

zk源码—6.Leader选举的实现原理

一.自增选举轮次

FastLeaderElection.logicalclock用于标识当前Leader的选举轮次,Leader选举规定所有有效的投票都必须在同一轮次中。

 

二.初始化选票

在开始新一轮投票之前,每个服务器都会首先初始化自己的选票。在初始化阶段,每个服务器都会将自己推荐为Leader。

 

三.发送初始化选票

在完成选票的初始化后,服务器就会发起第一次投票。zk会将刚刚初始化好的选票放入sendqueue选票发送队列中,然后由选票发送器WorkerSender负责发送出去。

 

四.接收外部投票

接着通过一个while循环不断从recvqueue选票接收队列中获取外部投票。如果服务器发现无法获取到任何的外部投票,那么就会确认和其他服务器建立的连接是否还有效。如果发现连接没有效,那么就会马上建立连接。如果连接还有效,那么就再次发送服务器自己的内部投票。

 

五.判断选举轮次

判断选举轮次的原因:只有在同一个选举轮次的投票才是有效的投票。

 

情况一:如果外部投票的选举轮次大于内部投票,那么服务器会先更新自己的选举轮次logicalclock。然后清空所有已经收到的投票,即清空归档选票集合recvset。接着让内部投票和外部投票进行PK以确定是否要变更内部投票。

 

情况二:如果外部投票的选举轮次小于内部投票,那么服务器会直接忽略该外部投票,不做任何处理。

 

情况三:如果外部投票的选举轮次等于内部投票,那么就让内部投票和外部投票进行PK。

 

六.选票PK

在接收到来自其他服务器的有效的外部投票后,接着通过FastLeaderElection的totalOrderPredicate()方法进行选票PK。主要从选举轮次、ZXID、和SID来考虑。如果外部投票的选举轮次大,则进行投票变更。如果选举轮次一致,且外部投票的ZXID大,则进行投票变更。如果选举轮次+ZXID一致,且外部投票的SID大,也进行投票变更。

 

七.变更投票

也就是使用外部投票的选票信息来覆盖内部投票。

 

八.选票归档

无论是否变更投票,都会将收到的有效的外部投票放入选票集合recvset。recvset会按SID记录当前服务器在本轮次的选举中收到的所有外部投票。

 

九.统计投票

完成选票归档后,就可以开始统计投票了。统计投票就是确定是否已有过半服务器认可当前的内部投票。如果是,则终止投票;否则,继续接收外部投票进行处理。

 

十.更新服务器状态

判断被过半服务器认可的投票对应的Leader是否是自己。如果是自己,则更新服务器状态为LEADING,否则FOLLOWING。

 

注意finalizeWait:

如果统计投票发现已经有过半的服务器认可了当前的投票,那么zk并不会立即更新服务器状态,而是会等待一段finalizeWait时间(200毫秒)来确定是否有新的更优的投票。

public class FastLeaderElection implements Election {     LinkedBlockingQueue<ToSend> sendqueue;//选票发送队列,用于保存待发送的选票     LinkedBlockingQueue<Notification> recvqueue;//选票接收队列,用于保存接收到的外部选票     Messenger messenger;//选票管理器     QuorumCnxManager manager;//Leader选举时的网络连接管理器     //Determine how much time a process has to wait once it believes that it has reached the end of leader election.     final static int finalizeWait = 200;     AtomicLong logicalclock = new AtomicLong();//标识当前Leader的选举轮次     long proposedLeader;//SID     long proposedZxid;//ZXID     long proposedEpoch;//epoch     ...     //触发Leader选举     public Vote lookForLeader() throws InterruptedException {         //用于归档的选票集合         HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();         ...         int notTimeout = finalizeWait;         synchronized(this) {             //1.自增选举轮次             logicalclock.incrementAndGet();             //2.初始化选票             updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());         }         //3.发送初始化选票         sendNotifications();         //4.接收外部投票         while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {             //不断从recvqueue中获取其他服务器发过来的投票             Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);             //如果服务器发现无法获取到任何的外部投票,那么就会确认和其他服务器建立的连接是否还有效             if (n == null) {                 if (manager.haveDelivered()) {                     //如果连接还有效,那么就再次发送服务器自己的内部投票                     sendNotifications();                 } else {                     //如果连接没有效,那么就会马上建立连接                     manager.connectAll();                 }                 ...             } else if (validVoter(n.sid) && validVoter(n.leader)) {                 switch (n.state) {                     case LOOKING:                         //5.判断选举轮次                         //外部投票的选举轮次n.electionEpoch,大于内部投票的选举轮次logicalclock                         if (n.electionEpoch > logicalclock.get()) {                             //更新自己的选举轮次logicalclock                             logicalclock.set(n.electionEpoch);                             //清空所有已经收到的投票,因为这样可以确保recvset保存的都是同一轮次的投票                             recvset.clear();                             //用初始化的投票通过totalOrderPredicate()方法来进行PK以确定是否变更内部投票                             if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {                                 updateProposal(n.leader, n.zxid, n.peerEpoch);                             } else {                                 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());                             }                             //最后将内部投票发送出去                             sendNotifications();                         } else if (n.electionEpoch < logicalclock.get()) {                             ...                             //外部头的选举轮次n.electionEpoch小于内部投票的选举轮次logicalclock                             //直接忽略该外部投票,不做任何处理,break掉这次while循环                             break;                             //6.通过totalOrderPredicate()方法进行选票PK                         } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {                             //如果外部投票的选举轮次和内部投票的选举轮次一致                             //那么就在判断条件里通过totalOrderPredicate()方法进行选票PK                             //totalOrderPredicate()方法返回true,说明外部投票胜出,于是变更投票                             //7.变更投票                             updateProposal(n.leader, n.zxid, n.peerEpoch);                             sendNotifications();                         }                          // don't care about the version if it's in LOOKING state                         //8.选票归档                         recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));                         //9.统计投票                         if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {                             //termPredicate()方法返回true说明有过半服务器认可当前服务器的内部投票了                             while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {                                 if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){                                     recvqueue.put(n);                                     break;                                 }                             }                              //This predicate is true once we don't read any new relevant message from the reception queue                             if (n == null) {                                 //10.更新服务器状态                                 self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());                                 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);                                 leaveInstance(endVote);                                 return endVote;                             }                         }                         break;                     case OBSERVING:                         ...                 }             } else {                 ...             }         }         return null;     }     ...     //更新选票     synchronized void updateProposal(long leader, long zxid, long epoch){         proposedLeader = leader;//SID         proposedZxid = zxid;//ZXID         proposedEpoch = epoch;     }          //发送选票给所有机器:Send notifications to all peers upon a change in our vote     private void sendNotifications() {         for (long sid : self.getCurrentAndNextConfigVoters()) {             QuorumVerifier qv = self.getQuorumVerifier();             ToSend notmsg = new ToSend(ToSend.mType.notification,                 proposedLeader,                 proposedZxid,                 logicalclock.get(),                 QuorumPeer.ServerState.LOOKING,                 sid,                 proposedEpoch, qv.toString().getBytes());             sendqueue.offer(notmsg);         }     }          //进行选票PK     protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {         if (self.getQuorumVerifier().getWeight(newId) == 0) {             return false;         }         //We return true if one of the following three cases hold:         //1- New epoch is higher         //2- New epoch is the same as current epoch, but new zxid is higher         //3- New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher.         return ((newEpoch > curEpoch) ||             ((newEpoch == curEpoch) &&             ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));     }          //统计选票     protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {         SyncedLearnerTracker voteSet = new SyncedLearnerTracker();         voteSet.addQuorumVerifier(self.getQuorumVerifier());         if (self.getLastSeenQuorumVerifier() != null                  && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {             voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());         }         //遍历归档的投票votes,将认可内部投票vote的那些投票添加到voteSet集合         for (Map.Entry<Long, Vote> entry : votes.entrySet()) {             if (vote.equals(entry.getValue())) {                 voteSet.addAck(entry.getKey());             }         }         //是否有过半服务器认可内部投票vote         return voteSet.hasAllQuorums();     } }

 

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章

当前内容话题
  • 0