desehawk 发表于 2014-11-25 18:11:47

Zookeeper源码分析之五 Leader选举

本帖最后由 desehawk 于 2014-11-25 22:22 编辑


问题导读


1.Obserer机器是否参与选举?
2.选举成功的条件是什么?为什么zookeeper要保存奇数?




static/image/hrline/4.gif



前面几篇文章简单介绍了zookeeper的单机server client处理。接下来几篇文章会介绍分布式部署下zookeeper的实现原理。我们假设有3台server的集群,zoo.cfg配置如下

tickTime=2000
dataDir=/home/admin/zk-data
clientPort=2181
#Learner初始化连接到Leader的超时时间
initLimit=10
#Learner和Leader之间消息发送,响应的超时时间
syncLimit=5
#集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888


在server.1的$dataDir下

echo '1'>myid


启动server.1

./zkServer.sh start




分析之前先看看选举相关的类图








入口函数QuorumPeerMain主线程启动



public void runFromConfig(QuorumPeerConfig config) throws IOException {
      ......
   
      LOG.info("Starting quorum peer");
      try {
      //对client提供读写的server,一般是2181端口
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                              config.getMaxClientCnxns());
      //zk的逻辑主线程,负责选举,投票等
          quorumPeer = new QuorumPeer();
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      new File(config.getDataLogDir()),
                      new File(config.getDataDir())));
      //集群机器地址
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setElectionType(config.getElectionAlg());
      //本机的集群编号
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
      //投票决定方式,默认超过半数就通过
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
      //启动主线程
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }


QuorumPeer复写Thread.start方法,启动


   @Override
   public synchronized void start() {
//恢复DB,从zxid中回复epoch变量,代表投票轮数
       loadDataBase();
//启动针对client的IO线程
       cnxnFactory.start();
//选举初始化,主要是从配置获取选举类型         
       startLeaderElection();
//启动
       super.start();
   }



loadDataBase过程,恢复epoch数

private void loadDataBase() {
      try {
      //从本地文件恢复db
            zkDb.loadDataBase();

            // load the epochs
      //从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                currentEpoch = epochOfZxid;
                LOG.info(CURRENT_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
.......
    }

选举初始化


synchronized public void startLeaderElection() {
      try {
      //先投自己
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
      } catch(IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
      }
    //从配置中拿自己的选举地址
      for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
      }
      ......
    //根据配置,获取选举算法
      this.electionAlg = createElectionAlgorithm(electionType);
    }



获取选举算法,默认为FastLeaderElection算法

protected Election createElectionAlgorithm(int electionAlgorithm){
      Election le=null;
                  
      //TODO: use a factory rather than a switch
      switch (electionAlgorithm) {
      case 0:
            le = new LeaderElection(this);
            break;
      case 1:
            le = new AuthFastLeaderElection(this);
            break;
      case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
      case 3:
      //leader选举IO负责类
            qcm = new QuorumCnxManager(this);
            QuorumCnxManager.Listener listener = qcm.listener;
                //启动已绑定3888端口的选举线程,等待集群其他机器连接
      if(listener != null){
                listener.start();
      //基于TCP的选举算法
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
      default:
            assert false;
      }
      return le;
    }



FastLeaderElection初始化
   private void starter(QuorumPeer self, QuorumCnxManager manager) {
       this.self = self;
       proposedLeader = -1;
       proposedZxid = -1;
//业务层发送队列,业务对象ToSend
       sendqueue = new LinkedBlockingQueue<ToSend>();
//业务层接受队列,业务对象Notificataion
       recvqueue = new LinkedBlockingQueue<Notification>();
//
       this.messenger = new Messenger(manager);
   }
essenger(QuorumCnxManager manager) {
    //启动业务层发送线程,将消息发给IO负责类QuorumCnxManager
         this.ws = new WorkerSender(manager);

         Thread t = new Thread(this.ws,
                   "WorkerSender");
         t.setDaemon(true);
         t.start();
    //启动业务层接受线程,从IO负责类QuorumCnxManager接受消息
         this.wr = new WorkerReceiver(manager);

         t = new Thread(this.wr,
                   "WorkerReceiver");
         t.setDaemon(true);
         t.start();
       }


QuorumPeer线程启动

run(){
.......
try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
      //如果状态是LOOKING,则进入选举流程
                case LOOKING:
                  LOG.info("LOOKING");

                  ......
                        try {
                //选举算法开始选举,主线程可能在这里耗比较长时间
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                  }
                  break;
      //其他流程处理         
                case OBSERVING:
                  try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                  } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                  } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                  }
                  break;
                case FOLLOWING:
                  try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                  } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                  } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                  }
                  break;
                case LEADING:
                  LOG.info("LEADING");
                  try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                  } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                  } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                  }
                  break;
                }
