分享

Zookeeper源码分析之二Session建立

desehawk 2014-11-25 17:51:52 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 72977
本帖最后由 desehawk 于 2014-11-25 22:22 编辑

问题导读


1.Session建立核心流程的步骤是什么?
2.CREATE_SESSION的作用是什么?







上一篇,小编给大家介绍了zookeeper server端的启动。这一篇将来说一下client和server端是如何建立session的。通过官网的DataMonitor例子来说明。通过Session建立这个例子,可以大概知道client端和server端是如何处理请求的,之间是如何通信的。
官网Datamonitor的代码:
Executor
  1. public class Executor implements Watcher, Runnable,  
  2.         DataMonitor.DataMonitorListener {  
  3.     String znode;  
  4.   
  5.     DataMonitor dm;  
  6.   
  7.     ZooKeeper zk;  
  8.   
  9.     String filename;  
  10.   
  11.     String exec[];  
  12.   
  13.     Process child;  
  14.   
  15.         //Executor是一个watcher,不过其处理都代理给DataMonitor了  
  16.     public Executor(String hostPort, String znode, String filename,  
  17.             String exec[]) throws KeeperException, IOException {  
  18.         this.filename = filename;  
  19.         this.exec = exec;  
  20.                 //初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的  
  21.         zk = new ZooKeeper(hostPort, 3000, this);  
  22.                 //datamonitor是真实的处理类  
  23.         dm = new DataMonitor(zk, znode, null, this);  
  24.     }  
复制代码



DataMonitor
  1. public class DataMonitor implements Watcher, StatCallback {  
  2.   
  3. .......  
  4.   
  5.     public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,  
  6.             DataMonitorListener listener) {  
  7. ......  
  8.         // Get things started by checking if the node exists. We are going  
  9.         // to be completely event driven,异步exist,注册watcher,设置回调  
  10.         zk.exists(znode, true, this, null);  
  11.     }  
  12.   
  13. ......  
  14.     //处理watcher通知事件  
  15.     public void process(WatchedEvent event) {  
  16.         String path = event.getPath();  
  17.         //如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理  
  18.         } else {  
  19.             if (path != null && path.equals(znode)) {  
  20.                 // Something has changed on the node, let's find out  
  21.                 zk.exists(znode, true, this, null);  
  22.             }  
  23.         }  
  24.         if (chainedWatcher != null) {  
  25.             chainedWatcher.process(event);  
  26.         }  
  27.     }  
  28.     //处理exist操作的回掉结果  
  29.     public void processResult(int rc, String path, Object ctx, Stat stat) {  
  30.         boolean exists;  
  31.         switch (rc) {  
  32.         case Code.Ok:  
  33.             exists = true;  
  34.             break;  
  35.         case Code.NoNode:  
  36.             exists = false;  
  37.             break;  
  38.         case Code.SessionExpired:  
  39.         case Code.NoAuth:  
  40.             dead = true;  
  41.             listener.closing(rc);  
  42.             return;  
  43.         default:  
  44.             // Retry errors  
  45.             zk.exists(znode, true, this, null);  
  46.             return;  
  47.         }  
  48.         //如果节点存在,则同步获取节点数据  
  49.         byte b[] = null;  
  50.         if (exists) {  
  51.             try {  
  52.                 b = zk.getData(znode, false, null);  
  53.             } catch (KeeperException e) {  
  54.                 // We don't need to worry about recovering now. The watch  
  55.                 // callbacks will kick off any exception handling  
  56.                 e.printStackTrace();  
  57.             } catch (InterruptedException e) {  
  58.                 return;  
  59.             }  
  60.         }  
  61.         //如果数据有变化,则处理之  
  62.         if ((b == null && b != prevData)  
  63.                 || (b != null && !Arrays.equals(prevData, b))) {  
  64.             listener.exists(b);  
  65.             prevData = b;  
  66.         }  
  67.     }  
  68. }  
复制代码




  从这个例子出发,我们来分析下zookeeper的第一步session是如何建立的,主要就是Zookeeper类的构造。
Zookeeper构造
  1.   public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  
  2.            boolean canBeReadOnly)  
  3.        throws IOException  
  4.    {  
  5.        LOG.info("Initiating client connection, connectString=" + connectString  
  6.                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);  
  7. //设置默认watcher  
  8.        watchManager.defaultWatcher = watcher;  
  9.   
  10.        ConnectStringParser connectStringParser = new ConnectStringParser(  
  11.                connectString);  
  12. //从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了  
  13.        HostProvider hostProvider = new StaticHostProvider(  
  14.                connectStringParser.getServerAddresses());  
  15. //创建客户端连接,初始化SendThread和EventThread  
  16.        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),  
  17.                hostProvider, sessionTimeout, this, watchManager,  
  18.                getClientCnxnSocket(), canBeReadOnly);  
  19. //启动SendThread和EventThread  
  20.        cnxn.start();  
  21.    }  
复制代码


初始化连接
  1. public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,  
  2.             ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,  
  3.             long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {  
  4.         this.zooKeeper = zooKeeper;  
  5.         this.watcher = watcher;  
  6.     //客户端sessionId  
  7.         this.sessionId = sessionId;  
  8.         this.sessionPasswd = sessionPasswd;  
  9.     //客户端设置的超时时间  
  10.         this.sessionTimeout = sessionTimeout;  
  11.     //主机列表  
  12.         this.hostProvider = hostProvider;  
  13.         this.chrootPath = chrootPath;  
  14.     //连接超时  
  15.         connectTimeout = sessionTimeout / hostProvider.size();  
  16.     //读超时  
  17.         readTimeout = sessionTimeout * 2 / 3;  
  18.         readOnly = canBeReadOnly;  
  19.     //初始化client2个核心线程,SendThread是client的IO核心线程,EventThread从SendThread里拿到event,调用对应watcher  
  20.         sendThread = new SendThread(clientCnxnSocket);  
  21.         eventThread = new EventThread();  
  22.   
  23.     }  
