分享

什么情况下,Hive 只会产生一个reduce任务,没有maptask



问题导读:

1、mapreducer任务是怎样执行的?
2、mapTask的职责是什么?
3、hive自己的mapreducer是怎样执行的?



我们常规使用的mapreducer任务执行过程大致如下图:
640.jpeg

appmaster通过某种策略计算数据源可以做多少分片(getSplits方法),对应的生成固定数量的maptask,假如存在shuffle的话,就根据默认或者指定的reducer数,将数据分散给特定数量的reducer。不存在shuffle的话,reducer可以为零的。

正常逻辑:

mapTask的职责就是负责读数据,做ETL,也可以利用combiner局部聚合;ReduceTask输入严重依赖于mapper输出,所以‘一直’的逻辑仅有reducer无法执行的。

没有maptask,仅有一个reducerTask的hive任务。有点违背我们的使用常识了哦。

其实,正常使用的情况下,hive的sql模式执行引擎还是主要依赖于hadoop的mapreduce计算框架。

但是仅仅依赖于sql,然后依赖于hadoop的mapreduce,显然不能满足hive的野心的。所以hive想提供一个单独的可以使用java编写的hive自己的map/reduce的框架。

你在hive搜索reducer可以直接找到hive自己的reducer。

hql常规解析后依赖的mapreduce主要还是ExecReducer/ExecMapper--通过实现hadoop的mapper和reducer来实现具体执行逻辑。

hive自己的mapreducer有一个实现案例,就是GenericMR,实现源码如下:

  1. public final class GenericMR {
  2.   public void map(final InputStream in, final OutputStream out,
  3.       final Mapper mapper) throws Exception {
  4.     map(new InputStreamReader(in), new OutputStreamWriter(out), mapper);
  5.   }
  6.   public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception {
  7.     handle(in, out, new RecordProcessor() {
  8.       @Override
  9.       public void processNext(RecordReader reader, Output output) throws Exception {
  10.         mapper.map(reader.next(), output);
  11.       }
  12.     });
  13.   }
  14.   public void reduce(final InputStream in, final OutputStream out,
  15.       final Reducer reducer) throws Exception {
  16.     reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer);
  17.   }
  18.   public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception {
  19.     handle(in, out, new RecordProcessor() {
  20.       @Override
  21.       public void processNext(RecordReader reader, Output output) throws Exception {
  22.         reducer.reduce(reader.peek()[0], new KeyRecordIterator(
  23.             reader.peek()[0], reader), output);
  24.       }
  25.     });
  26.   }
  27.   private void handle(final Reader in, final Writer out,
  28.       final RecordProcessor processor) throws Exception {
  29.     final RecordReader reader = new RecordReader(in);
  30.     final OutputStreamOutput output = new OutputStreamOutput(out);
  31.     try {
  32.       while (reader.hasNext()) {
  33.         processor.processNext(reader, output);
  34.       }
  35.     } finally {
  36.       try {
  37.         output.close();
  38.       } finally {
  39.         reader.close();
  40.       }
  41.     }
  42.   }
  43.   private static interface RecordProcessor {
  44.     void processNext(final RecordReader reader, final Output output) throws Exception;
  45.   }
  46.   private static final class KeyRecordIterator implements Iterator<String[]> {
  47.     private final String key;
  48.     private final RecordReader reader;
  49.     private KeyRecordIterator(final String key, final RecordReader reader) {
  50.       this.key = key;
  51.       this.reader = reader;
  52.     }
  53.     @Override
  54.     public boolean hasNext() {
  55.       return (reader.hasNext() && key.equals(reader.peek()[0]));
  56.     }
  57.     @Override
  58.     public String[] next() {
  59.       if (!hasNext()) {
  60.         throw new NoSuchElementException();
  61.       }
  62.       return reader.next();
  63.     }
  64.     @Override
  65.     public void remove() {
  66.       throw new UnsupportedOperationException();
  67.     }
  68.   }
  69.   private static final class RecordReader {
  70.     private final BufferedReader reader;
  71.     private String[] next;
  72.     private RecordReader(final InputStream in) {
  73.       this(new InputStreamReader(in));
  74.     }
  75.     private RecordReader(final Reader in) {
  76.       reader = new BufferedReader(in);
  77.       next = readNext();
  78.     }
  79.     private String[] next() {
  80.       final String[] ret = next;
  81.       next = readNext();
  82.       return ret;
  83.     }
  84.     private String[] readNext() {
  85.       try {
  86.         final String line = reader.readLine();
  87.         return (line == null ? null : line.split("\t"));
  88.       } catch (final Exception e) {
  89.         throw new RuntimeException(e);
  90.       }
  91.     }
  92.     private boolean hasNext() {
  93.       return next != null;
  94.     }
  95.     private String[] peek() {
  96.       return next;
  97.     }
  98.     private void close() throws Exception {
  99.       reader.close();
  100.       
  101.     }
  102.   }
  103.   private static final class OutputStreamOutput implements Output {
  104.     private final PrintWriter out;
  105.     private OutputStreamOutput(final OutputStream out) {
  106.       this(new OutputStreamWriter(out));
  107.     }
  108.     private OutputStreamOutput(final Writer out) {
  109.       this.out = new PrintWriter(out);
  110.     }
  111.     public void close() throws Exception {
  112.       out.close();
  113.     }
  114.     @Override
  115.     public void collect(String[] record) throws Exception {
  116.       out.println(_join(record, "\t"));
  117.     }
  118.     private static String _join(final String[] record, final String separator) {
  119.       if (record == null || record.length == 0) {
  120.         return "";
  121.       }
  122.       final StringBuilder sb = new StringBuilder();
  123.       for (int i = 0; i < record.length; i++) {
  124.         if (i > 0) {
  125.           sb.append(separator);
  126.         }
  127.         sb.append(record[i]);
  128.       }
  129.       return sb.toString();
  130.     }
  131.   }
  132. }
复制代码

重点:

常规的mapreducer的reducer输入依赖于mapper的输出的,所以无法单独执行。但是GenericMR的实现reducer里可以直接支持InputStream/Reader,所以就可以直接生成java的指定输入流或者reader即可。
  1. new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(),
  2.           new Reducer() {
  3.         public void reduce(String key, Iterator<String[]> records,
  4.             Output output) throws Exception {
  5.           while (true) {
  6.             records.next();
  7.           }
  8.         }
  9. });
复制代码

这个问题确实很另类,浪尖估计是被面试者简历写了精通hive源码,才被大佬由此一问。要不正常使用者,不会注意到这个框架。

不过这个也给大家提个醒,关注框架使用的同时,也要关注框架的历史及发展。

总结一下就是:hive野心很大,不想仅仅限于hql,想提供一个单独的可以用java编写的hive自己的map/reduce计算框架。




最新经典文章,欢迎关注公众号



---------------------







没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条