.......
}

进入选举流程
public Vote lookForLeader() throws InterruptedException {
......
      try {
      //收到的投票
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
         
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock++;
      //先投给自己
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =" + self.getId() +
                  ", proposed zxid=0x" + Long.toHexString(proposedZxid));
      //发送投票,包括发给自己
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */
      //主循环,直到选出leader
            while ((self.getPeerState() == ServerState.LOOKING) &&
                  (!stop)){
                /*
               * Remove next notification from queue, times out after 2 times
               * the termination time
               */
      //从IO线程里拿到投票消息,自己的投票也在这里处理
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
               * Sends more notifications if haven't received enough.
               * Otherwise processes new notification.
               */
      //如果空闲
                if(n == null){
            //消息发完了,继续发送,一直到选出leader为止
                  if(manager.haveDelivered()){
                        sendNotifications();
                  } else {
            //消息还在,可能其他server还没启动,尝试连接
                        manager.connectAll();
                  }

                  /*
                     * Exponential backoff
                     */
            //延长超时时间
                  int tmpTimeOut = notTimeout*2;
                  notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                  LOG.info("Notification time out: " + notTimeout);
                }
      //收到了投票消息
                else if(self.getVotingView().containsKey(n.sid)) {
                  /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                  switch (n.state) {
            //LOOKING消息,则
                  case LOOKING:
            ......
            //检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                              proposedLeader, proposedZxid, proposedEpoch)) {
                //胜出了,就把自己的投票修改为对方的,然后广播消息
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        ......
            //添加到本机投票集合,用来做选举终结判断
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
            
            //选举是否结束,默认算法是超过半数server同意
                        if (termPredicate(recvset,
                              new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            ......
                //修改状态,LEADING or FOLLOWING
                              self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                //返回最终的选票结果
                              Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                              leaveInstance(endVote);
                              return endVote;
                            }
                        }
                        break;
            //如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时
            //OBSERVING机器不参数选举
                  case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
            //这2种需要参与选举
                  case FOLLOWING:
                  case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock){
                //同样需要加入到本机的投票集合
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                //投票是否结束,如果结束,再确认LEADER是否有效
                //如果结束,修改自己的状态并返回投票结果
                            if(termPredicate(recvset, new Vote(n.leader,
                                          n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                          && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                              self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                              Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                              leaveInstance(endVote);
                              return endVote;
                            }
                        }

                        /**
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.leader, n.zxid,
                              n.electionEpoch, n.peerEpoch, n.state));
                        ......
                        break;
                  default:


选举消息发送
    private void sendNotifications() {
    //循环发送
      for (QuorumServer server : self.getVotingView().values()) {
            long sid = server.id;
      //消息实体
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                  proposedLeader,
                  proposedZxid,
                  logicalclock,
                  QuorumPeer.ServerState.LOOKING,
                  sid,
                  proposedEpoch);
......
      //添加到业务的发送队列,该队列会被WorkerSender消费
            sendqueue.offer(notmsg);
      }
    }


WorkerSender消费

public void run() {
    while (!stop) {
      try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
      } catch (InterruptedException e) {
            break;
      }
    }
    LOG.info("WorkerSender is down");
}

private void process(ToSend m) {

    byte requestBytes[] = new byte;
    ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

    /*
   * Building notification packet to send
   */

    requestBuffer.clear();
    requestBuffer.putInt(m.state.ordinal());
    requestBuffer.putLong(m.leader);
    requestBuffer.putLong(m.zxid);
    requestBuffer.putLong(m.electionEpoch);
    requestBuffer.putLong(m.peerEpoch);
CnxManager这个IO负责类发送消息
    manager.toSend(m.sid, requestBuffer);

}



QuorumCnxManager具体发送


   public void toSend(Long sid, ByteBuffer b) {
       /*
      * If sending message to myself, then simply enqueue it (loopback).
      */
//如果是自己,不走网络,直接添加到本地接受队列
       if (self.getId() == sid) {
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
         /*
            * Otherwise send to the corresponding thread to send.
            */
//否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程
       } else {
            /*
             * Start a new connection if doesn't have one already.
             */
            if (!queueSendMap.containsKey(sid)) {
                ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY);
                queueSendMap.put(sid, bq);
                addToSendQueue(bq, b);

            } else {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if(bq != null){
                  addToSendQueue(bq, b);
                } else {
                  LOG.error("No queue for server " + sid);
                }
            }
            connectOne(sid);
               
       }
   }


尝试连接过程
synchronized void connectOne(long sid){
      if (senderWorkerMap.get(sid) == null){
      .......
      //对方的选举地址,3888端口
                electionAddr = self.quorumPeers.get(sid).electionAddr;
            .......
      //同步IO连接
                Socket sock = new Socket();
                setSockOpts(sock);
                sock.connect(self.getView().get(sid).electionAddr, cnxTO);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Connected to server " + sid);
                }
      //连上了,初始化IO线程
                initiateConnection(sock, sid);
            ......
    }


由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。

这个时候被连接的server的Listener选举线程会收到新连接
Listener主循环,接受连接
while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                              + client.getRemoteSocketAddress());
                        receiveConnection(client);
                        numRetries = 0;
                  }

新连接处理

public boolean receiveConnection(Socket sock) {
       Long sid = null;
         
       try {
         // Read server id
    //读server id
         DataInputStream din = new DataInputStream(sock.getInputStream());
         sid = din.readLong();
         ......
         
       //If wins the challenge, then close the new connection.
//如果对方id比我小,则关闭连接,只允许大id的server连接小id的server
       if (sid < self.getId()) {
         /*
            * This replica might still believe that the connection to sid is
            * up, so we have to shut down the workers before trying to open a
            * new connection.
            */
         SendWorker sw = senderWorkerMap.get(sid);
         if (sw != null) {
               sw.finish();
         }

         /*
            * Now we start a new connection
            */
         LOG.debug("Create new connection to server: " + sid);
         closeSocket(sock);
         connectOne(sid);

         // Otherwise start worker threads to receive data.
       }   
//如果对方id比我大,允许连接,并初始化单独的IO线程
else {
         SendWorker sw = new SendWorker(sock, sid);
         RecvWorker rw = new RecvWorker(sock, sid, sw);
         sw.setRecv(rw);

         SendWorker vsw = senderWorkerMap.get(sid);
            
         if(vsw != null)
               vsw.finish();
            
         senderWorkerMap.put(sid, sw);
            
         if (!queueSendMap.containsKey(sid)) {
               queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                     SEND_CAPACITY));
         }
            
         sw.start();
         rw.start();
            
         return true;      
       }
       return false;
   }

