分享

Zookeeper源码分析之三Exists请求和处理

desehawk 2014-11-25 17:57:37 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 19640
本帖最后由 desehawk 于 2014-11-25 22:22 编辑

问题导读


1.finishPacket过程,源码是如何实现的?
2.Exists过程有哪些注意点?







前一篇介绍了zookeeper的client和server之间session是如何建立的。在DataMonitor的例子中,DataMonitor通过exists异步接口和server端交互,本文将介绍exists操作是如何完成。
dataMonitor开始exist操作

  1.    public void exists(final String path, Watcher watcher,  
  2.            StatCallback cb, Object ctx)  
  3.    {  
  4.         ......  
  5. //exist请求头  
  6.        RequestHeader h = new RequestHeader();  
  7.        h.setType(ZooDefs.OpCode.exists);  
  8. //请求体  
  9.        ExistsRequest request = new ExistsRequest();  
  10.        request.setPath(serverPath);  
  11.        request.setWatch(watcher != null);  
  12.        SetDataResponse response = new SetDataResponse();  
  13. //添加到发送队列  
  14.        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,  
  15.                clientPath, serverPath, ctx, wcb);  
  16.    }  
复制代码




添加过程

  1. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,  
  2.             Record response, AsyncCallback cb, String clientPath,  
  3.             String serverPath, Object ctx, WatchRegistration watchRegistration)  
  4.     {  
  5.         Packet packet = null;  
  6.   
  7.         // Note that we do not generate the Xid for the packet yet. It is  
  8.         // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),  
  9.         // where the packet is actually sent.  
  10.         synchronized (outgoingQueue) {  
  11.         //转换成Packet  
  12.             packet = new Packet(h, r, request, response, watchRegistration);  
  13.             packet.cb = cb;  
  14.             packet.ctx = ctx;  
  15.             packet.clientPath = clientPath;  
  16.             packet.serverPath = serverPath;  
  17.             if (!state.isAlive() || closing) {  
  18.                 conLossPacket(packet);  
  19.             } else {  
  20.                 // If the client is asking to close the session then  
  21.                 // mark as closing  
  22.                 if (h.getType() == OpCode.closeSession) {  
  23.                     closing = true;  
  24.                 }  
  25.         //添加到发送队列  
  26.                 outgoingQueue.add(packet);  
  27.             }  
  28.         }  
  29.     //唤醒下Selector,快点处理  
  30.         sendThread.getClientCnxnSocket().wakeupCnxn();  
  31.         return packet;  
  32.     }  
复制代码




接下来还是回到SendThread的发送过程,之前Session建立时已经分析过,这里有一点要注意下:
  1. //重要的业务请求,需要设置事务id  
  2. if ((p.requestHeader != null) &&  
  3.                                 (p.requestHeader.getType() != OpCode.ping) &&  
  4.                                 (p.requestHeader.getType() != OpCode.auth)) {  
  5.                             p.requestHeader.setXid(cnxn.getXid());  
  6.                         }  
复制代码




接下来Server端IO线程拿到请求,处理,过程和之前session建立时一样,就不赘述了。变化的是后续的处理链。
PrepRequestProcessor预处理
  1. //All the rest don't need to create a Txn - just verify session  
  2. /读请求,不需要创建事务,只是检查了下session是否还在,此时事务头和事务体都是null  
  3.            case OpCode.sync:  
  4.            case OpCode.exists:  
  5.            case OpCode.getData:  
  6.            case OpCode.getACL:  
  7.            case OpCode.getChildren:  
  8.            case OpCode.getChildren2:  
  9.            case OpCode.ping:  
  10.            case OpCode.setWatches:  
  11.                zks.sessionTracker.checkSession(request.sessionId,  
  12.                        request.getOwner());  
  13.                break;  
复制代码





SyncRequestProcessor处理逻辑之前已经分析过了,这里就挑重点说一下
  1. //试图将其写log,由于ExistsRequest并不是一个事务型请求,所以这里直接返回false,也就是说ExistsRequest不会被记录到log文件中  
  2. zks.getZKDatabase().append(si)  
复制代码