复制代码


SendThread核心流程
  1. public void run() {  
  2.             .....  
  3.             while (state.isAlive()) {  
  4.                 try {  
  5.             //如果还没连上,则启动连接过程,这个方法有歧义,其实现是判断sockkey是否已注册,可能此时连接上server  
  6.                     if (!clientCnxnSocket.isConnected()) {  
  7.                         ......  
  8.             //异步连接  
  9.                         startConnect();  
  10.                         clientCnxnSocket.updateLastSendAndHeard();  
  11.                     }  
  12.             //如果状态为连接上,则真的是连上server了  
  13.                     if (state.isConnected()) {  
  14.                         ......  
  15.             //下一次select超时时间  
  16.                         to = readTimeout - clientCnxnSocket.getIdleRecv();  
  17.                     } else {  
  18.             //如果没连上,则递减连接超时  
  19.                         to = connectTimeout - clientCnxnSocket.getIdleRecv();  
  20.                     }  
  21.                     //session超时,包括连接超时  
  22.                     if (to <= 0) {  
  23.                         throw new SessionTimeoutException(  
  24.                                 "Client session timed out, have not heard from server in "  
  25.                                         + clientCnxnSocket.getIdleRecv() + "ms"  
  26.                                         + " for sessionid 0x"  
  27.                                         + Long.toHexString(sessionId));  
  28.                     }  
  29.             //如果send空闲,则发送心跳包  
  30.                     if (state.isConnected()) {  
  31.                         int timeToNextPing = readTimeout / 2  
  32.                                 - clientCnxnSocket.getIdleSend();  
  33.                         if (timeToNextPing <= 0) {  
  34.                             sendPing();  
  35.                             clientCnxnSocket.updateLastSend();  
  36.                         } else {  
  37.                             if (timeToNextPing < to) {  
  38.                                 to = timeToNextPing;  
  39.                             }  
  40.                         }  
  41.                     }  
  42.   
  43.                     // If we are in read-only mode, seek for read/write server  
  44.             //如果是只读模式,则寻找R/W server,如果找到,则清理之前的连接,并重新连接到R/W server  
  45.                     if (state == States.CONNECTEDREADONLY) {  
  46.                         long now = System.currentTimeMillis();  
  47.                         int idlePingRwServer = (int) (now - lastPingRwServer);  
  48.                         if (idlePingRwServer >= pingRwTimeout) {  
  49.                             lastPingRwServer = now;  
  50.                             idlePingRwServer = 0;  
  51.                             pingRwTimeout =  
  52.                                 Math.min(2*pingRwTimeout, maxPingRwTimeout);  
  53.                 //同步测试下个server是否是R/W server,如果是则抛出RWServerFoundException  
  54.                             pingRwServer();  
  55.                         }  
  56.                         to = Math.min(to, pingRwTimeout - idlePingRwServer);  
  57.                     }  
  58.             //处理IO  
  59.                     clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);  
  60.                 } catch (Throwable e) {  
  61.                     if (closing) {  
  62.                         if (LOG.isDebugEnabled()) {  
  63.                             // closing so this is expected  
  64.                             LOG.debug("An exception was thrown while closing send thread for session 0x"  
  65.                                     + Long.toHexString(getSessionId())  
  66.                                     + " : " + e.getMessage());  
  67.                         }  
  68.                         break;  
  69.                     } else {  
  70.                         // this is ugly, you have a better way speak up  
  71.                         if (e instanceof SessionExpiredException) {  
  72.                             LOG.info(e.getMessage() + ", closing socket connection");  
  73.                         } else if (e instanceof SessionTimeoutException) {  
  74.                             LOG.info(e.getMessage() + RETRY_CONN_MSG);  
  75.                         } else if (e instanceof EndOfStreamException) {  
  76.                             LOG.info(e.getMessage() + RETRY_CONN_MSG);  
  77.                         } else if (e instanceof RWServerFoundException) {  
  78.                             LOG.info(e.getMessage());  
  79.                         } else {  
  80.                 ......  
  81.                         }  
  82.             //清理之前的连接,找下一台server连接  
  83.                         cleanup();  
  84.                         if (state.isAlive()) {  
  85.                             eventThread.queueEvent(new WatchedEvent(  
  86.                                     Event.EventType.None,  
  87.                                     Event.KeeperState.Disconnected,  
  88.                                     null));  
  89.                         }  
  90.                         clientCnxnSocket.updateNow();  
  91.                         clientCnxnSocket.updateLastSendAndHeard();  
  92.                     }  
  93.                 }  
  94.             }  
  95.      ......  
  96.         }  
复制代码


具体过程
  1. private void startConnect() throws IOException {  
  2.         //状态改为CONNETING  
  3.             state = States.CONNECTING;  
  4.         //拿目标地址  
  5.             InetSocketAddress addr;  
  6.             if (rwServerAddress != null) {  
  7.                 addr = rwServerAddress;  
  8.                 rwServerAddress = null;  
  9.             } else {  
  10.                 addr = hostProvider.next(1000);  
  11.             }  
  12.   
  13.             setName(getName().replaceAll("\\(.*\\)",  
  14.                     "(" + addr.getHostName() + ":" + addr.getPort() + ")"));  
  15.         ......  
  16.         //异步连接  
  17.             clientCnxnSocket.connect(addr);  
  18.         }  
复制代码




