Zookeeper源码分析之五 Leader选举
本帖最后由 desehawk 于 2014-11-25 22:22 编辑问题导读
1.Obserer机器是否参与选举?
2.选举成功的条件是什么?为什么zookeeper要保存奇数?
static/image/hrline/4.gif
前面几篇文章简单介绍了zookeeper的单机server client处理。接下来几篇文章会介绍分布式部署下zookeeper的实现原理。我们假设有3台server的集群,zoo.cfg配置如下
tickTime=2000
dataDir=/home/admin/zk-data
clientPort=2181
#Learner初始化连接到Leader的超时时间
initLimit=10
#Learner和Leader之间消息发送,响应的超时时间
syncLimit=5
#集群配置,3台机器,2888为Leader服务端口,3888为选举时所用的端口
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
在server.1的$dataDir下
echo '1'>myid
启动server.1
./zkServer.sh start
分析之前先看看选举相关的类图
入口函数QuorumPeerMain主线程启动
public void runFromConfig(QuorumPeerConfig config) throws IOException {
......
LOG.info("Starting quorum peer");
try {
//对client提供读写的server,一般是2181端口
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
//zk的逻辑主线程,负责选举,投票等
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
//集群机器地址
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
//本机的集群编号
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
//投票决定方式,默认超过半数就通过
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
//启动主线程
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
QuorumPeer复写Thread.start方法,启动
@Override
public synchronized void start() {
//恢复DB,从zxid中回复epoch变量,代表投票轮数
loadDataBase();
//启动针对client的IO线程
cnxnFactory.start();
//选举初始化,主要是从配置获取选举类型
startLeaderElection();
//启动
super.start();
}
loadDataBase过程,恢复epoch数
private void loadDataBase() {
try {
//从本地文件恢复db
zkDb.loadDataBase();
// load the epochs
//从最新的zxid恢复epoch变量,zxid64位,前32位是epoch值,后32位是zxid
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
.......
}
选举初始化
synchronized public void startLeaderElection() {
try {
//先投自己
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//从配置中拿自己的选举地址
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
......
//根据配置,获取选举算法
this.electionAlg = createElectionAlgorithm(electionType);
}
获取选举算法,默认为FastLeaderElection算法
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
//leader选举IO负责类
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
//启动已绑定3888端口的选举线程,等待集群其他机器连接
if(listener != null){
listener.start();
//基于TCP的选举算法
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
FastLeaderElection初始化
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
//业务层发送队列,业务对象ToSend
sendqueue = new LinkedBlockingQueue<ToSend>();
//业务层接受队列,业务对象Notificataion
recvqueue = new LinkedBlockingQueue<Notification>();
//
this.messenger = new Messenger(manager);
}
essenger(QuorumCnxManager manager) {
//启动业务层发送线程,将消息发给IO负责类QuorumCnxManager
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender");
t.setDaemon(true);
t.start();
//启动业务层接受线程,从IO负责类QuorumCnxManager接受消息
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver");
t.setDaemon(true);
t.start();
}
QuorumPeer线程启动
run(){
.......
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
//如果状态是LOOKING,则进入选举流程
case LOOKING:
LOG.info("LOOKING");
......
try {
//选举算法开始选举,主线程可能在这里耗比较长时间
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
//其他流程处理
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
.......
}
进入选举流程
public Vote lookForLeader() throws InterruptedException {
......
try {
//收到的投票
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock++;
//先投给自己
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id =" + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
//发送投票,包括发给自己
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
//主循环,直到选出leader
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//从IO线程里拿到投票消息,自己的投票也在这里处理
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
//如果空闲
if(n == null){
//消息发完了,继续发送,一直到选出leader为止
if(manager.haveDelivered()){
sendNotifications();
} else {
//消息还在,可能其他server还没启动,尝试连接
manager.connectAll();
}
/*
* Exponential backoff
*/
//延长超时时间
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
//收到了投票消息
else if(self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) {
//LOOKING消息,则
case LOOKING:
......
//检查下收到的这张选票是否可以胜出,依次比较选举轮数epoch,事务zxid,服务器编号server id
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//胜出了,就把自己的投票修改为对方的,然后广播消息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
......
//添加到本机投票集合,用来做选举终结判断
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//选举是否结束,默认算法是超过半数server同意
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
......
//修改状态,LEADING or FOLLOWING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//返回最终的选票结果
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
//如果收到的选票状态不是LOOKING,比如这台机器刚加入一个已经服务的zk集群时
//OBSERVING机器不参数选举
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
//这2种需要参与选举
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){
//同样需要加入到本机的投票集合
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//投票是否结束,如果结束,再确认LEADER是否有效
//如果结束,修改自己的状态并返回投票结果
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/**
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*/
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch, n.state));
......
break;
default:
选举消息发送
private void sendNotifications() {
//循环发送
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
//消息实体
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
......
//添加到业务的发送队列,该队列会被WorkerSender消费
sendqueue.offer(notmsg);
}
}
WorkerSender消费
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
private void process(ToSend m) {
byte requestBytes[] = new byte;
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
* Building notification packet to send
*/
requestBuffer.clear();
requestBuffer.putInt(m.state.ordinal());
requestBuffer.putLong(m.leader);
requestBuffer.putLong(m.zxid);
requestBuffer.putLong(m.electionEpoch);
requestBuffer.putLong(m.peerEpoch);
CnxManager这个IO负责类发送消息
manager.toSend(m.sid, requestBuffer);
}
QuorumCnxManager具体发送
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
//如果是自己,不走网络,直接添加到本地接受队列
if (self.getId() == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
//否则,先添加到发送队列,然后尝试连接,连接成功则给每台server启动发送和接受线程
} else {
/*
* Start a new connection if doesn't have one already.
*/
if (!queueSendMap.containsKey(sid)) {
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);
queueSendMap.put(sid, bq);
addToSendQueue(bq, b);
} else {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if(bq != null){
addToSendQueue(bq, b);
} else {
LOG.error("No queue for server " + sid);
}
}
connectOne(sid);
}
}
尝试连接过程
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) == null){
.......
//对方的选举地址,3888端口
electionAddr = self.quorumPeers.get(sid).electionAddr;
.......
//同步IO连接
Socket sock = new Socket();
setSockOpts(sock);
sock.connect(self.getView().get(sid).electionAddr, cnxTO);
if (LOG.isDebugEnabled()) {
LOG.debug("Connected to server " + sid);
}
//连上了,初始化IO线程
initiateConnection(sock, sid);
......
}
由于这个时候只有server.1启动,当它尝试去连接其他server时,会报错,选举线程会一直重试。此时,server.1只收到了自己的选票。然后我们启动server.2,server.2也会主动去连接server.1,这个时候server.1h和server.2会相互发起连接,但最终只有有一个连接成功,请看下问。
这个时候被连接的server的Listener选举线程会收到新连接
Listener主循环,接受连接
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
receiveConnection(client);
numRetries = 0;
}
新连接处理
public boolean receiveConnection(Socket sock) {
Long sid = null;
try {
// Read server id
//读server id
DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
......
//If wins the challenge, then close the new connection.
//如果对方id比我小,则关闭连接,只允许大id的server连接小id的server
if (sid < self.getId()) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
}
//如果对方id比我大,允许连接,并初始化单独的IO线程
else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
}
sw.start();
rw.start();
return true;
}
return false;
}
连上后,自己server的IO线程初始化
public boolean initiateConnection(Socket sock, Long sid) {
DataOutputStream dout = null;
try {
// Sending id and challenge
//先发一个server id
dout = new DataOutputStream(sock.getOutputStream());
dout.writeLong(self.getId());
dout.flush();
}
......
// If lost the challenge, then drop the new connection
//如果对方id比自己大,则关闭连接,这样导致的结果就是大id的server才会去连接小id的server,避免连接浪费
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
}
//如果对方id比自己小,则保持连接,并初始化单独的发送和接受线程
else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
}
sw.start();
rw.start();
return true;
}
return false;
}
通过以上的连接处理,每2台选举机器之间只会建立一个选举连接。
IO发送线程SendWorker启动,开始发送选举消息
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
//每个server一个发送队列
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
//拿消息
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if(b != null){
//发消息
lastMessageSent.put(sid, b);
send(b);
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid + " my id = " +
self.getId() + " error = " + e);
}
this.finish();
......
这个时候,其他机器通过IO线程RecvWorker收到消息
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
//包的长度
int length = din.readInt();
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
//读到内存
byte[] msgArray = new byte;
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);
//添加到接收队列,后续业务层的接收线程WorkerReceiver会来拿消息
addToRecvQueue(new Message(message.duplicate(), sid));
}
......
}
业务层的接受线程WorkerReceiver拿消息
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try{
//从IO线程拿数据
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
//如果是Observer,则返回当前选举结果
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
}
else {
// Receive new message
.......
// State of peer that sent this message
//对方节点状态
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (response.buffer.getInt()) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
case 1:
ackstate = QuorumPeer.ServerState.FOLLOWING;
break;
case 2:
ackstate = QuorumPeer.ServerState.LEADING;
break;
case 3:
ackstate = QuorumPeer.ServerState.OBSERVING;
break;
}
// Instantiate Notification and set its attributes
//初始化Notification对象
Notification n = new Notification();
n.leader = response.buffer.getLong();
n.zxid = response.buffer.getLong();
n.electionEpoch = response.buffer.getLong();
n.state = ackstate;
n.sid = response.sid;
......
/*
* If this server is looking, then send proposed leader
*/
//如果自己也在LOOKING,则放入业务接收队列,选举主线程会消费该消息
if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
recvqueue.offer(n);
......
}
//如果自己不在选举中,而对方server在LOOKING中,则向其发送当前的选举结果,当有server加入一个essemble时有用
else {
/*
* If this server is not looking, but the one that sent the ack
* is looking, then send back what it believes to be the leader.
*/
Vote current = self.getCurrentVote();
if(ackstate == QuorumPeer.ServerState.LOOKING){
if(LOG.isDebugEnabled()){
LOG.debug("Sending new notification. My id =" +
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
" leader=" + current.getId());
}
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
}
}
.......
}
由于整个集群只有3台机器,所以server.1和server.2启动后,即可选举出Leader。后续Leader和Follower开始数据交互,请看后文。
Leader选举小结
1.server启动时默认选举自己,并向整个集群广播
2.收到消息时,通过3层判断:选举轮数,zxid,server id大小判断是否同意对方,如果同意,则修改自己的选票,并向集群广播
3.QuorumCnxManager负责IO处理,每2个server建立一个连接,只允许id大的server连id小的server,每个server启动单独的读写线程处理,使用阻塞IO
4.默认超过半数机器同意时,则选举成功,修改自身状态为LEADING或FOLLOWING
5.Obserer机器不参与选举
相关文章推荐:
Zookeeper源码分析之一Server启动
Zookeeper源码分析之二Session建立
Zookeeper源码分析之三Exists请求和处理
深入浅出Zookeeper之四Create请求和处理
Zookeeper源码分析之六 Leader/Follower初始化
Zookeeper源码分析之七分布式CREATE事务处理
页:
[1]