desehawk 发表于 2016-6-23 18:22:53

Hadoop源码分析1——数据节点写数据

本帖最后由 desehawk 于 2016-6-23 18:29 编辑


问题导读

1.客户端写文件时系统各节点是如何配合的?
2.你认为DistributedFileSystem的作用是什么?
3.写数据包括哪些字段?

static/image/hrline/4.gif


即使不考虑数据节点出错后的故障处理,文件写入也是HDFS中最复杂的流程。本章以创建一个新文件并向文件中写入数据,然后关闭文件为例,分析客户端写文件时系统各节点的配合,如下图所示。



客户端调用DistributedFileSystem的create()方法创建文件,上图的步骤1,这时,DistributedFileSystem创建DFSOutputStream,并由远程过程调用,让名字节点执行同名方法,在文件系统的命名空间中创建一个新文件。名字节点创建新文件时,需要执行各种各样的检查,如名字节点处于正常工作状态,被创建的文件不存在,客户端有在父目录中创建文件的权限等。这些检查都通过以后,名字节点会构造一个新文件,并记录创建操作到编辑日志edits中。远程方法调用结束后,DistributedFileSystem将该DFSOutputStream对象包裹在FSDataOutputStream实例中,返回给客户端。

在上图的步骤3客户端写入数据时,由于create()调用创建了一个空文件,所以,DFSOutputStream实例首先需要向名字节点申请数据块,addBlock()方法成功执行后,返回一个LocatedBlock对象。该对象包含了新数据块的数据块标识和版本号,同时,它的成员变量LocatedBlock.locs提供了数据流管道的信息,通过上述信息,DFSOutputStream就可以和数据节点联系,通过写数据接口建立数据流管道。客户端写入FSDataOutputStream流中的数据,被分成一个一个的文件包,放入DFSOutputStream对象的内部队列。该队列中的文件包最后打包成数据包,发往数据流管道,流经管道上的各个数据节点,并持久化。确认包,上图步骤6逆流而上,从数据流管道依次发往客户端,当客户端收到应答时,它将对应的包从内部队列移除。

DFSOutputStream在写完一个数据块后,数据流管道上的节点,会通过和名字节点的DatanodeProtocol远程接口的blockReceived()方法,向名字节点提交数据块。如果数据队列中还有等待输出的数据,DFSOutputStream对象需要再次调用addBlock()方法,为文件添加新的数据块。

客户端完成数据的写入后,调用close()方法关闭流,步骤8。关闭意味着客户端不会再往流中写入数据,所以,当DFSOutputStream数据队列中的文件包都收到应答后,就可以使用ClinetProtocol.complete()方法通知名字节点关闭文件,完成一次正常的写文件流程。

写数据
流式接口的写数据实现远比读数据复杂。客户端写HDFS文件数据的操作码为80,请求包含如下主要字段:
blockId(数据块ID):写数据的数据块标识,数据节点通过它定位数据块。
generationStamp(版本号):数据块的版本号,用于进行版本检查。
pipelineSize(数据流管道的大小):参与到写过程的所有数据节点的个数。
isRecovery(是否是数据恢复过程):这个写操作是不是错误恢复过程中的一部分。
clientName(客户端名字):发起写请求的客户端名字,可能为空。
hasSrcDataNode(源信息标记):写请求是否携带源信息,如果是true,则包含源信息。
srcDataNode(源信息,可选):类型为DatanodeInfo,包含发起写请求的数据节点信息。
numTargets(数据目标列表大小):当前数据节点还有多少个下游数据推送目标。
targets(数据目标列表):当前数据节点的下游数据推送目标列表。
accessToken(访问令牌):与安全特性相关,不讨论。
checksum(数据校验信息):类型为DataChecksum,包含了后续写数据数据包的校验方式。



与org.apache.Hadoop.hdfs.server.datanode.DataXceiver.Java.readBlock类似,上述字段在writeBlock()入口中读取,并保存在对应的方法变量中,然后,构造数据块接收器org.apache.hadoop.hdfs.server.datanode.BlockReceiver对象,

/**
   * Write a block to disk.
   *
   * @param in The stream to read from
   * @throws IOException
   */