具体connect
  1.    void connect(InetSocketAddress addr) throws IOException {  
  2. //创建客户端SocketChannel  
  3.        SocketChannel sock = createSock();  
  4.        try {  
  5.     //注册OP_CONNECT事件,尝试连接  
  6.           registerAndConnect(sock, addr);  
  7.        } catch (IOException e) {  
  8.            LOG.error("Unable to open socket to " + addr);  
  9.            sock.close();  
  10.            throw e;  
  11.        }  
  12. //session还未初始化  
  13.        initialized = false;  
  14.   
  15.        /*
  16.         * Reset incomingBuffer
  17.         */  
  18. //重置2个读buffer,准备下一次读  
  19.        lenBuffer.clear();  
  20.        incomingBuffer = lenBuffer;  
  21.    }  
复制代码




registerAndConnect过程:

  1. void registerAndConnect(SocketChannel sock, InetSocketAddress addr)   
  2. throws IOException {  
  3.     sockKey = sock.register(selector, SelectionKey.OP_CONNECT);  
  4. 试连接  
  5.     boolean immediateConnect = sock.connect(addr);  
  6. 果网络情况很好,立马可以连上,则发送ConnectRequest请求,请求和server建立session  
  7.     if (immediateConnect) {  
  8.         sendThread.primeConnection();  
  9.     }  
  10. }  
复制代码


primeConnection代表连上之后的操作,主要是建立session:

  1. void primeConnection() throws IOException {  
  2.             ......  
  3.         //客户端sessionId默认为0  
  4.             long sessId = (seenRwServerBefore) ? sessionId : 0;  
  5.         //构造连接请求  
  6.             ConnectRequest conReq = new ConnectRequest(0, lastZxid,  
  7.                     sessionTimeout, sessId, sessionPasswd);  
  8.             synchronized (outgoingQueue) {  
  9.                 ......  
  10.         //组合成通讯层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为null  
  11.                 outgoingQueue.addFirst(new Packet(null, null, conReq,  
  12.                             null, null, readOnly));  
  13.             }  
  14.         //确保读写事件都监听  
  15.             clientCnxnSocket.enableReadWriteOnly();  
  16.             .....  
  17.         }  
复制代码



此时ConnectRequest请求已经添加到发送队列,SendThread进入doTransport处理流程:

  1. void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,  
  2.                     ClientCnxn cnxn)  
  3.            throws IOException, InterruptedException {  
  4. //select  
  5.        selector.select(waitTimeOut);  
  6.        Set<SelectionKey> selected;  
  7.        synchronized (this) {  
  8.            selected = selector.selectedKeys();  
  9.        }  
  10.        // Everything below and until we get back to the select is  
  11.        // non blocking, so time is effectively a constant. That is  
  12.        // Why we just have to do this once, here  
  13.        updateNow();  
  14.        for (SelectionKey k : selected) {  
  15.            SocketChannel sc = ((SocketChannel) k.channel());  
  16.     //如果之前连接没有立马连上,则在这里处理OP_CONNECT事件  
  17.            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {  
  18.                if (sc.finishConnect()) {  
  19.                    updateLastSendAndHeard();  
  20.                    sendThread.primeConnection();  
  21.                }  
  22.            }   
  23. //如果读写就位,则处理之  
  24. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
  25.                doIO(pendingQueue, outgoingQueue, cnxn);  
  26.            }  
  27.        }  
  28.        if (sendThread.getZkState().isConnected()) {  
  29.            synchronized(outgoingQueue) {  
  30.                if (findSendablePacket(outgoingQueue,  
  31.                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {  
  32.                    enableWrite();  
  33.                }  
  34.            }  
  35.        }  
  36.        selected.clear();  
  37.    }  
复制代码


假设我们此时连接已经好了,WRITE事件ok,则SendThread开始发送我们的ConnectRequest

  1. if (sockKey.isWritable()) {  
  2.         //同步处理  
  3.             synchronized(outgoingQueue) {  
  4.         //从发送队列中拿请求  
  5.                 Packet p = findSendablePacket(outgoingQueue,  
  6.                         cnxn.sendThread.clientTunneledAuthenticationInProgress());  
  7.   
  8.                 if (p != null) {  
  9.             //修改上次发送时间  
  10.                     updateLastSend();  
  11.                     // If we already started writing p, p.bb will already exist  
  12.         //序列化Packet到ByteBuffer  
  13.                     if (p.bb == null) {  
  14.             //如果是业务请求,则需要设置事务Id  
  15.                         if ((p.requestHeader != null) &&  
  16.                                 (p.requestHeader.getType() != OpCode.ping) &&  
  17.                                 (p.requestHeader.getType() != OpCode.auth)) {  
  18.                             p.requestHeader.setXid(cnxn.getXid());  
  19.                         }  
  20.             //序列化  
  21.                         p.createBB();  
  22.                     }  
  23.             //写数据  
  24.                     sock.write(p.bb);  
  25.             //写完了,太好了,发送成功  
  26.                     if (!p.bb.hasRemaining()) {  
  27.             //已发送的业务Packet数量  
  28.                         sentCount++;  
  29.             //发送完了,那从发送队列删掉,方便后续发送请求处理  
  30.                         outgoingQueue.removeFirstOccurrence(p);  
  31.             //如果是业务请求,则添加到Pending队列,方便对server端返回做相应处理,如果是其他请求,发完就扔了。。。  
  32.                         if (p.requestHeader != null  
  33.                                 && p.requestHeader.getType() != OpCode.ping  
  34.                                 && p.requestHeader.getType() != OpCode.auth) {  
  35.                             synchronized (pendingQueue) {  
  36.                                 pendingQueue.add(p);  
  37.                             }  
  38.                         }  
  39.                     }  
  40.                 }  
  41.         //请求发完了,不需要再监听OS的写事件了,如果没发完,那还是要继续监听的,继续写嘛  
  42.                 if (outgoingQueue.isEmpty()) {  
  43.                     // No more packets to send: turn off write interest flag.  
  44.                     // Will be turned on later by a later call to enableWrite(),  
  45.                     // from within ZooKeeperSaslClient (if client is configured  
  46.                     // to attempt SASL authentication), or in either doIO() or  
  47.                     // in doTransport() if not.  
  48.                     disableWrite();  
  49.                 } else {  
  50.                     // Just in case  
  51.                     enableWrite();  
  52.                 }  
  53.             }  
  54.         }  
复制代码



具体序列化方式,ConnRequest的packet没有协议头
  1. public void createBB() {  
  2.            try {  
  3.                ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  4.                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);  
  5.     //写一个int,站位用,整个packet写完会来更新这个值,代表packet的从长度,4个字节  
  6.                boa.writeInt(-1, "len"); // We'll fill this in later  
  7.     //序列化协议头  
  8.                if (requestHeader != null) {  
  9.                    requestHeader.serialize(boa, "header");  
  10.                }  
  11.     //序列化协议体  
  12.                if (request instanceof ConnectRequest) {  
  13.                    request.serialize(boa, "connect");  
  14.                    // append "am-I-allowed-to-be-readonly" flag  
  15.                    boa.writeBool(readOnly, "readOnly");  
  16.                } else if (request != null) {  
  17.                    request.serialize(boa, "request");  
  18.                }  
  19.                baos.close();  
  20.     //生成ByteBuffer  
  21.                this.bb = ByteBuffer.wrap(baos.toByteArray());  
  22.     //将bytebuffer的前4个字节修改成真正的长度,总长度减去一个int的长度头  
  23.                this.bb.putInt(this.bb.capacity() - 4);  
  24.     //准备给后续读  
  25.                this.bb.rewind();  
  26.            } catch (IOException e) {  
  27.                LOG.warn("Ignoring unexpected exception", e);  
  28.            }  
  29.        }  