接下来FinalRequestProcessor处理,由于不是事务型请求,省了很多步骤,直接进入switch处理:
  1. case OpCode.exists: {  
  2.                 lastOp = "EXIS";  
  3.                 // TODO we need to figure out the security requirement for this!  
  4.                 ExistsRequest existsRequest = new ExistsRequest();  
  5.         //反序列化  
  6.                 ByteBufferInputStream.byteBuffer2Record(request.request,  
  7.                         existsRequest);  
  8.                 String path = existsRequest.getPath();  
  9.                 if (path.indexOf('\0') != -1) {  
  10.                     throw new KeeperException.BadArgumentsException();  
  11.                 }  
  12.         //拿对应node的状态,并设置是否watch  
  13.                 Stat stat = zks.getZKDatabase().statNode(path, existsRequest  
  14.                         .getWatch() ? cnxn : null);  
  15.         //结果  
  16.                 rsp = new ExistsResponse(stat);  
  17.                 break;  
  18.             }  
  19. ......  
  20. //当前处理zxid  
  21. long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();  
  22. //构造相应头  
  23.         ReplyHeader hdr =  
  24.             new ReplyHeader(request.cxid, lastZxid, err.intValue());  
  25.   
  26.         zks.serverStats().updateLatency(request.createTime);  
  27.         cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,  
  28.                     request.createTime, System.currentTimeMillis());  
  29.   
  30.         try {  
  31.     //写回响应  
  32.             cnxn.sendResponse(hdr, rsp, "response");  
复制代码




statNode过程

  1. //此处的watcher就是对应ServerCnxn,代表是哪个client,server端需要notify的时候,直接往对应ServerCnxn写数据即可  
  2. public Stat statNode(String path, Watcher watcher)  
  3.             throws KeeperException.NoNodeException {  
  4.         Stat stat = new Stat();  
  5.         DataNode n = nodes.get(path);  
  6.         if (watcher != null) {  
  7.         //对于exists请求,需要监听data变化事件,添加watcher  
  8.             dataWatches.addWatch(path, watcher);  
  9.         }  
  10.         if (n == null) {  
  11.             throw new KeeperException.NoNodeException();  
  12.         }  
  13.         synchronized (n) {  
  14.             n.copyStat(stat);  
  15.             return stat;  
  16.         }  
  17.     }  
复制代码



好了,以上server端就完成了ExistsRequest的处理了。接下来client端SendThread收到ExistsResponse进行处理
  1.       if (sockKey.isReadable()) {  
  2.           int rc = sock.read(incomingBuffer);  
  3. ......              
  4. else if (!initialized) {  
  5.                   readConnectResult();  
  6.                   enableRead();  
  7.                   if (findSendablePacket(outgoingQueue,  
  8.                           cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {  
  9.                       // Since SASL authentication has completed (if client is configured to do so),  
  10.                       // outgoing packets waiting in the outgoingQueue can now be sent.  
  11.                       enableWrite();  
  12.                   }  
  13.                   lenBuffer.clear();  
  14.                   incomingBuffer = lenBuffer;  
  15.                   updateLastHeard();  
  16.                   initialized = true;  
  17.               }   
  18. //此时client session已经建立,init完成         
  19. else {  
  20.                   sendThread.readResponse(incomingBuffer);  
  21.                   lenBuffer.clear();  
  22.                   incomingBuffer = lenBuffer;  
  23.                   updateLastHeard();  
  24.               }  
  25.           }  
  26.       }  
复制代码





  具体读取:
  1. void readResponse(ByteBuffer incomingBuffer) throws IOException {  
  2.             ByteBufferInputStream bbis = new ByteBufferInputStream(  
  3.                     incomingBuffer);  
  4.             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);  
  5.             ReplyHeader replyHdr = new ReplyHeader();  
  6.         //先读响应头,先对特殊的xid进行处理  
  7.             replyHdr.deserialize(bbia, "header");  
  8.             ......  
  9.         //从头拿之前入队的ExistsRequest请求Packet,由于client和server都是单线程处理,多队列处理,所以认为全局有序  
  10.             Packet packet;  
  11.             synchronized (pendingQueue) {  
  12.                 if (pendingQueue.size() == 0) {  
  13.                     throw new IOException("Nothing in the queue, but got "  
  14.                             + replyHdr.getXid());  
  15.                 }  
  16.                 packet = pendingQueue.remove();  
  17.             }  
  18.             /*
  19.              * Since requests are processed in order, we better get a response
  20.              * to the first request!
  21.              */  
  22.             try {  
  23.         //检查一下是否一样  
  24.                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {  
  25.                     packet.replyHeader.setErr(  
  26.                             KeeperException.Code.CONNECTIONLOSS.intValue());  
  27.                     throw new IOException("Xid out of order. Got Xid "  
  28.                             + replyHdr.getXid() + " with err " +  
  29.                             + replyHdr.getErr() +  
  30.                             " expected Xid "  
  31.                             + packet.requestHeader.getXid()  
  32.                             + " for a packet with details: "  
  33.                             + packet );  
  34.                 }  
  35.   
  36.                 packet.replyHeader.setXid(replyHdr.getXid());  
  37.                 packet.replyHeader.setErr(replyHdr.getErr());  
  38.                 packet.replyHeader.setZxid(replyHdr.getZxid());  
  39.         //更新client端的最新zxid  
  40.                 if (replyHdr.getZxid() > 0) {  
  41.                     lastZxid = replyHdr.getZxid();  
  42.                 }  
  43.         //反序列化响应  
  44.                 if (packet.response != null && replyHdr.getErr() == 0) {  
  45.                     packet.response.deserialize(bbia, "response");  
  46.                 }  
  47.   
  48.                 if (LOG.isDebugEnabled()) {  
  49.                     LOG.debug("Reading reply sessionid:0x"  
  50.                             + Long.toHexString(sessionId) + ", packet:: " + packet);  
  51.                 }  
  52.             } finally {  
  53.         //处理packet,主要是注册watcher,回调,触发事件  
  54.                 finishPacket(packet);  
  55.             }  
  56.         }  
复制代码




finishPacket过程
  1.    private void finishPacket(Packet p) {  
  2. //注册watcher,如果是exists请求,则注册到dataWatches中  
  3.        if (p.watchRegistration != null) {  
  4.            p.watchRegistration.register(p.replyHeader.getErr());  
  5.        }  
  6. //如果是同步接口,则唤醒等待的业务线程  
  7.        if (p.cb == null) {  
  8.            synchronized (p) {  
  9.                p.finished = true;  
  10.                p.notifyAll();  
  11.            }  
  12.        }   
  13. //如果是异步请求,则发送异步事件  
  14. else {  
  15.            p.finished = true;  
  16.            eventThread.queuePacket(p);  
  17.        }  
  18.    }  
复制代码



EventThread端

  1. else if (p.response instanceof ExistsResponse  
  2.                           || p.response instanceof SetDataResponse  
  3.                           || p.response instanceof SetACLResponse) {  
  4.                       StatCallback cb = (StatCallback) p.cb;  
  5.             //如果成功,增加node stat的回调参数  
  6.                       if (rc == 0) {  
  7.                           if (p.response instanceof ExistsResponse) {  
  8.                               cb.processResult(rc, clientPath, p.ctx,  
  9.                                       ((ExistsResponse) p.response)  
  10.                                               .getStat());  
  11.                           } else if (p.response instanceof SetDataResponse) {  
  12.                               cb.processResult(rc, clientPath, p.ctx,  
  13.                                       ((SetDataResponse) p.response)  
  14.                                               .getStat());  
  15.                           } else if (p.response instanceof SetACLResponse) {  
  16.                               cb.processResult(rc, clientPath, p.ctx,  
  17.                                       ((SetACLResponse) p.response)  
  18.                                               .getStat());  
  19.                           }  
  20.                       }   
  21.             //如果响应失败,stat为null  
  22.             else {  
  23.                           cb.processResult(rc, clientPath, p.ctx, null);  
  24.                       }  
  25.                   }   
复制代码





在DataMonitor例子中,它本身就是一个StatCallback
  1. public void processResult(int rc, String path, Object ctx, Stat stat) {  
  2.         boolean exists;  
  3.         switch (rc) {  
  4.         case Code.Ok:  
  5.             exists = true;  
  6.             break;  
  7.         case Code.NoNode:  
  8.             exists = false;  
  9.             break;  
  10.         case Code.SessionExpired:  
  11.         case Code.NoAuth:  
  12.             dead = true;  
  13.             listener.closing(rc);  
  14.             return;  
  15.         default:  
  16.             // Retry errors  
  17.             zk.exists(znode, true, this, null);  
  18.             return;  
  19.         }  
复制代码



Exists过程大致就是上面描述的,主要注意点:


1.客户端Request发送完之后会进入Pending队列,等待响应之后拿出来继续处理
2.同步接口是使用Packet.wait()实现的
3.server端exists操作不是事务型的操作,不会写入log
4.server端的watcher就是一个客户端连接ServerCxcn,代表一个客户端,notify的时候直接往连接里写数据即可


相关文章推荐:
Zookeeper源码分析之一Server启动

Zookeeper源码分析之二Session建立

深入浅出Zookeeper之四Create请求和处理

Zookeeper源码分析之五 Leader选举

Zookeeper源码分析之六 Leader/Follower初始化

Zookeeper源码分析之七分布式CREATE事务处理



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条