private void writeBlock(DataInputStream in) throws IOException {
    DatanodeInfo srcDataNode = null;
    LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
            " tcp no delay " + s.getTcpNoDelay());
    //
    // Read in the header
    //
    Block block = new Block(in.readLong(),
      dataXceiverServer.estimateBlockSize, in.readLong());
    LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
      + localAddress);
    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
    boolean isRecovery = in.readBoolean(); // is this part of recovery?
    String client = Text.readString(in); // working on behalf of this client
    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
    if (hasSrcDataNode) {
      srcDataNode = new DatanodeInfo();
      srcDataNode.readFields(in);
    }
    int numTargets = in.readInt();
    if (numTargets < 0) {
      throw new IOException("Mislabelled incoming datastream.");
    }
    DatanodeInfo targets[] = new DatanodeInfo;
    for (int i = 0; i < targets.length; i++) {
      DatanodeInfo tmp = new DatanodeInfo();
      tmp.readFields(in);
      targets = tmp;
    }
    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
    accessToken.readFields(in);
    DataOutputStream replyOut = null;   // stream to prev target
    replyOut = new DataOutputStream(
                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
    if (datanode.isBlockTokenEnabled) {
      try {
      datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
            BlockTokenSecretManager.AccessMode.WRITE);
      } catch (InvalidToken e) {
      try {
          if (client.length() != 0) {
            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
            Text.writeString(replyOut, datanode.dnRegistration.getName());
            replyOut.flush();
          }
          throw new IOException("Access token verification failed, for client "
            + remoteAddress + " for OP_WRITE_BLOCK for " + block);
      } finally {
          IOUtils.closeStream(replyOut);
      }
      }
    }

    DataOutputStream mirrorOut = null;// stream to next target
    DataInputStream mirrorIn = null;    // reply from next target
    Socket mirrorSock = null;         // socket to next target
    BlockReceiver blockReceiver = null; // responsible for data handling
    String mirrorNode = null;         // the name:port of next target
    String firstBadLink = "";         // first datanode that failed in connection setup
    short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
    try {
      // open a block receiver and check if the block does not exist
      blockReceiver = new BlockReceiver(block, in,
          s.getRemoteSocketAddress().toString(),
          s.getLocalSocketAddress().toString(),
          isRecovery, client, srcDataNode, datanode);

      //
      // Open network conn to backup machine, if
      // appropriate
      //
      if (targets.length > 0) {
      InetSocketAddress mirrorTarget = null;
      // Connect to backup machine
      final String mirrorAddrString =
          targets.getName(connectToDnViaHostname);
      mirrorNode = targets.getName();
      mirrorTarget = NetUtils.createSocketAddr(mirrorAddrString);
      mirrorSock = datanode.newSocket();
      try {
          int timeoutValue = datanode.socketTimeout +
                           (HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
          int writeTimeout = datanode.socketWriteTimeout +
                           (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
          LOG.debug("Connecting to " + mirrorAddrString);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
          mirrorOut = new DataOutputStream(
             new BufferedOutputStream(
                         NetUtils.getOutputStream(mirrorSock, writeTimeout),
                         SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));

          // Write header: Copied from DFSClient.java!
          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
          mirrorOut.writeLong( block.getBlockId() );
          mirrorOut.writeLong( block.getGenerationStamp() );
          mirrorOut.writeInt( pipelineSize );
          mirrorOut.writeBoolean( isRecovery );
          Text.writeString( mirrorOut, client );
          mirrorOut.writeBoolean(hasSrcDataNode);
          if (hasSrcDataNode) { // pass src node information
            srcDataNode.write(mirrorOut);
          }
          mirrorOut.writeInt( targets.length - 1 );
          for ( int i = 1; i < targets.length; i++ ) {
            targets.write( mirrorOut );
          }
          accessToken.write(mirrorOut);

          blockReceiver.writeChecksumHeader(mirrorOut);
          mirrorOut.flush();

          // read connect ack (only for clients, not for replication req)
          if (client.length() != 0) {
            mirrorInStatus = mirrorIn.readShort();
            firstBadLink = Text.readString(mirrorIn);
            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
            LOG.info("Datanode " + targets.length +
                     " got response for connect ack " +
                     " from downstream datanode with firstbadlink as " +
                     firstBadLink);
            }
          }

      } catch (IOException e) {
          if (client.length() != 0) {
            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
            Text.writeString(replyOut, mirrorNode);
            replyOut.flush();
          }
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          if (client.length() > 0) {
            throw e;
          } else {
            LOG.info(datanode.dnRegistration + ":Exception transfering " +
                     block + " to mirror " + mirrorNode +
                     "- continuing without the mirror\n" +
                     StringUtils.stringifyException(e));
          }
      }
      }

      // send connect ack back to source (only for clients)
      if (client.length() != 0) {
      if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
      }
      replyOut.writeShort(mirrorInStatus);
      Text.writeString(replyOut, firstBadLink);
      replyOut.flush();
      }

      // receive the block and mirror to the next target
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
                                 mirrorAddr, null, targets.length);

      // if this write is for a replication request (and not
      // from a client), then confirm block. For client-writes,
      // the block is finalized in the PacketResponder.
      if (client.length() == 0) {
      datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
      LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
            + localAddress + " size " + block.getNumBytes());
      }

      if (datanode.blockScanner != null) {
      datanode.blockScanner.addBlock(block);
      }

    } catch (IOException ioe) {
      LOG.info("writeBlock " + block + " received exception " + ioe);
      throw ioe;
    } finally {
      // close all opened streams
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
      IOUtils.closeStream(blockReceiver);
    }
}



读取数据包头的代码

