问题导读:
1、mapreducer任务是怎样执行的?
2、mapTask的职责是什么?
3、hive自己的mapreducer是怎样执行的?
我们常规使用的mapreducer任务执行过程大致如下图:
appmaster通过某种策略计算数据源可以做多少分片(getSplits方法),对应的生成固定数量的maptask,假如存在shuffle的话,就根据默认或者指定的reducer数,将数据分散给特定数量的reducer。不存在shuffle的话,reducer可以为零的。
正常逻辑:
mapTask的职责就是负责读数据,做ETL,也可以利用combiner局部聚合;ReduceTask输入严重依赖于mapper输出,所以‘一直’的逻辑仅有reducer无法执行的。
没有maptask,仅有一个reducerTask的hive任务。有点违背我们的使用常识了哦。
其实,正常使用的情况下,hive的sql模式执行引擎还是主要依赖于hadoop的mapreduce计算框架。
但是仅仅依赖于sql,然后依赖于hadoop的mapreduce,显然不能满足hive的野心的。所以hive想提供一个单独的可以使用java编写的hive自己的map/reduce的框架。
你在hive搜索reducer可以直接找到hive自己的reducer。
hql常规解析后依赖的mapreduce主要还是ExecReducer/ExecMapper--通过实现hadoop的mapper和reducer来实现具体执行逻辑。
hive自己的mapreducer有一个实现案例,就是GenericMR,实现源码如下:
- public final class GenericMR {
- public void map(final InputStream in, final OutputStream out,
- final Mapper mapper) throws Exception {
- map(new InputStreamReader(in), new OutputStreamWriter(out), mapper);
- }
-
- public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception {
- handle(in, out, new RecordProcessor() {
- @Override
- public void processNext(RecordReader reader, Output output) throws Exception {
- mapper.map(reader.next(), output);
- }
- });
- }
-
- public void reduce(final InputStream in, final OutputStream out,
- final Reducer reducer) throws Exception {
- reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer);
- }
-
- public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception {
- handle(in, out, new RecordProcessor() {
- @Override
- public void processNext(RecordReader reader, Output output) throws Exception {
- reducer.reduce(reader.peek()[0], new KeyRecordIterator(
- reader.peek()[0], reader), output);
- }
- });
- }
-
- private void handle(final Reader in, final Writer out,
- final RecordProcessor processor) throws Exception {
- final RecordReader reader = new RecordReader(in);
- final OutputStreamOutput output = new OutputStreamOutput(out);
-
- try {
- while (reader.hasNext()) {
- processor.processNext(reader, output);
- }
- } finally {
- try {
- output.close();
- } finally {
- reader.close();
- }
- }
- }
-
- private static interface RecordProcessor {
- void processNext(final RecordReader reader, final Output output) throws Exception;
- }
-
- private static final class KeyRecordIterator implements Iterator<String[]> {
- private final String key;
- private final RecordReader reader;
-
- private KeyRecordIterator(final String key, final RecordReader reader) {
- this.key = key;
- this.reader = reader;
- }
-
- @Override
- public boolean hasNext() {
- return (reader.hasNext() && key.equals(reader.peek()[0]));
- }
-
- @Override
- public String[] next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- return reader.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- private static final class RecordReader {
- private final BufferedReader reader;
- private String[] next;
-
- private RecordReader(final InputStream in) {
- this(new InputStreamReader(in));
- }
-
- private RecordReader(final Reader in) {
- reader = new BufferedReader(in);
- next = readNext();
- }
-
- private String[] next() {
- final String[] ret = next;
-
- next = readNext();
-
- return ret;
- }
-
- private String[] readNext() {
- try {
- final String line = reader.readLine();
- return (line == null ? null : line.split("\t"));
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private boolean hasNext() {
- return next != null;
- }
-
- private String[] peek() {
- return next;
- }
-
- private void close() throws Exception {
- reader.close();
-
- }
- }
-
- private static final class OutputStreamOutput implements Output {
- private final PrintWriter out;
-
- private OutputStreamOutput(final OutputStream out) {
- this(new OutputStreamWriter(out));
- }
-
- private OutputStreamOutput(final Writer out) {
- this.out = new PrintWriter(out);
- }
-
- public void close() throws Exception {
- out.close();
- }
-
- @Override
- public void collect(String[] record) throws Exception {
- out.println(_join(record, "\t"));
- }
-
- private static String _join(final String[] record, final String separator) {
- if (record == null || record.length == 0) {
- return "";
- }
- final StringBuilder sb = new StringBuilder();
- for (int i = 0; i < record.length; i++) {
- if (i > 0) {
- sb.append(separator);
- }
- sb.append(record[i]);
- }
- return sb.toString();
- }
- }
- }
复制代码
重点:
常规的mapreducer的reducer输入依赖于mapper的输出的,所以无法单独执行。但是GenericMR的实现reducer里可以直接支持InputStream/Reader,所以就可以直接生成java的指定输入流或者reader即可。
- new GenericMR().reduce(new StringReader("a\tb\tc"), new StringWriter(),
- new Reducer() {
- public void reduce(String key, Iterator<String[]> records,
- Output output) throws Exception {
- while (true) {
- records.next();
- }
- }
- });
复制代码
这个问题确实很另类,浪尖估计是被面试者简历写了精通hive源码,才被大佬由此一问。要不正常使用者,不会注意到这个框架。
不过这个也给大家提个醒,关注框架使用的同时,也要关注框架的历史及发展。
总结一下就是:hive野心很大,不想仅仅限于hql,想提供一个单独的可以用java编写的hive自己的map/reduce计算框架。
最新经典文章,欢迎关注公众号
---------------------
|