复制代码




这里我们的第一个Packet是ConnReq,它构造的packet没有header,所以发完就直接丢掉了,但是SendThread还需要监听server端的返回,以确认连上,并进行session的初始化。那到这里client端等待server端返回了,我们看看server是怎么处理ConnReq请求的。


假设server的selector线程已经就位,则selector会拿到一个读就位的事件,也就是client的connReq请求


  1. else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
  2.                         NIOServerCnxn c = (NIOServerCnxn) k.attachment();  
  3.                         c.doIO(k);  
复制代码



   if (k.isReadable()) {
  1. //先从Channel读4个字节,代表头  
  2.               int rc = sock.read(incomingBuffer);  
  3.               if (rc < 0) {  
  4.                   throw new EndOfStreamException(  
  5.                           "Unable to read additional data from client sessionid 0x"  
  6.                           + Long.toHexString(sessionId)  
  7.                           + ", likely client has closed socket");  
  8.               }  
  9. //int读好,继续往下读  
  10.               if (incomingBuffer.remaining() == 0) {  
  11.                   boolean isPayload;  
  12.     //2个一样,就可以继续读下一个请求了  
  13.                   if (incomingBuffer == lenBuffer) { // start of next request  
  14.                       incomingBuffer.flip();  
  15.     //给incomingBuffer分配一个length长度的内存,将后续的数据都给读进来  
  16.                       isPayload = readLength(k);  
  17.     //clear一下,准备写  
  18.                       incomingBuffer.clear();  
  19.                   } else {  
  20.                       // continuation  
  21.                       isPayload = true;  
  22.                   }  
  23.     //好,读后续数据  
  24.                   if (isPayload) { // not the case for 4letterword  
  25.                       readPayload();  
  26.                   }  
  27.                   else {  
  28.                       // four letter words take care  
  29.                       // need not do anything else  
  30.                       return;  
  31.                   }  
  32.               }  
  33.           }  
复制代码





具体的后续数据流程:

  1. /** Read the request payload (everything following the length prefix) */  
  2.     private void readPayload() throws IOException, InterruptedException {  
  3.         if (incomingBuffer.remaining() != 0) { // have we read length bytes?  
  4.         //尝试一次读进来  
  5.             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok  
  6.             if (rc < 0) {  
  7.                 throw new EndOfStreamException(  
  8.                         "Unable to read additional data from client sessionid 0x"  
  9.                         + Long.toHexString(sessionId)  
  10.                         + ", likely client has closed socket");  
  11.             }  
  12.         }  
  13.     //哈哈,一次读完  
  14.         if (incomingBuffer.remaining() == 0) { // have we read length bytes?  
  15.         //server的packet统计  
  16.             packetReceived();  
  17.         //准备使用这个buffer了  
  18.             incomingBuffer.flip();  
  19.         //嘿嘿,如果CoonectRequst还没来,那第一个packet肯定是他了  
  20.             if (!initialized) {  
  21.                 readConnectRequest();  
  22.             }   
  23.         //处理请他请求  
  24.         else {  
  25.                 readRequest();  
  26.             }  
  27.         //清理现场,为下一个packet读做准备  
  28.             lenBuffer.clear();  
  29.             incomingBuffer = lenBuffer;  
  30.         }  
  31.     }  
复制代码





我们现在发的ConnReq已经被server端接受了,处理如下

  1. private void readConnectRequest() throws IOException, InterruptedException {  
  2.        if (zkServer == null) {  
  3.            throw new IOException("ZooKeeperServer not running");  
  4.        }  
  5. //开始执行ConnectRequest的处理链  
  6.        zkServer.processConnectRequest(this, incomingBuffer);  
  7. //处理完了,说明业务连接已经建立好了  
  8.        initialized = true;  
  9.    }  
复制代码