//
    // Read in the header
    //
    Block block = new Block(in.readLong(),
      dataXceiverServer.estimateBlockSize, in.readLong());
    LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
      + localAddress);
    int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
    boolean isRecovery = in.readBoolean(); // is this part of recovery?
    String client = Text.readString(in); // working on behalf of this client
    boolean hasSrcDataNode = in.readBoolean(); // is src node info present
    if (hasSrcDataNode) {
      srcDataNode = new DatanodeInfo();
      srcDataNode.readFields(in);
    }
    int numTargets = in.readInt();
    if (numTargets < 0) {
      throw new IOException("Mislabelled incoming datastream.");
    }
    DatanodeInfo targets[] = new DatanodeInfo;
    for (int i = 0; i < targets.length; i++) {
      DatanodeInfo tmp = new DatanodeInfo();
      tmp.readFields(in);
      targets = tmp;
    }
    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
    accessToken.readFields(in);


构造BlockReceiver对象的代码
blockReceiver = new BlockReceiver(block, in,
          s.getRemoteSocketAddress().toString(),
          s.getLocalSocketAddress().toString(),
          isRecovery, client, srcDataNode, datanode);



在BlockReceiver的构造函数中,会为写数据块和校验信息文件打开输出数据流,使用的是FSDataset.writeToBlock()方法,在完成一系列检查后,它返回到数据块文件和校验文件的输出流。

BlockReceiver(Block block, DataInputStream in, String inAddr,
                String myAddr, boolean isRecovery, String clientName,
                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
    try{
      this.block = block;
      this.in = in;
      this.inAddr = inAddr;
      this.myAddr = myAddr;
      this.isRecovery = isRecovery;
      this.clientName = clientName;
      this.offsetInBlock = 0;
      this.srcDataNode = srcDataNode;
      this.datanode = datanode;
      this.checksum = DataChecksum.newDataChecksum(in);
      this.bytesPerChecksum = checksum.getBytesPerChecksum();
      this.checksumSize = checksum.getChecksumSize();
      this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
      this.syncBehindWrites = datanode.shouldSyncBehindWrites();
      //
      // Open local disk out
      //
      streams = datanode.data.writeToBlock(block, isRecovery,
                              clientName == null || clientName.length() == 0);
      this.finalized = false;
      if (streams != null) {
      this.out = streams.dataOut;
      this.cout = streams.checksumOut;
      if (out instanceof FileOutputStream) {
          this.outFd = ((FileOutputStream) out).getFD();
      } else {
          LOG.warn("Could not get file descriptor for outputstream of class "
            + out.getClass());
      }
      this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                streams.checksumOut,
                                                SMALL_BUFFER_SIZE));
      // If this block is for appends, then remove it from periodic
      // validation.
      if (datanode.blockScanner != null && isRecovery) {
          datanode.blockScanner.deleteBlock(block);
      }
      }
    } catch (BlockAlreadyExistsException bae) {
      throw bae;
    } catch(IOException ioe) {
      IOUtils.closeStream(this);
      cleanupBlock();

      // check if there is a disk error
      IOException cause = FSDataset.getCauseIfDiskError(ioe);
      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
          cause);

      if (cause != null) { // possible disk error
      ioe = cause;
      datanode.checkDiskError(ioe); // may throw an exception here
      }

      throw ioe;
    }
}


接下来,DataXceiver.writeBlock()将根据请求信息建立数据流管道。
数据流管道中,顺流的是HDFS的文件数据(下图的粗箭头方向),而写操作的确认包会逆流而上,所以,这里需要两个Socket对象。其中,对象s用于和管道上游通信,它的输入和输出流分别是in和replyOut;往下游的Socket对象是mirrorSock,关联了输出流mirrorOut和输入流mirrorIn。



如果当前数据节点不是数据管道的最末端,writeBlock()方法就会使用数据目标列表的第一项,建立到下一个数据节点的Socket连接,连接建立后,通过输出流mirrorOut,往下一个数据节点发起写请求,除了数据目标列表大小和数据目标列表字段会相应变化以外,其他字段和上游读到的请求信息是一致的(严格地说,这里不涉及请求的数据校验信息,即上面讨论的接收发送字段只包含请求前面的10个字段)。
代码如下
// Write header: Copied from DFSClient.java!
          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
          mirrorOut.writeLong( block.getBlockId() );
          mirrorOut.writeLong( block.getGenerationStamp() );
          mirrorOut.writeInt( pipelineSize );
          mirrorOut.writeBoolean( isRecovery );
          Text.writeString( mirrorOut, client );
          mirrorOut.writeBoolean(hasSrcDataNode);
          if (hasSrcDataNode) { // pass src node information
            srcDataNode.write(mirrorOut);
          }
          mirrorOut.writeInt( targets.length - 1 );
          for ( int i = 1; i < targets.length; i++ ) {
            targets.write( mirrorOut );
          }
          accessToken.write(mirrorOut);

          blockReceiver.writeChecksumHeader(mirrorOut);
          mirrorOut.flush();


