分享

HDFS dfsclient端的处理逻辑介绍及代码实现

pig2 2014-2-23 01:38:12 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 8752
可以带着下面问题来阅读:

1.HDFS是以什么为单位读写block的?
2.packet由什么组成?
3.HDFS进行数据校验的基本单位是什么?
4.那个函数启动了一个pipeline(管道)?
5.block数据的写入采用什么方式?
6.对block的数据写入使用的是什么的方式
7.HDFS复制三份,这三份是指什么?文件?package?
8.复制三份,分别写入DataNode 1, 2, 3的过程是什么?
9.writeChunk完成的功能是什么?
10.DataStreamer线程负责把准备好的数据packet,顺序写入到DataNode,如未确认写入成功的packet,该怎么处理?
11.DataStreamer线程传输数据到DataNode时,写入dataNode的过程是什么?


HDFS一个文件由多个block构成。HDFS在进行block读写的时候是以packet(默认每个packet为64K)为单位进行的。每一个packet由若干个chunk(默认512Byte)组成。Chunk是进行数据校验的基本单位,对每一个chunk生成一个校验和(默认4Byte)并将校验和进行存储。
在写入一个block的时候,数据传输的基本单位是packet,每个packet由若干个chunk组成。

  1. FileSystem hdfs = FileSystem.get(new Configuration());
  2. Path path = new Path("/testfile");
  3. // writing
  4. FSDataOutputStream dos = hdfs.create(path);
  5. byte[] readBuf = "Hello World".getBytes("UTF-8");
  6. dos.write(readBuf, 0, readBuf.length);
  7. dos.close();
  8. hdfs.close();
复制代码
文件的打开
上传一个文件到hdfs,一般会调用DistributedFileSystem.create,其实现如下:
  1. public FSDataOutputStream create(Path f, FsPermission permission,boolean overwrite,int bufferSize, short replication, long blockSize,Progressable progress) throws IOException {
  2.     return new FSDataOutputStream
  3.        (dfs.create(getPathName(f), permission,overwrite, replication, blockSize, progress, bufferSize),
  4.         statistics);
  5. }
复制代码
其最终生成一个FSDataOutputStream用于向新生成的文件中写入数据。其成员变量dfs的类型为DFSClient,DFSClient的create函数如下:
  1. public OutputStream create(String src,FsPermission permission,boolean overwrite,short replication,long blockSize,Progressable progress,int buffersize) throws IOException {
  2.     checkOpen();
  3.     if (permission == null) {
  4.       permission = FsPermission.getDefault();
  5.     }
  6.     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
  7.     OutputStream result = new DFSOutputStream(src, masked,overwrite, replication, blockSize, progress, buffersize,
  8.         conf.getInt("io.bytes.per.checksum", 512));
  9.     leasechecker.put(src, result);
  10.     return result;
  11. }
复制代码
其中构造了一个DFSOutputStream,在其构造函数中,同过RPC调用NameNode的create来创建一个文件。
当然,构造函数中还做了一件重要的事情,就是streamer.start(),也即启动了一个pipeline,用于写数据,在写入数据的过程中,我们会仔细分析。
  1. DFSOutputStream(String src, FsPermission masked, boolean overwrite,short replication, long blockSize, Progressable progress,
  2.                 int buffersize, int bytesPerChecksum) throws IOException {
  3.     this(src, blockSize, progress, bytesPerChecksum);
  4.     computePacketChunkSize(writePacketSize, bytesPerChecksum);
  5.     try {
  6.       namenode.create(src, masked, clientName, overwrite, replication, blockSize);
  7.     } catch(RemoteException re) {
  8.       throw re.unwrapRemoteException(AccessControlException.class,QuotaExceededException.class);
  9.     }
  10.     streamer.start();
  11. }
复制代码
通过rpc调用NameNode的create函数,调用namesystem.startFile函数,其又调用startFileInternal函数,它创建一个新的文件,状态为under construction,没有任何data block与之对应。

dfsclient文件的写入
下面轮到客户端向新创建的文件中写入数据了,一般会使用FSDataOutputStream的write方法:
按照hdfs的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
  • 首先将package 1写入DataNode 1
  • 然后由DataNode 1负责将package 1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1
  • 然后DataNode 2负责将package 1写入DataNode 3, 同时客户端可以讲package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2
  • 就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕
FSDataOutputStream的write方法会调用DFSOutputStream的write方法,而DFSOutputStream继承自FSOutputSummer,所以实际上是调用FSOutputSummer的write方法,如下:
  1. public synchronized void write(byte b[], int off, int len)
  2.   throws IOException {
  3.     //参数检查
  4.     for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
  5.     }
  6.   }