具体处理:
  1. public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {  
  2.     //ConnectReq的packet是没有header的,所以直接读内容,反序列化  
  3.         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));  
  4.         ConnectRequest connReq = new ConnectRequest();  
  5.         connReq.deserialize(bia, "connect");  
  6.         ...  
  7.         boolean readOnly = false;  
  8.         try {  
  9.         //是否readOnly  
  10.             readOnly = bia.readBool("readOnly");  
  11.             cnxn.isOldClient = false;  
  12.         } catch (IOException e) {  
  13.           ....  
  14.         }  
  15.         ...  
  16.         //设置客户端请求的session相关参数  
  17.         int sessionTimeout = connReq.getTimeOut();  
  18.         byte passwd[] = connReq.getPasswd();  
  19.         int minSessionTimeout = getMinSessionTimeout();  
  20.         if (sessionTimeout < minSessionTimeout) {  
  21.             sessionTimeout = minSessionTimeout;  
  22.         }  
  23.         int maxSessionTimeout = getMaxSessionTimeout();  
  24.         if (sessionTimeout > maxSessionTimeout) {  
  25.             sessionTimeout = maxSessionTimeout;  
  26.         }  
  27.         cnxn.setSessionTimeout(sessionTimeout);  
  28.         // We don't want to receive any packets until we are sure that the  
  29.         // session is setup  
  30.     //暂时先不读后续请求了,直到session建立  
  31.         cnxn.disableRecv();  
  32.     //拿客户端的sessionId  
  33.         long sessionId = connReq.getSessionId();  
  34.     //重试  
  35.         if (sessionId != 0) {  
  36.             long clientSessionId = connReq.getSessionId();  
  37.             LOG.info("Client attempting to renew session 0x"  
  38.                     + Long.toHexString(clientSessionId)  
  39.                     + " at " + cnxn.getRemoteSocketAddress());  
  40.             serverCnxnFactory.closeSession(sessionId);  
  41.             cnxn.setSessionId(sessionId);  
  42.             reopenSession(cnxn, sessionId, passwd, sessionTimeout);  
  43.         } else {  
  44.             LOG.info("Client attempting to establish new session at "  
  45.                     + cnxn.getRemoteSocketAddress());  
  46.         //创建新Session  
  47.             createSession(cnxn, passwd, sessionTimeout);  
  48.         }  
  49.     }  
复制代码




创建新session如下:
  1.    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {  
  2. //server端创建session,sessionId自增  
  3.        long sessionId = sessionTracker.createSession(timeout);  
  4. //随机密码  
  5.        Random r = new Random(sessionId ^ superSecret);  
  6.        r.nextBytes(passwd);  
  7.        ByteBuffer to = ByteBuffer.allocate(4);  
  8.        to.putInt(timeout);  
  9. //每个server端连接都有一个唯一的SessionId  
  10.        cnxn.setSessionId(sessionId);  
  11. //提交请求给后面的执行链  
  12.        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);  
  13.        return sessionId;  
  14.    }  
复制代码




提交过程:
  1. private void submitRequest(ServerCnxn cnxn, long sessionId, int type,  
  2.             int xid, ByteBuffer bb, List<Id> authInfo) {  
  3.         Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);  
  4.         submitRequest(si);  
  5.     }  