往下一个数据节点的写请求发送以后,writeBlock()会等待请求的应答,这是一个同步的过程,由于数据流中会有多个数据节点,所以建立数据流管道会花比较长的时间,这也是HDFS不适合用于低延迟数据访问场景的原因之一。
应答包含的内容:返回码和附加信息。当返回码是DataTransferProtocol.OP_STATUS_ERROR,即出错时,附加信息提供了流中第一个出错的数据节点地址信息(主机名和端口)。方法变量mirrorInStatus和firstBadLink用于保存返回码和附加信息。writeBlock()通过从流mirrorIn中读取返回码和附加信息,等待下游节点的应答。
// read connect ack (only for clients, not for replication req)
          if (client.length() != 0) {
            mirrorInStatus = mirrorIn.readShort();
            firstBadLink = Text.readString(mirrorIn);
            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
            LOG.info("Datanode " + targets.length +
                     " got response for connect ack " +
                     " from downstream datanode with firstbadlink as " +
                     firstBadLink);
            }
          }


顺利完成各类初始化任务,写应答
// send connect ack back to source (only for clients)
      if (client.length() != 0) {
      if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
      }
      replyOut.writeShort(mirrorInStatus);
      Text.writeString(replyOut, firstBadLink);
      replyOut.flush();
      }


DataXceiver委托BlockReceiver.receiveBlock()处理写数据的数据包。

// receive the block and mirror to the next target
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
                                 mirrorAddr, null, targets.length);

成功处理完这些数据包以后,调用DataNode.notifyNamenodeReceivedBlock()通知名字节点。

// if this write is for a replication request (and not
      // from a client), then confirm block. For client-writes,
      // the block is finalized in the PacketResponder.
      if (client.length() == 0) {
      datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
      LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
            + localAddress + " size " + block.getNumBytes());
      }

      if (datanode.blockScanner != null) {
      datanode.blockScanner.addBlock(block);
      }

最后,writeBlock()会关闭到上下游的输入/输出流,完成一次写数据请求
// close all opened streams
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);

PacketResponder线程
当BlockReceiver处理客户端的写数据请求时,方法receiveBlock()接收数据包,校验数据并保存到本地的数据块文件和校验信息文件中,如果节点处于数据流管道的中间,它还需要向下一个数据节点转发数据包。同时,数据节点还需要从下游接收数据包确认,并向上游转发。为此,数据块接收器引入了PacketResponder线程,它和BlockReceiver所在的线程(下面称为BlockReceiver线程)一起工作,分别用于从下游接收应答和从上游接收数据。
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.java中的receiveBlock方法

void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,// output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      int numTargets) throws IOException {

      mirrorOut = mirrOut;
      mirrorAddr = mirrAddr;
      throttler = throttlerArg;

    try {
      // write data chunk header
      if (!finalized) {
      BlockMetadataHeader.writeHeader(checksumOut, checksum);
      }
      if (clientName.length() > 0) {
      responder = new Daemon(datanode.threadGroup,
                               new PacketResponder(this, block, mirrIn,
                                                   replyOut, numTargets,
                                                   Thread.currentThread()));
      responder.start(); // start thread to processes reponses
      }

      /*
       * Receive until packet length is zero.
       */
      while (receivePacket() > 0) {}

      // flush the mirror out
      if (mirrorOut != null) {
      try {
          mirrorOut.writeInt(0); // mark the end of the block
          mirrorOut.flush();
      } catch (IOException e) {
          handleMirrorOutError(e);
      }
      }

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      }

      // if this write is for a replication request (and not
      // from a client), then finalize block. For client-writes,
      // the block is finalized in the PacketResponder.
      if (clientName.length() == 0) {
      // close the block/crc files
      close();

      // Finalize the block. Does this fsync()?
      block.setNumBytes(offsetInBlock);
      datanode.data.finalizeBlock(block);
      datanode.myMetrics.incrBlocksWritten();
      }

    } catch (IOException ioe) {
      LOG.info("Exception in receiveBlock for " + block + " " + ioe);
      IOUtils.closeStream(this);
      if (responder != null) {
      responder.interrupt();
      }
      cleanupBlock();
      throw ioe;
    } finally {
      if (responder != null) {
      try {
          responder.join();
      } catch (InterruptedException e) {
          throw new IOException("Interrupted receiveBlock");
      }
      responder = null;
      }
    }
}

PacketResponder线程从下游数据节点接收确认,并在合适的时候,往上游发送。这里的“合适”包括两个条件:
1、当前数据节点已经顺利处理完该数据包。
2、(数据节点处于管道的中间时)当前数据节点收到下游数据节点的数据包确认。
这两个条件都满足,意味着当前数据节点和数据流管道后续数据节点都完成了对某个数据包的处理。
由于当前节点由BlockReceiver线程处理数据包,所以,它必须将处理结果通过某种机制,通知到PacketResponder线程,并由PacketResponder线程进行进一步的处理。

/**
   * Processed responses from downstream datanodes in the pipeline
   * and sends back replies to the originator.
   */
class PacketResponder implements Runnable, FSConstants {   