复制代码
FSOutputSummer的write1的方法如下:
  1. private int write1(byte b[], int off, int len) throws IOException {
  2.     if(count==0 && len>=buf.length) {
  3.       // buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容大于或等于一个chunk的大小时调用
  4.       // 这里避免多余一次复制
  5.       final int length = buf.length;
  6.       sum.update(b, off, length);//length是一个完整chunk的大小,默认是512,这里根据一个chunk内容计算校验和
  7.       writeChecksumChunk(b, off, length, false);
  8.       return length;
  9.     }
  10.    
  11.     // buf初始化的大小是chunk的大小,默认是512,这里的代码会在写入的数据的剩余内容小于一个chunk的大小时调用
  12.     // 规避了数组越界问题
  13.     int bytesToCopy = buf.length-count;
  14.     bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
  15.     sum.update(b, off, bytesToCopy);//bytesToCopy不足一个chunk,是写入的内容的最后一个chunk的剩余字节数目
  16.     System.arraycopy(b, off, buf, count, bytesToCopy);
  17.     count += bytesToCopy;
  18.     if (count == buf.length) {//如果不足一个chunk,就缓存到本地buffer,如果还有下一次写入,就填充这个chunk,满一个chunk再flush,count清0
  19.       // local buffer is full
  20.       flushBuffer();//最终调用writeChecksumChunk方法实现
  21.     }
  22.     return bytesToCopy;
  23.   }
复制代码
writeChecksumChunk的实现如下:
  1. //写入一个chunk的数据长度(默认512),忽略len的长度
  2. private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
  3.   throws IOException {
  4.     int tempChecksum = (int)sum.getValue();
  5.     if (!keep) {
  6.       sum.reset();
  7.     }
  8.     int2byte(tempChecksum, checksum);//把当前chunk的校验和从int转换为字节
  9.     writeChunk(b, off, len, checksum);
  10. }
复制代码
writeChunk由子类DFSOutputStream实现,如下:
  1. protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)throws IOException {
  2.       //创建一个package,并写入数据
  3.       currentPacket = new Packet(packetSize, chunksPerPacket,bytesCurBlock);
  4.       currentPacket.writeChecksum(checksum, 0, cklen);
  5.       currentPacket.writeData(b, offset, len);
  6.       currentPacket.numChunks++;
  7.       bytesCurBlock += len;
  8.       //如果此package已满,则放入队列中准备发送
  9.       if (currentPacket.numChunks == currentPacket.maxChunks ||bytesCurBlock == blockSize) {
  10.           ......
  11.           dataQueue.addLast(currentPacket);
  12.           //唤醒等待dataqueue的传输线程,也即DataStreamer
  13.           dataQueue.notifyAll();
  14.           currentPacket = null;
  15.           ......
  16.       }
  17. }
复制代码
writeChunk比较简单,就是把数据填充packet,填充完毕,就放到dataQueue,再唤醒DataStreamer。
DataStreamer完成了数据的传输,DataStreamer的run函数如下:
  1. public void run() {
  2.     while (!closed && clientRunning) {
  3.       Packet one = null;
  4.       synchronized (dataQueue) {
  5.       boolean doSleep = processDatanodeError(hasError, false);//如果ack出错,则处理IO错误
  6.         //如果队列中没有package,则等待
  7.         while ((!closed && !hasError && clientRunning && dataQueue.size() == 0) || doSleep) {
  8.           try {
  9.             dataQueue.wait(1000);
  10.           } catch (InterruptedException  e) {
  11.           }
  12.           doSleep = false;
  13.         }
  14.         try {
  15.           //得到队列中的第一个package
  16.           one = dataQueue.getFirst();
  17.           long offsetInBlock = one.offsetInBlock;
  18.           //由NameNode分配block,并生成一个写入流指向此block
  19.           if (blockStream == null) {
  20.             nodes = nextBlockOutputStream(src);
  21.             response = new ResponseProcessor(nodes);
  22.             response.start();
  23.           }
  24.           ByteBuffer buf = one.getBuffer();
  25.           //将packet从dataQueue移至ackQueue,等待确认
  26.           dataQueue.removeFirst();
  27.           dataQueue.notifyAll();
  28.           synchronized (ackQueue) {
  29.             ackQueue.addLast(one);
  30.             ackQueue.notifyAll();
  31.           }
  32.           //利用生成的写入流将数据写入DataNode中的block
  33.           blockStream.write(buf.array(), buf.position(), buf.remaining());
  34.           if (one.lastPacketInBlock) {
  35.             blockStream.writeInt(0); //表示此block写入完毕
  36.           }
  37.           blockStream.flush();
  38.         } catch (Throwable e) {
  39.         }
  40.         
  41.         if (one.lastPacketInBlock) {
  42.             //数据块写满,做一些清理工作,下次再申请块
  43.             response.close();        // ignore all errors in Response
  44.             
  45.             synchronized (dataQueue) {
  46.               IOUtils.cleanup(LOG, blockStream, blockReplyStream);
  47.               nodes = null;
  48.               response = null;
  49.               blockStream = null;//设置为null,下次就会判断blockStream为null,申请新的块
  50.               blockReplyStream = null;
  51.             }
  52.         }
  53.     }
  54.       ......
  55.   }
复制代码
DataStreamer线程负责把准备好的数据packet,顺序写入到DataNode,未确认写入成功的packet则移动到ackQueue,等待确认。
DataStreamer线程传输数据到DataNode时,要向namenode申请数据块,方法是nextBlockOutputStream,再调用locateFollowingBlock,通过RPC调用namenode.addBlock(src, clientName),在NameNode分配了DataNode和block以后,createBlockOutputStream开始写入数据。
客户端在DataStreamer的run函数中创建了写入流后,调用blockStream.write将packet写入DataNode

DataStreamer还会启动ResponseProcessor线程,它负责接收datanode的ack,当接收到所有datanode对一个packet确认成功的ack,ResponseProcessor从ackQueue中删除相应的packet。在出错时,从ackQueue中移除packet到dataQueue,移除失败的datanode,恢复数据块,建立新的pipeline。实现如下:
  1. public void run() {
  2. ...
  3. PipelineAck ack = new PipelineAck();
  4. while (!closed && clientRunning && !lastPacketInBlock) {
  5.   try {
  6.     // read an ack from the pipeline
  7.     ack.readFields(blockReplyStream);
  8.     ...
  9.     //处理所有DataNode响应的状态
  10.     for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
  11.         short reply = ack.getReply(i);  
  12.       if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {//ack验证,如果DataNode写入packet失败,则出错   
  13.         errorIndex = i; //记录损坏的DataNode,会在processDatanodeError方法移除该失败的DataNode
  14.         throw new IOException("Bad response " + reply + " for block " + block +  " from datanode " + targets[i].getName());   
  15.       }   
  16.     }
  17.     long seqno = ack.getSeqno();
  18.     if (seqno == Packet.HEART_BEAT_SEQNO) {  // 心跳ack,忽略
  19.       continue;
  20.     }
  21.     Packet one = null;
  22.     synchronized (ackQueue) {
  23.       one = ackQueue.getFirst();
  24.     }
  25.     ...
  26.     synchronized (ackQueue) {
  27.       assert ack.getSeqno() == lastAckedSeqno + 1;//验证ack
  28.       lastAckedSeqno = ack.getSeqno();
  29.       ackQueue.removeFirst();//移除确认写入成功的packet
  30.       ackQueue.notifyAll();
  31.     }
  32.   } catch (Exception e) {
  33.     if (!closed) {
  34.       hasError = true;//设置ack错误,让
  35.       ...
  36.       closed = true;
  37.     }
  38.   }
  39. }
  40. }