复制代码



  Server端开始执行链,参数是内部的Request对象,此时type是CREATE_SESSION:
  1. public void submitRequest(Request si) {  
  2.        ......  
  3.         try {  
  4.             touch(si.cnxn);  
  5.             boolean validpacket = Request.isValid(si.type);  
  6.             if (validpacket) {  
  7.         //提交给后续的processor执行,一般用异步以提升性能  
  8.                 firstProcessor.processRequest(si);  
  9.                 if (si.cnxn != null) {  
  10.                     incInProcess();  
  11.                 }  
  12.        ......  
  13.     }  
复制代码




第一个processor PrepRequestProcessor执行:

  1. public void run() {  
  2.         try {  
  3.             while (true) {  
  4.                 Request request = submittedRequests.take();  
  5.                 ......  
  6.                 pRequest(request);  
  7.             }  
  8.       ......  
  9.     }  
复制代码



对于CREATE_SESSION具体处理:
  1. //create/close session don't require request record  
  2.             case OpCode.createSession:  
  3.             case OpCode.closeSession:  
  4.         //在这里,组装了Request的header和txh实现,方便后续processor处理  
  5.                 pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);  
  6.                 break;  
  7.         ......  
  8.     request.zxid = zks.getZxid();  
  9.     //让后续processor处理,这里一般是异步以提高性能  
  10.         nextProcessor.processRequest(request);  
复制代码
  1. case OpCode.createSession:  
  2.     //读session超时值  
  3.                request.request.rewind();  
  4.                int to = request.request.getInt();  
  5.     //组装具体的Record实现,这里是CreateSessionTxn,方便后续processor处理  
  6.                request.txn = new CreateSessionTxn(to);  
  7.                request.request.rewind();  
  8.                zks.sessionTracker.addSession(request.sessionId, to);  
  9.                zks.setOwner(request.sessionId, request.getOwner());  
  10.                break;  
复制代码




从上可见,PrepRequestProcessor主要是负责组装Request的header和txn参数的,相当于是预处理
第二个Processor SyncRequestProcessor处理:
  1. int randRoll = r.nextInt(snapCount/2);  
  2.             while (true) {  
  3.                 Request si = null;  
  4.         //flush队列如果为空,阻塞等待,代表之前的请求都被处理了  
  5.                 if (toFlush.isEmpty()) {  
  6.                     si = queuedRequests.take();  
  7.                 }   
  8.         //如果不为空,就是说还有请求等待处理,先非阻塞拿一下,如果系统压力小,正好没有请求进来,则处理之前积压的请求  
  9.         //如果系统压力大,则可能需要flush满1000个才会继续处理  
  10.         else {  
  11.                     si = queuedRequests.poll();  
  12.             //任务queue空闲,处理积压的待flush请求  
  13.                     if (si == null) {  
  14.                         flush(toFlush);  
  15.                         continue;  
  16.                     }  
  17.                 }  
  18.                 if (si == requestOfDeath) {  
  19.                     break;  
  20.                 }  
  21.                 if (si != null) {  
  22.                     // track the number of records written to the log  
  23.             //将Request append到log输出流,先序列化再append,注意此时request还没flush到磁盘,还在内存呢  
  24.                     if (zks.getZKDatabase().append(si)) {  
  25.             //成功计数器  
  26.                         logCount++;  
  27.             //如果成功append的request累计数量大于某个值,则执行flush log的操作  
  28.             //并启动一个线程异步将内存里的Database和session状态写入到snapshot文件,相当于一个checkpoint  
  29.             //snapCount默认是100000  
  30.                         if (logCount > (snapCount / 2 + randRoll)) {  
  31.                             randRoll = r.nextInt(snapCount/2);  
  32.                             // roll the log  
  33.                 //将内存中的log flush到磁盘  
  34.                             zks.getZKDatabase().rollLog();  
  35.                             // take a snapshot  
  36.                 //启动线程异步将内存中的database和sessions状态写入snapshot文件中  
  37.                             if (snapInProcess != null && snapInProcess.isAlive()) {  
  38.                                 LOG.warn("Too busy to snap, skipping");  
  39.                             } else {  
  40.                                 snapInProcess = new Thread("Snapshot Thread") {  
  41.                                         public void run() {  
  42.                                             try {  
  43.                                                 zks.takeSnapshot();  
  44.                                             } catch(Exception e) {  
  45.                                                 LOG.warn("Unexpected exception", e);  
  46.                                             }  
  47.                                         }  
  48.                                     };  
  49.                                 snapInProcess.start();  
  50.                             }  
  51.                             logCount = 0;  
  52.                         }  
  53.                     }  
  54.             //如果是写请求,而且flush队列为空,执行往下执行   
  55.             else if (toFlush.isEmpty()) {  
  56.                         // optimization for read heavy workloads  
  57.                         // iff this is a read, and there are no pending  
  58.                         // flushes (writes), then just pass this to the next  
  59.                         // processor  
  60.                         nextProcessor.processRequest(si);  
  61.                         if (nextProcessor instanceof Flushable) {  
  62.                             ((Flushable)nextProcessor).flush();  
  63.                         }  
  64.                         continue;  
  65.                     }  
  66.             //写请求前面append到log输出流后,在这里加入到flush队列,后续批量处理  
  67.                     toFlush.add(si);  
  68.             //如果系统压力大,可能需要到1000个request才会flush,flush之后可以被后续processor处理  
  69.                     if (toFlush.size() > 1000) {  
  70.                         flush(toFlush);  
  71.                     }  
  72.                 }  
复制代码




具体的flush处理:
  1. private void flush(LinkedList<Request> toFlush)  
  2.         throws IOException, RequestProcessorException  
  3.     {  
  4.         if (toFlush.isEmpty())  
  5.             return;  
  6.     //将之前的append log flush到磁盘,并顺便关闭旧的log文件句柄  
  7.         zks.getZKDatabase().commit();  
  8.     //log flush完后,开始处理flush队列里的Request  
  9.         while (!toFlush.isEmpty()) {  
  10.             Request i = toFlush.remove();  
  11.         //执行后面的processor  
  12.             nextProcessor.processRequest(i);  
  13.         }  
  14.         if (nextProcessor instanceof Flushable) {  
  15.             ((Flushable)nextProcessor).flush();  
  16.         }  
  17.     }  
复制代码




我们假设现在系统压力小,我们的ConnectionRequest可以被立马处理了,执行FinalRequestProcessor:

  1. if (request.hdr != null) {  
  2.                TxnHeader hdr = request.hdr;  
  3.                Record txn = request.txn;  
  4.         //对于事务型请求,处理之  
  5.                rc = zks.processTxn(hdr, txn);  
  6.             }  
复制代码



具体处理:
  1. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {  
  2.         ProcessTxnResult rc;  
  3.         int opCode = hdr.getType();  
  4.         long sessionId = hdr.getClientId();  
  5.     //进一步调用database来处理事务  
  6.         rc = getZKDatabase().processTxn(hdr, txn);  
  7.     //如果是创建session,添加session  
  8.         if (opCode == OpCode.createSession) {  
  9.             if (txn instanceof CreateSessionTxn) {  
  10.                 CreateSessionTxn cst = (CreateSessionTxn) txn;  
  11.                 sessionTracker.addSession(sessionId, cst  
  12.                         .getTimeOut());  
  13.       ......  
  14.         return rc;  
  15.     }  
复制代码




public ProcessTxnResult processTxn(TxnHeader header, Record txn)
  1.    {  
  2. //在这里构造一个Result对象,返回给FinalRequestProcessor  
  3.        ProcessTxnResult rc = new ProcessTxnResult();  
  4.   
  5.        try {  
  6.            rc.clientId = header.getClientId();  
  7.            rc.cxid = header.getCxid();  
  8.            rc.zxid = header.getZxid();  
  9.            rc.type = header.getType();  
  10.            rc.err = 0;  
  11.            rc.multiResult = null;  
  12. ......  
复制代码





在FinalRequestProcessor拿到database的处理结果,继续处理:
  1. case OpCode.createSession: {  
  2.                 zks.serverStats().updateLatency(request.createTime);  
  3.   
  4.                 lastOp = "SESS";  
  5.                 cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,  
  6.                         request.createTime, System.currentTimeMillis());  
  7.         //在这里写回response  
  8.                 zks.finishSessionInit(request.cnxn, true);  
  9.                 return;  
  10.             }  
复制代码




public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
  1.     ......  
  2. //构造一个返回对象,返回协商的sessionTimeout,唯一的sessionId和client的密码  
  3.           ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()  
  4.                   : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no  
  5.                           // longer valid  
  6.                           valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);  
  7.           ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  8.           BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);  
  9. //用-1占位  
  10.           bos.writeInt(-1, "len");  
  11. //序列化内容  
  12.           rsp.serialize(bos, "connect");  
  13.           if (!cnxn.isOldClient) {  
  14.               bos.writeBool(  
  15.                       this instanceof ReadOnlyZooKeeperServer, "readOnly");  
  16.           }  
  17.           baos.close();  
  18.           ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());  
  19. //将之前的-1改成真实的长度  
  20.           bb.putInt(bb.remaining() - 4).rewind();  
  21. //通过channel写回  
  22.           cnxn.sendBuffer(bb);      
  23.   
  24.           ......  
  25. //打开selector的读事件  
  26.           cnxn.enableRecv();  
  27.       ......  
  28.   }  
复制代码





具体写回,通讯层NIOServerCnxn:



  1. public void sendBuffer(ByteBuffer bb) {  
  2.         try {  
  3.             if (bb != ServerCnxnFactory.closeConn) {  
  4.                 // We check if write interest here because if it is NOT set,  
  5.                 // nothing is queued, so we can try to send the buffer right  
  6.                 // away without waking up the selector  
  7.         //确保可写  
  8.                 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {  
  9.                     try {  
  10.             //写回client  
  11.                         sock.write(bb);  
  12.                     } catch (IOException e) {  
  13.                         // we are just doing best effort right now  
  14.                     }  
  15.                 }  
  16.                 // if there is nothing left to send, we are done  
  17.         //一次写完了,太好了  
  18.                 if (bb.remaining() == 0) {  
  19.                     packetSent();  
  20.                     return;  
  21.                 }  
  22.             }  
  23.         //如果一次没写完,添加到输出队列,后续继续写  
  24.             synchronized(this.factory){  
  25.                 sk.selector().wakeup();  
  26.                 if (LOG.isTraceEnabled()) {  
  27.                     LOG.trace("Add a buffer to outgoingBuffers, sk " + sk  
  28.                             + " is valid: " + sk.isValid());  
  29.                 }  
  30.                 outgoingBuffers.add(bb);  
  31.                 if (sk.isValid()) {  
  32.                     sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);  
  33.                 }  
  34.             }  
  35.               
  36.         .......  
  37.     }  
复制代码



到这里server端已经执行完毕了,返回给client一个ConnectResponse对象,client端的SendThread收到server端的Response处理:

  1. void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)  
  2.       throws InterruptedException, IOException {  
  3.         SocketChannel sock = (SocketChannel) sockKey.channel();  
  4.         if (sock == null) {  
  5.             throw new IOException("Socket is null!");  
  6.         }  
  7.         if (sockKey.isReadable()) {  
  8.         //先读包的长度,一个int  
  9.             int rc = sock.read(incomingBuffer);  
  10.             if (rc < 0) {  
  11.                 throw new EndOfStreamException(  
  12.                         "Unable to read additional data from server sessionid 0x"  
  13.                                 + Long.toHexString(sessionId)  
  14.                                 + ", likely server has closed socket");  
  15.             }  
  16.         //如果读满,注意这里同一个包,要读2次,第一次读长度,第二次读内容,incomingBuffer重用  
  17.             if (!incomingBuffer.hasRemaining()) {  
  18.                 incomingBuffer.flip();  
  19.         //如果读的是长度  
  20.                 if (incomingBuffer == lenBuffer) {  
  21.                     recvCount++;  
  22.             //给incomingBuffer分配包长度的空间  
  23.                     readLength();  
  24.                 }   
  25.         //如果还未初始化,就是session还没建立,那server端返回的必须是ConnectResponse         
  26.         else if (!initialized) {  
  27.             //读取ConnectRequest,其实就是将incomingBuffer的内容反序列化成ConnectResponse对象  
  28.                     readConnectResult();  
  29.             //继续读后续响应  
  30.                     enableRead();  
  31.             //如果还有写请求,确保write事件ok  
  32.                     if (findSendablePacket(outgoingQueue,  
  33.                             cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {  
  34.                         // Since SASL authentication has completed (if client is configured to do so),  
  35.                         // outgoing packets waiting in the outgoingQueue can now be sent.  
  36.                         enableWrite();  
  37.                     }  
  38.             //准备读下一个响应  
  39.                     lenBuffer.clear();  
  40.                     incomingBuffer = lenBuffer;  
  41.                     updateLastHeard();  
  42.             //session建立完毕  
  43.                     initialized = true;  
  44.                 } else {  
  45.                     sendThread.readResponse(incomingBuffer);  
  46.                     lenBuffer.clear();  
  47.                     incomingBuffer = lenBuffer;  
  48.                     updateLastHeard();  
  49.                 }  
  50.             }  
  51.         }  
复制代码



具体的读取:
  1. void readConnectResult() throws IOException {  
  2.         .....  
  3.     //将incomingBuffer反序列化成CoonectResponse  
  4.         ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);  
  5.         BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);  
  6.         ConnectResponse conRsp = new ConnectResponse();  
  7.         conRsp.deserialize(bbia, "connect");  
  8.   
  9.         // read "is read-only" flag  
  10.         boolean isRO = false;  
  11.         try {  
  12.             isRO = bbia.readBool("readOnly");  
  13.         } catch (IOException e) {  
  14.             // this is ok -- just a packet from an old server which  
  15.             // doesn't contain readOnly field  
  16.             LOG.warn("Connected to an old server; r-o mode will be unavailable");  
  17.         }  
  18.     //server返回的sessionId  
  19.         this.sessionId = conRsp.getSessionId();  
  20.     //后续处理,初始化client的一些参数,最后触发WatchedEvent  
  21.         sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,  
  22.                 conRsp.getPasswd(), isRO);  
  23.     }  
复制代码




后续处理如下:
  1. void onConnected(int _negotiatedSessionTimeout, long _sessionId,  
  2.                 byte[] _sessionPasswd, boolean isRO) throws IOException {  
  3.             negotiatedSessionTimeout = _negotiatedSessionTimeout;  
  4.             ......  
  5.         //初始化client端的session相关参数  
  6.             readTimeout = negotiatedSessionTimeout * 2 / 3;  
  7.             connectTimeout = negotiatedSessionTimeout / hostProvider.size();  
  8.             hostProvider.onConnected();  
  9.             sessionId = _sessionId;  
  10.             sessionPasswd = _sessionPasswd;  
  11.         //修改CONNECT状态  
  12.             state = (isRO) ?  
  13.                     States.CONNECTEDREADONLY : States.CONNECTED;  
  14.             seenRwServerBefore |= !isRO;  
  15.             LOG.info("Session establishment complete on server "  
  16.                     + clientCnxnSocket.getRemoteSocketAddress()  
  17.                     + ", sessionid = 0x" + Long.toHexString(sessionId)  
  18.                     + ", negotiated timeout = " + negotiatedSessionTimeout  
  19.                     + (isRO ? " (READ-ONLY mode)" : ""));  
  20.         //触发一个SyncConnected事件,这里有专门的EventThread会异步通知注册的watcher来处理  
  21.             KeeperState eventState = (isRO) ?  
  22.                     KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;  
  23.             eventThread.queueEvent(new WatchedEvent(  
  24.                     Watcher.Event.EventType.None,  
  25.                     eventState, null));  
  26.         }  
复制代码


EventThread处理:
  1.   public void queueEvent(WatchedEvent event) {  
  2.           if (event.getType() == EventType.None  
  3.                   && sessionState == event.getState()) {  
  4.               return;  
  5.           }  
  6. //EventThread同步session状态  
  7.           sessionState = event.getState();  
  8.   
  9.           // materialize the watchers based on the event  
  10. //找出那些需要被通知的watcher,主线程直接调用对应watcher接口即可  
  11.           WatcherSetEventPair pair = new WatcherSetEventPair(  
  12.                   watcher.materialize(event.getState(), event.getType(),  
  13.                           event.getPath()),  
  14.                           event);  
  15.           // queue the pair (watch set & event) for later processing  
  16. //提交异步队列处理  
  17.           waitingEvents.add(pair);  
  18.       }  
复制代码






EventThread主线程

  1. public void run() {  
  2.            try {  
  3.               isRunning = true;  
  4.               while (true) {  
  5.         //拿事件  
  6.                  Object event = waitingEvents.take();  
  7.                  if (event == eventOfDeath) {  
  8.                     wasKilled = true;  
  9.                  } else {  
  10.             //处理  
  11.                     processEvent(event);  
  12.                  }  
  13.                  if (wasKilled)  
  14.                     synchronized (waitingEvents) {  
  15.                        if (waitingEvents.isEmpty()) {  
  16.                           isRunning = false;  
  17.                           break;  
  18.                        }  
  19.                     }  
  20.               }  
  21.            } catch (InterruptedException e) {  
  22.               LOG.error("Event thread exiting due to interruption", e);  
  23.            }  
  24.   
  25.             LOG.info("EventThread shut down");  
  26.         }  
复制代码







具体处理:

  1. if (event instanceof WatcherSetEventPair) {  
  2.                   // each watcher will process the event  
  3.                   WatcherSetEventPair pair = (WatcherSetEventPair) event;  
  4.                   for (Watcher watcher : pair.watchers) {  
  5.                       try {  
  6.                           watcher.process(pair.event);  
  7.                       } catch (Throwable t) {  
  8.                           LOG.error("Error while calling watcher ", t);  
  9.                       }  
  10.                   }  
  11.               }   
复制代码







在我们的例子里,会调用Executor这个watcher的process方法,又代理给了DataMonitor,对于SyncConnected啥事不干


  1. case SyncConnected:  
  2.                 // In this particular example we don't need to do anything  
  3.                 // here - watches are automatically re-registered with  
  4.                 // server and any watches triggered while the client was  
  5.                 // disconnected will be delivered (in order of course)  
  6.                 break;  
复制代码




好了,到这里client和server端session已经建立,可以进行后续的业务处理了。通过这个例子,我们讲解了client和server是如何交互数据,后续的请求比如create,get,set,delete都是类似流程。
Session建立核心流程:
1.创建TCP连接
2.client发送ConnectRequest包
3.server收到ConnectRequest包,创建session,将server端的sessionId返回给client
4.client收到server的响应,触发相应SyncConnected状态的事件
5.client端watcher消费事件

相关文章推荐:
Zookeeper源码分析之一Server启动

Zookeeper源码分析之三Exists请求和处理

深入浅出Zookeeper之四Create请求和处理

Zookeeper源码分析之五 Leader选举

Zookeeper源码分析之六 Leader/Follower初始化

Zookeeper源码分析之七分布式CREATE事务处理


已有(2)人评论

跳转到指定楼层
ainubis 发表于 2015-3-28 20:17:51
谢谢楼主分享。
回复

使用道具 举报

feng01301218 发表于 2015-4-3 18:10:54

1.Session建立核心流程的步骤是什么?
2.CREATE_SESSION的作用是什么?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条