    //packet waiting for ack
    private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
    private volatile boolean running = true;
    private Block block;
    DataInputStream mirrorIn;   // input from downstream datanode
    DataOutputStream replyOut;// output to upstream datanode
    private int numTargets;   // number of downstream datanodes including myself
    private BlockReceiver receiver; // The owner of this responder.
    private Thread receiverThread; // the thread that spawns this responder

    public String toString() {
      return "PacketResponder " + numTargets + " for " + this.block;
    }

    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
                  DataOutputStream out, int numTargets,
                  Thread receiverThread) {
      this.receiver = receiver;
      this.block = b;
      mirrorIn = in;
      replyOut = out;
      this.numTargets = numTargets;
      this.receiverThread = receiverThread;
    }

    /**
   * enqueue the seqno that is still be to acked by the downstream datanode.
   * @param seqno
   * @param lastPacketInBlock
   */
    synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
      if (running) {
      LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
                  " to ack queue.");
      ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
      notifyAll();
      }
    }

    /**
   * wait for all pending packets to be acked. Then shutdown thread.
   */
    synchronized void close() {
      while (running && ackQueue.size() != 0 && datanode.shouldRun) {
      try {
          wait();
      } catch (InterruptedException e) {
          running = false;
      }
      }
      LOG.debug("PacketResponder " + numTargets +
               " for block " + block + " Closing down.");
      running = false;
      notifyAll();
    }

    /**
   * Thread to process incoming acks.
   * @see java.lang.Runnable#run()
   */
    public void run() {
      boolean lastPacketInBlock = false;
      boolean isInterrupted = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (running && datanode.shouldRun && !lastPacketInBlock) {

      try {
          /**
         * Sequence number -2 is a special value that is used when
         * a DN fails to read an ack from a downstream. In this case,
         * it needs to tell the client that there's been an error downstream
         * but has no valid sequence number to use. Thus, -2 is used
         * as an UNKNOWN value.
         */
          long expected = PipelineAck.UNKOWN_SEQNO;
          long seqno = PipelineAck.UNKOWN_SEQNO;;

          PipelineAck ack = new PipelineAck();
          boolean localMirrorError = mirrorError;
          try {
            Packet pkt = null;
            synchronized (this) {
            // wait for a packet to arrive
            while (running && datanode.shouldRun && ackQueue.size() == 0) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("PacketResponder " + numTargets +
                            " seqno = " + seqno +
                            " for block " + block +
                            " waiting for local datanode to finish write.");
                  }
                  wait();
                }
                if (!running || !datanode.shouldRun) {
                  break;
                }
                pkt = ackQueue.removeFirst();
                expected = pkt.seqno;
                notifyAll();
            }
            // receive an ack if DN is not the last one in the pipeline
            if (numTargets > 0 && !localMirrorError) {
                // read an ack from downstream datanode
                ack.readFields(mirrorIn);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("PacketResponder " + numTargets +
                      " for block " + block + " got " + ack);
                }
                seqno = ack.getSeqno();
                // verify seqno
                if (seqno != expected) {
                  throw new IOException("PacketResponder " + numTargets +
                      " for block " + block +
                      " expected seqno:" + expected +
                      " received:" + seqno);
                }
            }
            lastPacketInBlock = pkt.lastPacketInBlock;
            } catch (InterruptedException ine) {
            isInterrupted = true;
            } catch (IOException ioe) {
            if (Thread.interrupted()) {
                isInterrupted = true;
            } else {
                // continue to run even if can not read from mirror
                // notify client of the error
                // and wait for the client to shut down the pipeline
                mirrorError = true;
                LOG.info("PacketResponder " + block + " " + numTargets +
                  " Exception " + StringUtils.stringifyException(ioe));
            }
            }

            if (Thread.interrupted() || isInterrupted) {
            /* The receiver thread cancelled this thread.
               * We could also check any other status updates from the
               * receiver thread (e.g. if it is ok to write to replyOut).
               * It is prudent to not send any more status back to the client
               * because this datanode has a problem. The upstream datanode
               * will detect that this datanode is bad, and rightly so.
               */
            LOG.info("PacketResponder " + block +" " + numTargets +
                     " : Thread is interrupted.");
            break;
            }

            // If this is the last packet in block, then close block
            // file and finalize the block before responding success
            if (lastPacketInBlock && !receiver.finalized) {
            receiver.close();
            final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
            block.setNumBytes(receiver.offsetInBlock);
            datanode.data.finalizeBlock(block);
            datanode.myMetrics.incrBlocksWritten();
            datanode.notifyNamenodeReceivedBlock(block,
                  DataNode.EMPTY_DEL_HINT);
            if (ClientTraceLog.isInfoEnabled() &&
                  receiver.clientName.length() > 0) {
                long offset = 0;
                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                      "HDFS_WRITE", receiver.clientName, offset,
                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
            } else {
                LOG.info("Received " + block + " of size " + block.getNumBytes() +
                         " from " + receiver.inAddr);
            }
            }

            // construct my ack message
            short[] replies = null;
            if (mirrorError) { // no ack is read
                replies = new short;
                replies = DataTransferProtocol.OP_STATUS_SUCCESS;
                replies = DataTransferProtocol.OP_STATUS_ERROR;
            } else {
                short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
                replies = new short;
                replies = DataTransferProtocol.OP_STATUS_SUCCESS;
                for (int i=0; i<ackLen; i++) {
                  replies = ack.getReply(i);
                }
            }
            PipelineAck replyAck = new PipelineAck(expected, replies);

            // send my ack back to upstream datanode
            replyAck.write(replyOut);
            replyOut.flush();
            if (LOG.isDebugEnabled()) {
            LOG.debug("PacketResponder " + numTargets +
                        " for block " + block +
                        " responded an ack: " + replyAck);
            }
      } catch (Throwable e) {
          LOG.warn("IOException in BlockReceiver.run(): ", e);
          if (running) {
            LOG.info("PacketResponder " + block + " " + numTargets +
                     " Exception " + StringUtils.stringifyException(e));
            running = false;
          }
          if (!Thread.interrupted()) { // error not caused by interruption
            receiverThread.interrupt();
          }
      }
      }
      LOG.info("PacketResponder " + numTargets + " for " + block +
          " terminating");
    }
}