连上后,自己server的IO线程初始化

public boolean initiateConnection(Socket sock, Long sid) {
      DataOutputStream dout = null;
      try {
            // Sending id and challenge
      //先发一个server id
            dout = new DataOutputStream(sock.getOutputStream());
            dout.writeLong(self.getId());
            dout.flush();
      }   
      ......
      // If lost the challenge, then drop the new connection
    //如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费
      if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
      }   
    //如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程
    else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;      
            
      }
      return false;
    }

通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。

IO发送线程SendWorker启动,开始发送选举消息
try {
                while (running && !shutdown && sock != null) {

                  ByteBuffer b = null;
                  try {
            //每个server一个发送队列
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                              .get(sid);
                        if (bq != null) {
                //拿消息
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                    "server " + sid);
                            break;
                        }

                        if(b != null){
                //发消息
                            lastMessageSent.put(sid, b);
                            send(b);
                        }
                  } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                              e);
                  }
                }
            } catch (Exception e) {
                LOG.warn("Exception when using channel: for id " + sid + " my id = " +   
                        self.getId() + " error = " + e);
            }
            this.finish();
......

这个时候,其他机器通过IO线程RecvWorker收到消息
public void run() {
         threadCnt.incrementAndGet();
         try {
               while (running && !shutdown && sock != null) {
                   /**
                  * Reads the first int to determine the length of the
                  * message
                  */
      //包的长度
                   int length = din.readInt();
                   if (length <= 0 || length > PACKETMAXSIZE) {
                     throw new IOException(
                               "Received packet with invalid packet: "
                                       + length);
                   }
                   /**
                  * Allocates a new ByteBuffer to receive the message
                  */
      //读到内存
                   byte[] msgArray = new byte;
                   din.readFully(msgArray, 0, length);
                   ByteBuffer message = ByteBuffer.wrap(msgArray);
      //添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息
                   addToRecvQueue(new Message(message.duplicate(), sid));
               }
         ......
       }


业务层的接受线程WorkerReceiver拿消息

public void run() {

               Message response;
               while (!stop) {
                   // Sleeps on receive
                   try{
      //从IO线程拿数据
                     response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                     if(response == null) continue;

                     /*
                        * If it is from an observer, respond right away.
                        * Note that the following predicate assumes that
                        * if a server is not a follower, then it must be
                        * an observer. If we ever have any other type of
                        * learner in the future, we'll have to change the
                        * way we check for observers.
                        */
      //如果是Observer,则返回当前选举结果
                     if(!self.getVotingView().containsKey(response.sid)){
                           Vote current = self.getCurrentVote();
                           ToSend notmsg = new ToSend(ToSend.mType.notification,
                                 current.getId(),
                                 current.getZxid(),
                                 logicalclock,
                                 self.getPeerState(),
                                 response.sid,
                                 current.getPeerEpoch());

                           sendqueue.offer(notmsg);
                     }
      else {
                           // Receive new message
                           .......
                           // State of peer that sent this message
            //对方节点状态
                           QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                           switch (response.buffer.getInt()) {
                           case 0:
                               ackstate = QuorumPeer.ServerState.LOOKING;
                               break;
                           case 1:
                               ackstate = QuorumPeer.ServerState.FOLLOWING;
                               break;
                           case 2:
                               ackstate = QuorumPeer.ServerState.LEADING;
                               break;
                           case 3:
                               ackstate = QuorumPeer.ServerState.OBSERVING;
                               break;
                           }

                           // Instantiate Notification and set its attributes
            //初始化Notification对象
                           Notification n = new Notification();
                           n.leader = response.buffer.getLong();
                           n.zxid = response.buffer.getLong();
                           n.electionEpoch = response.buffer.getLong();
                           n.state = ackstate;
                           n.sid = response.sid;
                           ......

                           /*
                            * If this server is looking, then send proposed leader
                            */
            //如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息
                           if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                               recvqueue.offer(n);

                               ......
                           }   
            //如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用
            else {
                               /*
                              * If this server is not looking, but the one that sent the ack
                              * is looking, then send back what it believes to be the leader.
                              */
                               Vote current = self.getCurrentVote();
                               if(ackstate == QuorumPeer.ServerState.LOOKING){
                                 if(LOG.isDebugEnabled()){
                                       LOG.debug("Sending new notification. My id =" +
                                             self.getId() + " recipient=" +
                                             response.sid + " zxid=0x" +
                                             Long.toHexString(current.getZxid()) +
                                             " leader=" + current.getId());
                                 }
                                 ToSend notmsg = new ToSend(
                                           ToSend.mType.notification,
                                           current.getId(),
                                           current.getZxid(),
                                           logicalclock,
                                           self.getPeerState(),
                                           response.sid,
                                           current.getPeerEpoch());
                                 sendqueue.offer(notmsg);
                               }
                           }
                     .......
         }



由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。



Leader选举小结
1.server启动时默认选举自己,并向整个集群广播
2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO
4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
5.Obserer机器不参与选举


相关文章推荐:
Zookeeper源码分析之一Server启动

Zookeeper源码分析之二Session建立

Zookeeper源码分析之三Exists请求和处理

深入浅出Zookeeper之四Create请求和处理

Zookeeper源码分析之六 Leader/Follower初始化

Zookeeper源码分析之七分布式CREATE事务处理


页: [1]
查看完整版本: Zookeeper源码分析之五 Leader选举