问题导读:
1.传统程序有什么特点?
2.mapreduce如何实现的分布式?
3.是否所有的传统程序都可以转换为mapreduce?
传统程序我们求平均值、排序或许对于我们程序员来讲,这并不是难事。传统程序该如何转换mapreduce,这里以求平均数为例。
一、传统程序:
无论是C语言、Java、.net还是其它语言,比如求平均值。
我们都会是输入一组数据:
复制代码
然后该如何求平均值:
复制代码 这样的在任何语言中,这都是小事一桩。
二、如何转换mapreduce程序
为什么mapreduce被称之为分布式编程,是因为它把输入数据进行了分割,然后每一个客户端处理一部分数据,最后在合并起来。求平均值,mapreduce首先分割输入数据
1
2
3
4
5
分割之后,发给map处理,map处理完毕送到reduce,这样就完成了mapredcue。而这个中间的分割的过程,则是传统程序所没有的。下面便是通过来mapreduce实现来运行平均值
首先我们进行map函数:
map函数就是对数据一个分割,但是在进行之前已经对数据进行了分割。
我们从下面结果来分析mapredue:
上面结果map传递value中,可以得出,map函数被调用了5次,然后分别输出了strScore.
Reduce调用了一次。
附上下面程序:如果附加到个人项目中,首先需要
(1)创建包aboutyun.com
(2)然后有avg.txt文件
(3)修改成自己的hdfs路径
从上面我们看出,任何传统的程序都可以转换为mapreduce.
- package aboutyun.com;
-
- import java.io.IOException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Counter;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
- import java.util.Iterator;
- import java.util.StringTokenizer;
-
- public class pingjunzhi {
-
- static final String INPUT_PATH = "hdfs://master:8020/avg.txt";
- static final String OUT_PATH = "hdfs://master:8020/outPut/test";
-
- public static void main(String[] args) throws Exception {
- // 主类
- Configuration conf = new Configuration();
-
- final Job job = Job.getInstance(conf, mapreduce.class.getSimpleName());
- // final Job job = new Job(conf, mapreduce.class.getSimpleName());
- job.setNumReduceTasks(1);
- job.setJarByClass(mapreduce.class);
- // 寻找输入
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- // 1.2对输入数据进行格式化处理的类
- job.setInputFormatClass(TextInputFormat.class);
- job.setMapperClass(MyMapper.class);
-
- // 1.2指定map输出类型<key,value>类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- job.setReducerClass(MyReduce.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 指定输出路径
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- // 指定输出的格式或则类
- job.setOutputFormatClass(TextOutputFormat.class);
-
- // 把作业提交
- job.waitForCompletion(true);
-
- }
-
- // map类
- static class MyMapper extends
- Mapper<LongWritable, Text, Text, LongWritable> {
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
-
-
-
- String line = value.toString();
- Counter countPrint = context.getCounter("Map输出传递Value", line);
- countPrint.increment(1l);
- // 将输入的数据首先按行进行分割
-
- StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
-
- // 分别对每一行进行处理
-
- while (tokenizerArticle.hasMoreElements()) {
-
- // 每行按空格划分
-
- StringTokenizer tokenizerLine = new StringTokenizer(
- tokenizerArticle.nextToken());
-
-
-
- String strScore = tokenizerLine.nextToken();// 个数部分
- Counter countPrint1 = context.getCounter("Map中循环strScore", strScore);
- countPrint1.increment(1l);
- // Text name = new Text(strName);
-
- int scoreInt = Integer.parseInt(strScore);
-
- // 输出
-
- context.write(new Text("avg"), new LongWritable(scoreInt));
-
- }
-
- }
-
- }
-
- // reduce类
- static class MyReduce extends
- Reducer<Text, LongWritable, Text, LongWritable> {
- @Override
- protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
- Context ctx) throws java.io.IOException, InterruptedException {
-
-
- long sum = 0;
-
- long count = 0;
-
- Iterator<LongWritable> iterator = v2s.iterator();
-
- while (iterator.hasNext()) {
-
- sum += iterator.next().get();// 计算总值
-
- count++;// 统计个数
-
- }
-
- long average = (long) sum / count;// 计算平均值
-
- ctx.write(k2, new LongWritable(average));
- Counter countPrint1 = ctx.getCounter("Redue调用次数","空");
- countPrint1.increment(1l);
-
- }
-
- }
-
- }
复制代码
|