org.apache.hadoop.hdfs.server.datanode.BlockReceiver.java中的内部类
PacketResponder中的成员变量ackQueue,保存了BlockReceiver线程已经处理的写请求数据包。BlockReceiver.receivePacket()方法每处理完一个数据包,就通过PacketResponder.enqueue()将对应信息(定义在内部类BlockReceiver.Packet中,包括数据包的序列号和是否是最后一个数据包两个字段)放入队列ackQueue中,队列ackQueue中的信息由PacketResponder.run()方法处理,这是一个典型的生产者-消费者模型。
PacketResponder.run()是一个大方法,它的处理过程明显的分为两个部分:等待上述两个条件满足,以及条件满足后的处理。其中,第一部分需要通过Java的同步工具wait(),等待ackQueue中的数据,这里wait()等待的是enqueue()方法中notifyAll()的通知。如果ackQueue有数据,则获取第一个记录,接下来,如果当前数据节点位于数据流管道的中间,那么,在流mirrorIn上读取下游的确认,如果顺利读取到下游的响应,表明第一步处理已经完成,代码如下
while (running && datanode.shouldRun && !lastPacketInBlock) {

      try {
          /**
         * Sequence number -2 is a special value that is used when
         * a DN fails to read an ack from a downstream. In this case,
         * it needs to tell the client that there's been an error downstream
         * but has no valid sequence number to use. Thus, -2 is used
         * as an UNKNOWN value.
         */
          long expected = PipelineAck.UNKOWN_SEQNO;
          long seqno = PipelineAck.UNKOWN_SEQNO;;

          PipelineAck ack = new PipelineAck();
          boolean localMirrorError = mirrorError;
          try {
            Packet pkt = null;
            synchronized (this) {
            // wait for a packet to arrive
            while (running && datanode.shouldRun && ackQueue.size() == 0) {
                if (LOG.isDebugEnabled()) {
                  LOG.debug("PacketResponder " + numTargets +
                            " seqno = " + seqno +
                            " for block " + block +
                            " waiting for local datanode to finish write.");
                  }
                  wait();
                }
                if (!running || !datanode.shouldRun) {
                  break;
                }
                pkt = ackQueue.removeFirst();
                expected = pkt.seqno;
                notifyAll();
            }
            // receive an ack if DN is not the last one in the pipeline
            if (numTargets > 0 && !localMirrorError) {
                // read an ack from downstream datanode
                ack.readFields(mirrorIn);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("PacketResponder " + numTargets +
                      " for block " + block + " got " + ack);
                }
                seqno = ack.getSeqno();
                // verify seqno
                if (seqno != expected) {
                  throw new IOException("PacketResponder " + numTargets +
                      " for block " + block +
                      " expected seqno:" + expected +
                      " received:" + seqno);
                }
            }
            lastPacketInBlock = pkt.lastPacketInBlock;
            } catch (InterruptedException ine) {
            isInterrupted = true;
            } catch (IOException ioe) {
            if (Thread.interrupted()) {
                isInterrupted = true;
            } else {
                // continue to run even if can not read from mirror
                // notify client of the error
                // and wait for the client to shut down the pipeline
                mirrorError = true;
                LOG.info("PacketResponder " + block + " " + numTargets +
                  " Exception " + StringUtils.stringifyException(ioe));
            }
            }

            if (Thread.interrupted() || isInterrupted) {
            /* The receiver thread cancelled this thread.
               * We could also check any other status updates from the
               * receiver thread (e.g. if it is ok to write to replyOut).
               * It is prudent to not send any more status back to the client
               * because this datanode has a problem. The upstream datanode
               * will detect that this datanode is bad, and rightly so.
               */
            LOG.info("PacketResponder " + block +" " + numTargets +
                     " : Thread is interrupted.");
            break;
            }


如果处理的是整个写请求最后一个数据包的确认,这时,需要执行如下附加步骤:关闭PacketResponder所属的数据块接收器对象,设置数据块长度,使用FSDataset.finalizeBlock()方法提交数据块,最后利用notifyNamenodeReceivedBlock()通知名字节点,本节点完成了一个数据块的接收。代码如下:

// If this is the last packet in block, then close block
            // file and finalize the block before responding success
            if (lastPacketInBlock && !receiver.finalized) {
            receiver.close();
            final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
            block.setNumBytes(receiver.offsetInBlock);
            datanode.data.finalizeBlock(block);
            datanode.myMetrics.incrBlocksWritten();
            datanode.notifyNamenodeReceivedBlock(block,
                  DataNode.EMPTY_DEL_HINT);
            if (ClientTraceLog.isInfoEnabled() &&
                  receiver.clientName.length() > 0) {
                long offset = 0;
                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
                      "HDFS_WRITE", receiver.clientName, offset,
                      datanode.dnRegistration.getStorageID(), block, endTime-startTime));
            } else {
                LOG.info("Received " + block + " of size " + block.getNumBytes() +
                         " from " + receiver.inAddr);
            }
            }


