Reading guide
1. What are the key points of the TeraSort algorithm?
2. How does a map task tag data records?
3. How does a reduce task sort its data block locally?
1. Overview
Sorting 1 TB of data is a common benchmark for the data-processing capability of a distributed processing framework. TeraSort is the sort job that ships with Hadoop; in 2008 Hadoop won first place in the 1 TB sort benchmark, finishing in 209 seconds. So how is TeraSort implemented inside Hadoop? This article analyzes the TeraSort job from the perspective of algorithm design.
2. Algorithm idea
When turning a traditional sequential sorting algorithm into a parallel one, the natural strategy is divide and conquer: split the data into M blocks (for example by hashing), let each map task sort one block locally, and then let a single reduce task produce the total order over all the data. This gives high parallelism in the map phase, but none at all in the reduce phase.
Figure: traditional parallel sort algorithm
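A minimal sketch of the hash-based block assignment mentioned above (the helper name is hypothetical; this is not Hadoop code): hashing balances records across the M blocks but preserves no ordering between blocks, which is why a single reduce task still has to merge and sort everything.

// Hypothetical sketch: hash-based block assignment balances load but keeps no order,
// so the M locally sorted blocks cannot simply be concatenated into a sorted whole.
static int naiveBlockFor(String key, int numBlocks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numBlocks;
}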
To raise the parallelism of the reduce phase, the TeraSort job improves on this scheme. In the map phase, every map task partitions its data into R blocks (R being the number of reduce tasks) such that all records in block i (i > 0) are smaller than all records in block i+1. In the reduce phase, the i-th reduce task processes (sorts) the i-th block of every map task, so everything produced by the i-th reduce task is smaller than everything produced by the (i+1)-th; concatenating the outputs of reduce tasks 1 through R in order yields the final sorted result. This design is clearly more efficient than the first one, but it is harder to implement and must solve two technical problems: first, how does each map task determine the boundaries of its R blocks? Second, given a record, how can the map task quickly decide which block it belongs to? The answers are, respectively, sampling and a trie.
Figure: TeraSort workflow
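To make the contrast concrete, here is a simplified sketch (not the actual Hadoop code, which uses a two-level trie as shown in section 4) of how R-1 sorted split points turn into a total-order block assignment: every key in block i compares less than every key in block i+1, so the sorted outputs of the R reduce tasks can simply be concatenated.

// Simplified sketch: assign a key to one of R blocks using R-1 sorted split points.
// Keys below splitPoints[0] go to block 0; keys at or above the last split point go to block R-1.
static int blockFor(String key, String[] splitPoints) {
  int pos = java.util.Arrays.binarySearch(splitPoints, key);
  // when the key is absent, binarySearch returns -(insertionPoint) - 1
  return pos >= 0 ? pos + 1 : -(pos + 1);
}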
3. The TeraSort algorithm
3.1 TeraSort workflow
Hadoop's TeraSort consists of three main steps: sampling -> map tasks tag each data record -> reduce tasks perform a local sort.
Sampling is done on the JobClient side: a portion of the input data is read, sorted, and divided into R blocks; the upper and lower bounds of each block (the "split points") are determined and stored in the distributed cache.
In the map phase, each map task first reads the split points from the distributed cache and builds a trie over them (a two-level trie whose leaf nodes hold the corresponding reduce task numbers). It then starts processing its data: for each record it looks up, via the trie, the number of the reduce task the record belongs to, and tags the record with it.
In the reduce phase, each reduce task reads its corresponding data from every map task and sorts it locally; emitting the reduce task results in order of task number gives the final output.
3.2 Key points of the TeraSort algorithm
(1) Sampling
Hadoop ships with several samplers, including IntervalSampler, RandomSampler, and SplitSampler (see org.apache.hadoop.mapred.lib).
Number of records to sample: sampleSize = conf.getLong("terasort.partitions.sample", 100000);
Number of splits to sample from: samples = Math.min(10, splits.length); where splits is the array of all input splits.
Records taken from each sampled split: recordsPerSample = sampleSize / samples;
The sampled data is fully sorted, the resulting split points are written to the file _partition.lst, and that file is placed in the distributed cache.
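A minimal sketch of what the sampling step computes, under simplified assumptions (in the code in section 4 this work is done by TeraInputFormat.writePartitionFile(), called from run(); the helper below is hypothetical and the exact index spacing Hadoop uses may differ):

// Hypothetical sketch: sort the sampled keys and pick R-1 roughly evenly spaced
// keys as split points; these are what gets written to _partition.lst.
static String[] splitPointsFrom(java.util.List<String> sampledKeys, int numReduces) {
  java.util.Collections.sort(sampledKeys);
  String[] splits = new String[numReduces - 1];
  for (int i = 0; i < splits.length; i++) {
    splits[i] = sampledKeys.get((i + 1) * sampledKeys.size() / numReduces);
  }
  return splits;
}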
For example, suppose the sampled keys are: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
After sorting: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr
With 4 reduce tasks, the split points are: abd, bcd, mnk
(2) Map tasks tag data records
Each map task reads the split points from _partition.lst and builds a trie over them (say a 2-level trie, i.e. keys are organized by their first two bytes).
The map task then reads records from its split one by one and looks up, in the trie, the reduce task number of each record. For example, abg maps to the second reduce task and mnz maps to the fourth.
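Plugging the split points from the example above into the blockFor() sketch from section 2 reproduces these assignments (partitions are numbered from 0, so partition 1 is the second reduce task); the real map task gets the same answer from its trie lookup:

// Split points abd, bcd, mnk define 4 partitions.
String[] splits = { "abd", "bcd", "mnk" };
blockFor("abg", splits);  // returns 1 -> second reduce task
blockFor("mnz", splits);  // returns 3 -> fourth reduce task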
(3) Reduce tasks perform a local sort
Each reduce task sorts its data locally, and the results are emitted in task order.
4. MapReduce example code (TeraSort.java)
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.examples.terasort;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Generates the sampled split points, launches the job, and waits for it to
 * finish.
 * <p>
 * To run the program:
 * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
 */
public class TeraSort extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(TeraSort.class);
  static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";
  static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";

  /**
   * A partitioner that splits text keys into roughly equal partitions
   * in a global sorted order.
   */
  static class TotalOrderPartitioner extends Partitioner<Text,Text>
      implements Configurable {
    private TrieNode trie;
    private Text[] splitPoints;
    private Configuration conf;

    /**
     * A generic trie node
     */
    static abstract class TrieNode {
      private int level;
      TrieNode(int level) {
        this.level = level;
      }
      abstract int findPartition(Text key);
      abstract void print(PrintStream strm) throws IOException;
      int getLevel() {
        return level;
      }
    }

    /**
     * An inner trie node that contains 256 children based on the next
     * character.
     */
    static class InnerTrieNode extends TrieNode {
      private TrieNode[] child = new TrieNode[256];

      InnerTrieNode(int level) {
        super(level);
      }
      int findPartition(Text key) {
        int level = getLevel();
        if (key.getLength() <= level) {
          return child[0].findPartition(key);
        }
        return child[key.getBytes()[level] & 0xff].findPartition(key);
      }
      void setChild(int idx, TrieNode child) {
        this.child[idx] = child;
      }
      void print(PrintStream strm) throws IOException {
        for(int ch=0; ch < 256; ++ch) {
          for(int i = 0; i < 2*getLevel(); ++i) {
            strm.print(' ');
          }
          strm.print(ch);
          strm.println(" ->");
          if (child[ch] != null) {
            child[ch].print(strm);
          }
        }
      }
    }

    /**
     * A leaf trie node that does string compares to figure out where the given
     * key belongs between lower..upper.
     */
    static class LeafTrieNode extends TrieNode {
      int lower;
      int upper;
      Text[] splitPoints;
      LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
        super(level);
        this.splitPoints = splitPoints;
        this.lower = lower;
        this.upper = upper;
      }
      int findPartition(Text key) {
        for(int i=lower; i<upper; ++i) {
          if (splitPoints[i].compareTo(key) > 0) {
            return i;
          }
        }
        return upper;
      }
      void print(PrintStream strm) throws IOException {
        for(int i = 0; i < 2*getLevel(); ++i) {
          strm.print(' ');
        }
        strm.print(lower);
        strm.print(", ");
        strm.println(upper);
      }
    }

    /**
     * Read the cut points from the given sequence file.
     * @param fs the file system
     * @param p the path to read
     * @param conf the job config
     * @return the strings to split the partitions on
     * @throws IOException
     */
    private static Text[] readPartitions(FileSystem fs, Path p,
        Configuration conf) throws IOException {
      int reduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
      Text[] result = new Text[reduces - 1];
      DataInputStream reader = fs.open(p);
      for(int i=0; i < reduces - 1; ++i) {
        result[i] = new Text();
        result[i].readFields(reader);
      }
      reader.close();
      return result;
    }

    /**
     * Given a sorted set of cut points, build a trie that will find the correct
     * partition quickly.
     * @param splits the list of cut points
     * @param lower the lower bound of partitions 0..numPartitions-1
     * @param upper the upper bound of partitions 0..numPartitions-1
     * @param prefix the prefix that we have already checked against
     * @param maxDepth the maximum depth we will build a trie for
     * @return the trie node that will divide the splits correctly
     */
    private static TrieNode buildTrie(Text[] splits, int lower, int upper,
        Text prefix, int maxDepth) {
      int depth = prefix.getLength();
      if (depth >= maxDepth || lower == upper) {
        return new LeafTrieNode(depth, splits, lower, upper);
      }
      InnerTrieNode result = new InnerTrieNode(depth);
      Text trial = new Text(prefix);
      // append an extra byte on to the prefix
      trial.append(new byte[1], 0, 1);
      int currentBound = lower;
      for(int ch = 0; ch < 255; ++ch) {
        trial.getBytes()[depth] = (byte) (ch + 1);
        lower = currentBound;
        while (currentBound < upper) {
          if (splits[currentBound].compareTo(trial) >= 0) {
            break;
          }
          currentBound += 1;
        }
        trial.getBytes()[depth] = (byte) ch;
        result.child[ch] = buildTrie(splits, lower, currentBound, trial,
            maxDepth);
      }
      // pick up the rest
      trial.getBytes()[depth] = (byte) 255;
      result.child[255] = buildTrie(splits, currentBound, upper, trial,
          maxDepth);
      return result;
    }

    public void setConf(Configuration conf) {
      try {
        FileSystem fs = FileSystem.getLocal(conf);
        this.conf = conf;
        Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
        splitPoints = readPartitions(fs, partFile, conf);
        trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
      } catch (IOException ie) {
        throw new IllegalArgumentException("can't read partitions file", ie);
      }
    }

    public Configuration getConf() {
      return conf;
    }

    public TotalOrderPartitioner() {
    }

    public int getPartition(Text key, Text value, int numPartitions) {
      return trie.findPartition(key);
    }

  }

  /**
   * A total order partitioner that assigns keys based on their first
   * PREFIX_LENGTH bytes, assuming a flat distribution.
   */
  public static class SimplePartitioner extends Partitioner<Text, Text>
      implements Configurable {
    int prefixesPerReduce;
    private static final int PREFIX_LENGTH = 3;
    private Configuration conf = null;
    public void setConf(Configuration conf) {
      this.conf = conf;
      prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) /
        (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));
    }

    public Configuration getConf() {
      return conf;
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
      byte[] bytes = key.getBytes();
      int len = Math.min(PREFIX_LENGTH, key.getLength());
      int prefix = 0;
      for(int i=0; i < len; ++i) {
        prefix = (prefix << 8) | (0xff & bytes[i]);
      }
      return prefix / prefixesPerReduce;
    }
  }

  public static boolean getUseSimplePartitioner(JobContext job) {
    return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);
  }

  public static void setUseSimplePartitioner(Job job, boolean value) {
    job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);
  }

  public static int getOutputReplication(JobContext job) {
    return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);
  }

  public static void setOutputReplication(Job job, int value) {
    job.getConfiguration().setInt(OUTPUT_REPLICATION, value);
  }

  public int run(String[] args) throws Exception {
    LOG.info("starting");
    Job job = Job.getInstance(getConf());
    Path inputDir = new Path(args[0]);
    Path outputDir = new Path(args[1]);
    boolean useSimplePartitioner = getUseSimplePartitioner(job);
    TeraInputFormat.setInputPaths(job, inputDir);
    FileOutputFormat.setOutputPath(job, outputDir);
    job.setJobName("TeraSort");
    job.setJarByClass(TeraSort.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TeraInputFormat.class);
    job.setOutputFormatClass(TeraOutputFormat.class);
    if (useSimplePartitioner) {
      job.setPartitionerClass(SimplePartitioner.class);
    } else {
      long start = System.currentTimeMillis();
      Path partitionFile = new Path(outputDir,
          TeraInputFormat.PARTITION_FILENAME);
      URI partitionUri = new URI(partitionFile.toString() +
          "#" + TeraInputFormat.PARTITION_FILENAME);
      try {
        TeraInputFormat.writePartitionFile(job, partitionFile);
      } catch (Throwable e) {
        LOG.error(e.getMessage());
        return -1;
      }
      job.addCacheFile(partitionUri);
      long end = System.currentTimeMillis();
      System.out.println("Spent " + (end - start) + "ms computing partitions.");
      job.setPartitionerClass(TotalOrderPartitioner.class);
    }

    job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));
    TeraOutputFormat.setFinalSync(job, true);
    int ret = job.waitForCompletion(true) ? 0 : 1;
    LOG.info("done");
    return ret;
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new TeraSort(), args);
    System.exit(res);
  }

}
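As the class javadoc above notes, the job is launched from the Hadoop examples jar. A typical end-to-end benchmark run (the exact jar name varies across Hadoop versions, and the HDFS paths below are only placeholders) generates input with TeraGen, sorts it with TeraSort, and verifies the result with TeraValidate; TeraGen writes 100-byte rows, so 10,000,000 rows is roughly 1 GB of input:

bin/hadoop jar hadoop-*-examples.jar teragen 10000000 /terasort/input
bin/hadoop jar hadoop-*-examples.jar terasort /terasort/input /terasort/output
bin/hadoop jar hadoop-*-examples.jar teravalidate /terasort/output /terasort/report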