本帖最后由 xuanxufeng 于 2016-5-8 17:11 编辑
问题导读
1.mapreduce处理文件是否按行分割?
2.如何事先自定义分割文件?
3.哪个函数实现了分割文件?
我一直有这样的偏执,认为,如果要想做要一个事情学好一样东西,一定要知道这个东西的原理,不知道本质的东西,是很难使用好一个工具,hadoop是个工具,分布式工具,和其他的工具一样也不一样,这里不打算介绍太多的原理,只是阐明我的态度,原理部分打算另外启动一个系列来进行阐述。 在阐述本文想要解决的问题之前,我们先来回忆一下有这样的先验知识,默认的文件分割是按照“行”的概念来进行文件切分的,hadoop coding中map函数中处理的K是行的偏移值,V是行的数据(对应的class是LineRecordReader.class)。那么,问题来了,如果我不想按照行来进行数据切分,想要按照指定的分隔符来进行分割,那怎么做呢?比如,如下这种case? <doc>
a=a1
b=b1
...
</doc>
<doc>
a=a2
b=b2
...
</doc>
<doc>
a=a3
b=b3
...
</doc>
<doc>
...
</doc> 从文件的格式可以看的很清楚,我想要map处理一次数据是一个doc,我想要的分隔符貌似有两个<doc>开始,</doc>结束。如果这类问题能够能够解决,那么推广开去,以单个分隔符分割的诉求也是可以解决的了。 下面会提供一些技术方案来实现,重点阐述方案二 方案1:将每个doc变成行数据,那么自然可以沿用默认的行处理的方式即可,这个方案的难点在将每一个doc数据变成一行数据,这可能需要其他的工具来进行辅助,大家可以去思考怎么实现 方案2:自定义Inputformat,RecordReader,LineReader来进行实现,要想知道自己怎么来动工,那么先要了解默认的是咋弄的,好吧,let's Go! 首先要从JobConf说起,要跑得动一个hadoop程序,需要说明一些相关的配置来告诉hadoop计算框架现在来的是一号神马人物,那么常见的一些配置有,输入的文件格式啊,map,reduce的处理class啊,程序的输入/输出地址等,其中有个配置是和我们这里谈的比较相关的,就是输入文件的配置 ,coding时一般是这样写的:
jobConf.setInputFormat(xxxInputFormat.class); 这个默认的xxxInputFormat就是TextInputFormat(源码如图1) 可以看出这个类其实是个代理类,其中画粗线的方法是真正决定实现的关键点,getRecordReader函数,返回是一个RecordReader对象,每调用一次这个函数,即返回一个k,v作为map函数的k,v输入!
图1 TextInputFormat源码 那么,RecordReader是我们要探寻的第二个目的地,这是一个接口类,outline如图2所示,可以看出这是一个迭代类,重点的方法是next函数,那么我们再回到这个默认的RecordReader实现类,LineRecordReader去一探究竟吧(如图三所示),果然没有辜负我的期望,看看画线的那一块吧,就是最终的答案, 每调用一次next,即调用了一次in.readline函数,value作为入参,调用结束后,赋值满满的返回!
图2,RecordReader的outline
图3 LineRecordReader部分源码 这个in到底是什么呢?private LineReader in; 哈哈,出来了,LineReader ,这个背后的决策者终于被找出来了,那赶紧来看看咋实现的吧,如图四所示,看到标志性的建筑了吧,“\n\r”,good,这个函数就是我们要找的那个!
图4 readline函数部分源码 好了,到这里我们来总结一下刚才的发现之旅,从jobconf知道了分割的格式处理工具InputFormat,找从InputFormat到了hadoop默认的TextInputFormat,再从TextInputFormat找到了K,V对象RecordReader,再从RecordReader找打行K,V对象LineRecordReader,最后找到了LineReader:负责对文件分割的真正的决策者!那么,有了这样的认识,自然知道,我们首先该做的是建立一个DocReader用于分割,这次我将代码贴了出来: private byte[] sp = { '<', '/', 'd', 'o', 'c', '>' };
private byte[] A = { '\\','A'};
Text strBk ;
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
str.clear();
boolean hadSpliter = false;
boolean hitEndOfFile = false;
int startPosn = bufferPosn;
long bytesConsumed = 0;
// get a split content
outerLoop: while (true) {
startPosn = bufferPosn; if (bufferPosn >= bufferLength) {
if (!backfill()) {
hitEndOfFile = true;
break;
}
} startPosn = bufferPosn;
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (bufferPosn+sp.length <= bufferLength && checkSpliter(bufferPosn, sp)) {
hadSpliter = true;
bufferPosn += sp.length;
break outerLoop;
}
} bytesConsumed += bufferPosn - startPosn;
if (bytesConsumed >= maxBytesToConsume) {
return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
}
}
if (!hitEndOfFile) {
bytesConsumed += bufferPosn - startPosn;
int length = bufferPosn - startPosn + (hadSpliter ? 1 : 0);
length = (int) Math.min(length, maxLineLength - str.getLength());
if (length > 0) {
if(strBk!=null)
{
str.set(strBk);
strBk=null;
}
try {
str.append(buffer, startPosn, length);
} catch (Exception e) {
System.out.println("startPosn:"+startPosn+" length:"+length+ " bufferLength:"+bufferLength);
str.append(buffer, startPosn, length-1);
str.append(A,0,A.length);
}
int pos = startPosn+length;
if(pos<bufferLength){
strBk = new Text();
strBk.append(buffer, pos , bufferLength-pos);
}
}
}
return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
} private boolean checkSpliter(int bufferPosn, byte[] sp) {
for (int i = 0; i < sp.length; i++) {
if (buffer[bufferPosn + i] != sp)
return false;
}
return true;
} 对于这个代码的理解,是需要建立在对于LineReader代码的理解基础上,我尝试着梳理一下实现这个代码的一些思想(需要读者在结合LineReader的源码上来理解),首先,我们要知道这类XXXReader是对输入的文件流来做处理,因此有一个对象来代表输入的文件流,这个就是private InputStream in;那么,每触发调用一次readLine函数,其实是从inputstream中找出想要的那一部分来进行截断再输出,那么想要的那一部分可以是line的分割符\n\r,也可以是doc的分割符!因为stream流可能会很长,因此我们处理的对象其实是buffer流,buffer流是用于保存一小段inputstream的缓存,每次buffer流遍历完后,即会清空,加载进另外一小段inputstream,知道stream的末端为止。(buffer的大小可以通过配置io.file.buffer.size来进行控制的,默认为2m) 因此这段代码的核心思想是,首先从当前的buffer流中找到</doc>分隔符(checkSpliter(bufferPosn, sp)函数),并记录下他的位置 (bufferPosn += sp.length;),再往下即是拼凑一个完整的doc,因为如图5所示,buffer的开始处<buffer-start>不是<doc>时,只用一个buffer是无法还原一个doc的,因此需要一个变量strBk记录上一次被buffer截断时的信息(strBk.append(buffer, pos , bufferLength-pos);) 最后,由strBk与当前的buffer[0,pos]拼凑起来的即是一个完整的doc了。
图5 buffer流工作示意图 到这里,我们似乎解决了如何获取一个doc的问题,但是背后隐藏着另外一个问题,就是如果分割的时候buffer的开始位置刚好是处于</doc>中的某一个字节的话,那么会怎样呢?buffer开始位置,即意味着上一个buffer的结束位置,因此也即是说,上一个buffer中是没有找到</doc>的,那么上一个buffer就会被miss掉,这样会引起数据的丢失,如果在一个数据不可缺失的场景这样的情况发生可不妙了,因此这个方法需要有改进,前面其实有暗示到,buffer流截断了</doc>,那么如果每次判断一下上一次的buffer末尾与这一次的buffer开始,那这个问题不就可以解决了吗?另外还有一个解决的trick是放大buffer的大小,使得buffer一定是大于doc的,那么会大大的降低这种情况出现的可能行,注意,这只是降低,不是完全避免。
先介绍一下思路,上次的缺陷在于分割时如果刚好分开了分隔符,那么按照原来的算法,必然会造成数据缺少,本次升级的思路是将上一次的buffer进行全部保存,如果当数据开始新的buffer时,将原来的buffer赋予,请看代码,核心思想在红色字体的部分
private byte[] sp_e = { '<', '/', 'd', 'o', 'c', '>','\01'};
Text strBk ;
public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
str.clear();
boolean hitEndOfFile = false;
int startPosn = bufferPosn;
long bytesConsumed = 0;
// get a split content
outerLoop: while (true) {
startPosn = bufferPosn; if (bufferPosn >= bufferLength) {
if (!backfill()) {
hitEndOfFile = true;
break;
}
} if (strBk != null && bufferPosn==0) { //当开始读取一轮新的buffer数据时,将历史的buffer数据append给当前的buffer作为最新的buffer
strBk.append(buffer, 0, buffer.length);
buffer = strBk.getBytes();
bufferLength = buffer.length;
strBk = null;
}
startPosn = bufferPosn;
strBk = new Text();
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (bufferPosn + sp_e.length <= bufferLength && checkSpliter(bufferPosn, sp_e)) {
bufferPosn += sp_e.length;
break outerLoop;
}
strBk.append(buffer, bufferPosn, 1); //循环时,将当前的所有buffer进行保存,以用于数据完整
} }
//只有找到分隔符</doc>才会到这里
if (!hitEndOfFile) {
bytesConsumed += bufferPosn - startPosn;
int length = bufferPosn - startPosn;
length = (int) Math.min(length, maxLineLength - str.getLength());
if (length > 0) {
try {
str.append(buffer, startPosn, length);
String s = str.toString();
LOG.info("find one:"+s.substring(0, 100)+s.substring(s.length()-10, s.length()));
} catch (Exception e) {
System.err.println("startPosn:"+startPosn+" length:"+length+ " bufferLength:"+bufferLength);
}
int pos = bufferPosn;
if(pos<bufferLength){ //将当前buffer剩余的buffer保存起来
strBk = new Text();
strBk.append(buffer, pos , bufferLength-pos);
}
}
}
return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
}
|