无论当前处理的是否是最后一个数据包,也无论当前数据节点是否是管道的最后一个节点,确认包都需要往上游发送,代码如下

// construct my ack message
            short[] replies = null;
            if (mirrorError) { // no ack is read
                replies = new short;
                replies = DataTransferProtocol.OP_STATUS_SUCCESS;
                replies = DataTransferProtocol.OP_STATUS_ERROR;
            } else {
                short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
                replies = new short;
                replies = DataTransferProtocol.OP_STATUS_SUCCESS;
                for (int i=0; i<ackLen; i++) {
                  replies = ack.getReply(i);
                }
            }
            PipelineAck replyAck = new PipelineAck(expected, replies);

            // send my ack back to upstream datanode
            replyAck.write(replyOut);
            replyOut.flush();
            if (LOG.isDebugEnabled()) {
            LOG.debug("PacketResponder " + numTargets +
                        " for block " + block +
                        " responded an ack: " + replyAck);
            }

当客户端最终接收到确认包时,它可以断定数据流管道上的所有数据节点已经接收到对应的数据包。

数据接收

客户端写往数据节点的数据由org.apache.Hadoop.hdfs.server.datanode.BlockReceiver.Java中的receiveBlock方法接收
void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,// output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      int numTargets) throws IOException {

      mirrorOut = mirrOut;
      mirrorAddr = mirrAddr;
      throttler = throttlerArg;

    try {
      // write data chunk header
      if (!finalized) {
      BlockMetadataHeader.writeHeader(checksumOut, checksum);
      }
      if (clientName.length() > 0) {
      responder = new Daemon(datanode.threadGroup,
                               new PacketResponder(this, block, mirrIn,
                                                   replyOut, numTargets,
                                                   Thread.currentThread()));
      responder.start(); // start thread to processes reponses
      }

      /*
       * Receive until packet length is zero.
       */
      while (receivePacket() > 0) {}

      // flush the mirror out
      if (mirrorOut != null) {
      try {
          mirrorOut.writeInt(0); // mark the end of the block
          mirrorOut.flush();
      } catch (IOException e) {
          handleMirrorOutError(e);
      }
      }

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      }

      // if this write is for a replication request (and not
      // from a client), then finalize block. For client-writes,
      // the block is finalized in the PacketResponder.
      if (clientName.length() == 0) {
      // close the block/crc files
      close();

      // Finalize the block. Does this fsync()?
      block.setNumBytes(offsetInBlock);
      datanode.data.finalizeBlock(block);
      datanode.myMetrics.incrBlocksWritten();
      }

    } catch (IOException ioe) {
      LOG.info("Exception in receiveBlock for " + block + " " + ioe);
      IOUtils.closeStream(this);
      if (responder != null) {
      responder.interrupt();
      }
      cleanupBlock();
      throw ioe;
    } finally {
      if (responder != null) {
      try {
          responder.join();
      } catch (InterruptedException e) {
          throw new IOException("Interrupted receiveBlock");
      }
      responder = null;
      }
    }
}


receiveBlock()将大部分处理工作交给receivePacket()。BlockReceiver.receiveBlock()循环调用receivePacket(),处理这次写请求的所有数据包。当所有的数据接收完毕后,它进行下述清理工作:发送一个空的数据包到下游节点,通知后面的数据节点这次写操作结束;等待PacketResponder线程结束,线程结束,表明所有的响应已经发送给上游节点,同时,本节点的处理结果也已经通知名字节点;关闭文件,并更新数据块文件的长度,然后,通过FSDataset.finalizeBlock()提交数据块。至此,写数据请求处理完毕
receivePacket()代码如下:

/**
   * Receives and processes a packet. It can contain many chunks.
   * returns size of the packet.
   */