复制代码
当ResponseProcessor在确认packet失败时,processDatanodeError方法用于处理datanode的错误,当调用返回后需要休眠一段时间时,返回true。下面是其简单的处理流程:
1.关闭blockStream和blockReplyStream
2.将packet从ackQueue移到dataQueue
3.删除坏datanode
4.通过RPC调用datanode的recoverBlock方法来恢复块,如果有错,返回true
5.如果没有可用的datanode,关闭DFSOutputStream和streamer,返回false
6.创建块输出流,如果不成功,转到3
  1. private boolean processDatanodeError(boolean hasError, boolean isAppend) {
  2.   if (!hasError) {//DataNode没有发生错误,直接返回
  3.     return false;
  4.   }
  5.   
  6.   //将未确认写入成功的packets从ack queue移动到data queue的前面
  7.   synchronized (ackQueue) {
  8.     dataQueue.addAll(0, ackQueue);
  9.     ackQueue.clear();
  10.   }
  11.   boolean success = false;
  12.   while (!success && clientRunning) {
  13.     DatanodeInfo[] newnodes = null;
  14.    
  15.     //根据errorIndex确定失败的DataNode,从所有的DataNode nodes移除失败的DataNode,复制到newnodes
  16.     // 通知primary datanode做数据块恢复,更新合适的时间戳
  17.     LocatedBlock newBlock = null;
  18.     ClientDatanodeProtocol primary =  null;
  19.     DatanodeInfo primaryNode = null;
  20.     try {
  21.       // Pick the "least" datanode as the primary datanode to avoid deadlock.
  22.       primaryNode = Collections.min(Arrays.asList(newnodes));
  23.       primary = createClientDatanodeProtocolProxy(primaryNode, conf, block, accessToken, socketTimeout);
  24.       newBlock = primary.recoverBlock(block, isAppend, newnodes);//恢复数据块
  25.     } catch (IOException e) {
  26.         //循环创建块输出流,如果不成功,移除失败的DataNode
  27.           return true;          // 需要休眠
  28.     } finally {
  29.       RPC.stopProxy(primary);
  30.     }
  31.     recoveryErrorCount = 0; // 数据块恢复成功
  32.     block = newBlock.getBlock();
  33.     accessToken = newBlock.getBlockToken();
  34.     nodes = newBlock.getLocations();
  35.     this.hasError = false;
  36.     lastException = null;
  37.     errorIndex = 0;
  38.     success = createBlockOutputStream(nodes, clientName, true);
  39.   }
  40.   response = new ResponseProcessor(nodes);
  41.   response.start();//启动ResponseProcessor做ack确认处理
  42.   return false; // 不休眠,继续处理
  43. }
复制代码
总结
hdfs文件的写入是比较复杂的,所以本文重点介绍了dfsclient端的处理逻辑,对namenode和datanode的响应,以后在做分析。



来自群组: Hadoop技术组

没找到任何评论,期待你打破沉寂

BoyOfChina 发表于 2014-2-23 20:42:45
很详细,很厉害
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条