分享

hadoop MapReduce主程序中文乱码解决

hyj 发表于 2014-4-13 15:53:04 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 32468
Hadoop中涉及文本输出的地方,默认输出编码统一为不带BOM的UTF-8;而中文Windows系统默认使用的编码是GBK。因此,某些格式的文件(例如CSV)如果以不带BOM的UTF-8编码输出,用Excel打开时会显示为乱码,只有用UltraEdit(UE)或者记事本打开才能正常显示。所以,将Hadoop的默认输出编码更改为GBK是一个非常常见的需求。
      默认情况下,MR主程序中设定输出格式(输出编码由该输出格式类决定)的语句为:
  1. job.setOutputFormatClass(TextOutputFormat.class);
复制代码
  1. TextOutputFormat.class
复制代码

的代码如下:
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements.  See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership.  The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License.  You may obtain a copy of the License at
  9. *
  10. *     http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.hadoop.mapreduce.lib.output;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.io.UnsupportedEncodingException;
  22. import org.apache.hadoop.classification.InterfaceAudience;
  23. import org.apache.hadoop.classification.InterfaceStability;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.io.NullWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.io.compress.CompressionCodec;
  31. import org.apache.hadoop.io.compress.GzipCodec;
  32. import org.apache.hadoop.mapreduce.OutputFormat;
  33. import org.apache.hadoop.mapreduce.RecordWriter;
  34. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  35. import org.apache.hadoop.util.*;
  36. /** An {@link OutputFormat} that writes plain text files. */
  37. @InterfaceAudience.Public
  38. @InterfaceStability.Stable
  39. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  40.   public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  41.   protected static class LineRecordWriter<K, V>
  42.     extends RecordWriter<K, V> {
  43.     private static final String utf8 = "UTF-8";  // 将UTF-8转换成GBK
  44.     private static final byte[] newline;
  45.     static {
  46.       try {
  47.         newline = "\n".getBytes(utf8);
  48.       } catch (UnsupportedEncodingException uee) {
  49.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  50.       }
  51.     }
  52.     protected DataOutputStream out;
  53.     private final byte[] keyValueSeparator;
  54.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  55.       this.out = out;
  56.       try {
  57.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  58.       } catch (UnsupportedEncodingException uee) {
  59.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  60.       }
  61.     }
  62.     public LineRecordWriter(DataOutputStream out) {
  63.       this(out, "\t");
  64.     }
  65.     /**
  66.      * Write the object to the byte stream, handling Text as a special
  67.      * case.
  68.      * @param o the object to print
  69.      * @throws IOException if the write throws, we pass it on
  70.      */
  71.     private void writeObject(Object o) throws IOException {
  72.       if (o instanceof Text) {
  73.         Text to = (Text) o;   // 将此行代码注释掉
  74.         out.write(to.getBytes(), 0, to.getLength());  // 将此行代码注释掉
  75.       } else { // 将此行代码注释掉      
  76.         out.write(o.toString().getBytes(utf8));
  77.       }
  78.     }
  79.     public synchronized void write(K key, V value)
  80.       throws IOException {
  81.       boolean nullKey = key == null || key instanceof NullWritable;
  82.       boolean nullValue = value == null || value instanceof NullWritable;
  83.       if (nullKey && nullValue) {
  84.         return;
  85.       }
  86.       if (!nullKey) {
  87.         writeObject(key);
  88.       }
  89.       if (!(nullKey || nullValue)) {
  90.         out.write(keyValueSeparator);
  91.       }
  92.       if (!nullValue) {
  93.         writeObject(value);
  94.       }
  95.       out.write(newline);
  96.     }
  97.     public synchronized
  98.     void close(TaskAttemptContext context) throws IOException {
  99.       out.close();
  100.     }
  101.   }
  102.   public RecordWriter<K, V>
  103.          getRecordWriter(TaskAttemptContext job
  104.                          ) throws IOException, InterruptedException {
  105.     Configuration conf = job.getConfiguration();
  106.     boolean isCompressed = getCompressOutput(job);
  107.     String keyValueSeparator= conf.get(SEPERATOR, "\t");
  108.     CompressionCodec codec = null;
  109.     String extension = "";
  110.     if (isCompressed) {
  111.       Class<? extends CompressionCodec> codecClass =
  112.         getOutputCompressorClass(job, GzipCodec.class);
  113.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  114.       extension = codec.getDefaultExtension();
  115.     }
  116.     Path file = getDefaultWorkFile(job, extension);
  117.     FileSystem fs = file.getFileSystem(conf);
  118.     if (!isCompressed) {
  119.       FSDataOutputStream fileOut = fs.create(file, false);
  120.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  121.     } else {
  122.       FSDataOutputStream fileOut = fs.create(file, false);
  123.       return new LineRecordWriter<K, V>(new DataOutputStream
  124.                                         (codec.createOutputStream(fileOut)),
  125.                                         keyValueSeparator);
  126.     }
  127.   }
  128. }
复制代码

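从上述代码中编码常量的定义(private static final String utf8 = "UTF-8";)可以看出,Hadoop已经将此输出格式的编码固定为UTF-8。因此,要改变Hadoop输出文本的编码,只需定义一个与TextOutputFormat基本相同的类GbkOutputFormat,同样继承FileOutputFormat(注意是org.apache.hadoop.mapreduce.lib.output.FileOutputFormat),将其中的编码常量改为GBK,并去掉writeObject中直接写出Text内部UTF-8字节的分支即可,如下代码: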
  1. import java.io.DataOutputStream;
  2. import java.io.IOException;
  3. import java.io.UnsupportedEncodingException;
  4. import org.apache.hadoop.classification.InterfaceAudience;
  5. import org.apache.hadoop.classification.InterfaceStability;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FileSystem;
  8. import org.apache.hadoop.fs.Path;
  9. import org.apache.hadoop.fs.FSDataOutputStream;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.compress.CompressionCodec;
  13. import org.apache.hadoop.io.compress.GzipCodec;
  14. import org.apache.hadoop.mapreduce.OutputFormat;
  15. import org.apache.hadoop.mapreduce.RecordWriter;
  16. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.util.*;
  19. @InterfaceAudience.Public
  20. @InterfaceStability.Stable
  21. public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
  22.   public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  23.   protected static class LineRecordWriter<K, V>
  24.     extends RecordWriter<K, V> {
  25.     private static final String utf8 = "GBK";
  26.     private static final byte[] newline;
  27.     static {
  28.       try {
  29.         newline = "\n".getBytes(utf8);
  30.       } catch (UnsupportedEncodingException uee) {
  31.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  32.       }
  33.     }
  34.     protected DataOutputStream out;
  35.     private final byte[] keyValueSeparator;
  36.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  37.       this.out = out;
  38.       try {
  39.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  40.       } catch (UnsupportedEncodingException uee) {
  41.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  42.       }
  43.     }
  44.     public LineRecordWriter(DataOutputStream out) {
  45.       this(out, "\t");
  46.     }
  47.     /**
  48.      * Write the object to the byte stream, handling Text as a special
  49.      * case.
  50.      * @param o the object to print
  51.      * @throws IOException if the write throws, we pass it on
  52.      */
  53.     private void writeObject(Object o) throws IOException {
  54.       if (o instanceof Text) {
  55. //        Text to = (Text) o;
  56. //        out.write(to.getBytes(), 0, to.getLength());
  57. //      } else {
  58.         out.write(o.toString().getBytes(utf8));
  59.       }
  60.     }
  61.     public synchronized void write(K key, V value)
  62.       throws IOException {
  63.       boolean nullKey = key == null || key instanceof NullWritable;
  64.       boolean nullValue = value == null || value instanceof NullWritable;
  65.       if (nullKey && nullValue) {
  66.         return;
  67.       }
  68.       if (!nullKey) {
  69.         writeObject(key);
  70.       }
  71.       if (!(nullKey || nullValue)) {
  72.         out.write(keyValueSeparator);
  73.       }
  74.       if (!nullValue) {
  75.         writeObject(value);
  76.       }
  77.       out.write(newline);
  78.     }
  79.     public synchronized
  80.     void close(TaskAttemptContext context) throws IOException {
  81.       out.close();
  82.     }
  83.   }
  84.   public RecordWriter<K, V>
  85.          getRecordWriter(TaskAttemptContext job
  86.                          ) throws IOException, InterruptedException {
  87.     Configuration conf = job.getConfiguration();
  88.     boolean isCompressed = getCompressOutput(job);
  89.     String keyValueSeparator= conf.get(SEPERATOR, "\t");
  90.     CompressionCodec codec = null;
  91.     String extension = "";
  92.     if (isCompressed) {
  93.       Class<? extends CompressionCodec> codecClass =
  94.         getOutputCompressorClass(job, GzipCodec.class);
  95.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  96.       extension = codec.getDefaultExtension();
  97.     }
  98.     Path file = getDefaultWorkFile(job, extension);
  99.     FileSystem fs = file.getFileSystem(conf);
  100.     if (!isCompressed) {
  101.       FSDataOutputStream fileOut = fs.create(file, false);
  102.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  103.     } else {
  104.       FSDataOutputStream fileOut = fs.create(file, false);
  105.       return new LineRecordWriter<K, V>(new DataOutputStream
  106.                                         (codec.createOutputStream(fileOut)),
  107.                                         keyValueSeparator);
  108.     }
  109.   }
  110. }
复制代码
最后在MR主程序中将输出格式类设置为GbkOutputFormat.class,如:
  1. job.setOutputFormatClass(GbkOutputFormat.class);
复制代码





参考:
  1. http://semantic.iteye.com/blog/1846238
复制代码




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条