private int receivePacket() throws IOException {

    int payloadLen = readNextPacket();

    if (payloadLen <= 0) {
      return payloadLen;
    }

    buf.mark();
    //read the header
    buf.getInt(); // packet length
    offsetInBlock = buf.getLong(); // get offset of packet in block
    long seqno = buf.getLong();    // get seqno
    boolean lastPacketInBlock = (buf.get() != 0);

    int endOfHeader = buf.position();
    buf.reset();

    if (LOG.isDebugEnabled()){
      LOG.debug("Receiving one packet for " + block +
                " of length " + payloadLen +
                " seqno " + seqno +
                " offsetInBlock " + offsetInBlock +
                " lastPacketInBlock " + lastPacketInBlock);
    }

    setBlockPosition(offsetInBlock);

    // First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
      mirrorOut.write(buf.array(), buf.position(), buf.remaining());
      mirrorOut.flush();
      } catch (IOException e) {
      handleMirrorOutError(e);
      }
    }

    buf.position(endOfHeader);      
    int len = buf.getInt();

    if (len < 0) {
      throw new IOException("Got wrong length during writeBlock(" + block +
                            ") from " + inAddr + " at offset " +
                            offsetInBlock + ": " + len);
    }

    if (len == 0) {
      LOG.debug("Receiving empty packet for " + block);
    } else {
      offsetInBlock += len;

      int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
                                                            checksumSize;

      if ( buf.remaining() != (checksumLen + len)) {
      throw new IOException("Data remaining in packet does not match " +
                              "sum of checksumLen and dataLen");
      }
      int checksumOff = buf.position();
      int dataOff = checksumOff + checksumLen;
      byte pktBuf[] = buf.array();

      buf.position(buf.limit()); // move to the end of the data.

      /* skip verifying checksum iff this is not the last one in the
       * pipeline and clientName is non-null. i.e. Checksum is verified
       * on all the datanodes when the data is being written by a
       * datanode rather than a client. Whe client is writing the data,
       * protocol includes acks and only the last datanode needs to verify
       * checksum.
       */
      if (mirrorOut == null || clientName.length() == 0) {
      verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
      }

      try {
      if (!finalized) {
          //finally write to the disk :
          out.write(pktBuf, dataOff, len);

          // If this is a partial chunk, then verify that this is the only
          // chunk in the packet. Calculate new crc for this chunk.
          if (partialCrc != null) {
            if (len > bytesPerChecksum) {
            throw new IOException("Got wrong length during writeBlock(" +
                                    block + ") from " + inAddr + " " +
                                    "A packet can have only one partial chunk."+
                                    " len = " + len +
                                    " bytesPerChecksum " + bytesPerChecksum);
            }
            partialCrc.update(pktBuf, dataOff, len);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            checksumOut.write(buf);
            LOG.debug("Writing out partial crc for data len " + len);
            partialCrc = null;
          } else {
            checksumOut.write(pktBuf, checksumOff, checksumLen);
          }
          datanode.myMetrics.incrBytesWritten(len);
          /// flush entire packet before sending ack
          flush();

          // update length only after flush to disk
          datanode.data.setVisibleLength(block, offsetInBlock);
          dropOsCacheBehindWriter(offsetInBlock);
      }
      } catch (IOException iex) {
      datanode.checkDiskError(iex);
      throw iex;
      }
    }

    // put in queue for pending acks
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).enqueue(seqno,
                                    lastPacketInBlock);
    }

    if (throttler != null) { // throttle I/O
      throttler.throttle(payloadLen);
    }

    return payloadLen;
}

receivePacket()方法一开始就调用readNextPacket(),通过该方法最少读入一个写请求数据包。读入的数据放在类型为ByteBuffer的缓冲区buf中。ByteBuffer是java.nio.Buffer的子类
readNextPacket()的实现比较琐碎,需要保证缓冲区剩余空间可以读入整个数据包,必须考虑数据包的长度(必要时需要重新分配缓冲区)、并根据需要调整数据在缓冲区中的位置。总而言之,readNextPacket()一次读入一个完整的数据包。

BlockReceiver.receivePacket()从缓冲区读入数据包的包头信息,并赋值到对象的成员变量,接下来的准备动作是:调用setBlockPosition()方法,设置写数据时的文件位置。代码如下

buf.mark();
    //read the header
    buf.getInt(); // packet length
    offsetInBlock = buf.getLong(); // get offset of packet in block
    long seqno = buf.getLong();    // get seqno
    boolean lastPacketInBlock = (buf.get() != 0);

    int endOfHeader = buf.position();
    buf.reset();

    if (LOG.isDebugEnabled()){
      LOG.debug("Receiving one packet for " + block +
                " of length " + payloadLen +
                " seqno " + seqno +
                " offsetInBlock " + offsetInBlock +
                " lastPacketInBlock " + lastPacketInBlock);
    }

    setBlockPosition(offsetInBlock);
下一篇
Hadoop源码分析——数据节点写数据2
http://www.aboutyun.com/forum.php?mod=viewthread&tid=18967







页: [1]
查看完整版本: Hadoop源码分析1——数据节点写数据