分享

Hadoop必看:hadoop使用中的几个小细节(一)

xioaxu790 2014-5-20 09:03:53 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 10124
问题导读:
1、hadoop仅仅不支持以什么格式输出中文 ?
2、计算过程中的压缩和效率成什么关系 ?
3、reduce数量究竟设置多少最合适 ?






最近在hadoop实际使用中有以下几个小细节分享:
1 中文问题
    从url中解析出中文,但hadoop中打印出来仍是乱码?我们曾经以为hadoop是不支持中文的,后来经过查看源代码,发现hadoop仅仅是不支持以gbk格式输出中文而己。

    这是TextOutputFormat.class中的代码,hadoop默认的输出都是继承自FileOutputFormat来的,FileOutputFormat的两个子类一个是基于二进制流的输出,一个就是基于文本的输出TextOutputFormat。

  1. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  2.   protected static class LineRecordWriter<K, V>
  3.     implements RecordWriter<K, V> {
  4.     private static final String utf8 = “UTF-8″;//这里被写死成了utf-8
  5.     private static final byte[] newline;
  6.     static {
  7.       try {
  8.         newline = “\n”.getBytes(utf8);
  9.       } catch (UnsupportedEncodingException uee) {
  10.         throw new IllegalArgumentException(“can’t find ” + utf8 + ” encoding”);
  11.       }
  12.     }
  13.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  14.       this.out = out;
  15.       try {
  16.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  17.       } catch (UnsupportedEncodingException uee) {
  18.         throw new IllegalArgumentException(“can’t find ” + utf8 + ” encoding”);
  19.       }
  20.     }
  21.     private void writeObject(Object o) throws IOException {
  22.       if (o instanceof Text) {
  23.         Text to = (Text) o;
  24.         out.write(to.getBytes(), 0, to.getLength());//这里也需要修改
  25.       } else {
  26.         out.write(o.toString().getBytes(utf8));
  27.       }
  28.     }
  29. }
复制代码

可以看出hadoop默认的输出写死为utf-8,因此如果decode中文正确,那么将Linux客户端的character设为utf-8是可以看到中文的。因为hadoop用utf-8的格式输出了中文。
    因为大多数数据库是用gbk来定义字段的,如果想让hadoop用gbk格式输出中文以兼容数据库怎么办?
    我们可以定义一个新的类:

  1. public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
  2.   protected static class LineRecordWriter<K, V>
  3.     implements RecordWriter<K, V> {
  4. //写成gbk即可
  5.     private static final String gbk = “gbk”;
  6.     private static final byte[] newline;
  7.     static {
  8.       try {
  9.         newline = “\n”.getBytes(gbk);
  10.       } catch (UnsupportedEncodingException uee) {
  11.         throw new IllegalArgumentException(“can’t find ” + gbk + ” encoding”);
  12.       }
  13.     }
  14.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  15.       this.out = out;
  16.       try {
  17.         this.keyValueSeparator = keyValueSeparator.getBytes(gbk);
  18.       } catch (UnsupportedEncodingException uee) {
  19.         throw new IllegalArgumentException(“can’t find ” + gbk + ” encoding”);
  20.       }
  21.     }
  22.     private void writeObject(Object o) throws IOException {
  23.       if (o instanceof Text) {
  24. //        Text to = (Text) o;
  25. //        out.write(to.getBytes(), 0, to.getLength());
  26. //      } else {
  27.         out.write(o.toString().getBytes(gbk));
  28.       }
  29.     }
  30. }
复制代码

  然后在mapreduce代码中加入conf1.setOutputFormat(GbkOutputFormat.class)
    即可以gbk格式输出中文。



2 关于计算过程中的压缩和效率的对比问题
    之前曾经介绍过对输入文件采用压缩可以提高部分计算效率。现在作更进一步的说明。
    为什么压缩会提高计算速度?这是因为mapreduce计算会将数据文件分散拷贝到所有datanode上,压缩可以减少数据浪费在带宽上的时间,当这些时间大于压缩/解压缩本身的时间时,计算速度就会提高了。
    hadoop的压缩除了将输入文件进行压缩外,hadoop本身还可以在计算过程中将map输出以及将reduce输出进行压缩。这种计算当中的压缩又有什么样的效果呢?
    测试环境:35台节点的hadoop cluster,单机2 CPU,8 core,8G内存,redhat 2.6.9, 其中namenode和second namenode各一台,namenode和second namenode不作datanode
    输入文件大小为2.5G不压缩,records约为3600万条。mapreduce程序分为两个job:
    job1:map将record按user字段作key拆分,reduce中作外连接。这样最后reduce输出为87亿records,大小540G
    job2:map读入这87亿条数据并输出,reduce进行简单统计,最后的records为2.5亿条,大小16G
    计算耗时54min

    仅对第二个阶段的map作压缩(第一个阶段的map输出并不大,没有压缩的必要),测试结果:计算耗时39min

    可见时间上节约了15min,注意以下参数的不同。
    不压缩时:
     Local bytes read=1923047905109
     Local bytes written=1685607947227
     压缩时:
     Local bytes read=770579526349
     Local bytes written=245469534966
     本地读写的的数量大大降低了

     至于对reduce输出的压缩,很遗憾经过测试基本没有提高速度的效果。可能是因为第一个job的输出大多数是在本地机上进行map,不经过网络传输的原因。
     附:对map输出进行压缩,只需要添加jobConf.setMapOutputCompressorClass(DefaultCodec.class)

3 关于reduce的数量设置问题
    reduce数量究竟多少是适合的。目前测试认为reduce数量约等于cluster中datanode的总cores的一半比较合适,比如cluster中有32台datanode,每台8 core,那么reduce设置为128速度最快。因为每台机器8 core,4个作map,4个作reduce计算,正好合适。
    附小测试:对同一个程序
            reduce num=32,reduce time = 6 min
            reduce num=128, reduce time = 2 min
            reduce num=320, reduce time = 5min







没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条