分享

Hadoop Streaming各种版本wordCount

hyj 2014-2-26 20:42:52 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 7744
本帖最后由 nettman 于 2014-5-25 19:43 编辑
Hadoop Streaming是Hadoop提供的多语言编程工具,通过该工具,用户可采用任何语言编写MapReduce程序,本文将介绍几个Hadoop Streaming编程实例,大家可以带着下面问题来阅读:

1.对于一种编写语言,应该怎么编写Mapper和Reduce,需遵循什么样的编程规范
2. 如何在Hadoop Streaming中自定义Hadoop Counter
3. 如何在Hadoop Streaming中自定义状态信息,进而给用户反馈当前作业执行进度
4. 如何在Hadoop Streaming中打印调试日志,在哪里可以看到这些日志
5.如何使用Hadoop Streaming处理二进制文件,而不仅仅是文本文件

本文重点解决前四个问题,给出了C++和Shell编写的Wordcount实例,供大家参考。

1. C++版WordCount
(1)Mapper实现(mapper.cpp)
  1. #include <iostream>
  2. #include <string>
  3. using namespace std;
  4. int main() {
  5.   string key;
  6.   while(cin >> key) {
  7.     cout << key << "\t" << "1" << endl;
  8.     // Define counter named counter_no in group counter_group
  9.     cerr << "reporter:counter:counter_group,counter_no,1\n";
  10.     // dispaly status
  11.     cerr << "reporter:status:processing......\n";
  12.     // Print logs for testing
  13.     cerr << "This is log, will be printed in stdout file\n";
  14.   }
  15.   return 0;
  16. }
复制代码
(2)Reducer实现(reducer.cpp)
  1. #include <iostream>
  2. #include <string>
  3. using namespace std;
  4. int main() { //reducer将会被封装成一个独立进程,因而需要有main函数
  5.   string cur_key, last_key, value;
  6.   cin >> cur_key >> value;
  7.   last_key = cur_key;
  8.   int n = 1;
  9.   while(cin >> cur_key) { //读取map task输出结果
  10.     cin >> value;
  11.     if(last_key != cur_key) { //识别下一个key
  12.       cout << last_key << "\t" << n << endl;
  13.       last_key = cur_key;
  14.       n = 1;
  15.     } else { //获取key相同的所有value数目
  16.       n++; //key值相同的,累计value值
  17.     }
  18.   }
  19.   cout << last_key << "\t" << n << endl;
  20.   return 0;
  21. }
复制代码
(3)编译运行
编译以上两个程序:
  1. g++ -o mapper mapper.cpp
  2. g++ -o reducer reducer.cpp
复制代码
测试一下:
  1. echo “dong xicheng is here now, talk to dong xicheng now” | ./mapper | sort | ./reducer
复制代码
注:上面这种测试方法会频繁打印以下字符串,可以先注释掉,这些字符串hadoop能够识别
  1. reporter:counter:counter_group,counter_no,1
  2. reporter:status:processing……
  3. This is log, will be printed in stdout file
复制代码
测试通过后,可通过以下脚本将作业提交到集群中(run_cpp_mr.sh):
  1. #!/bin/bash
  2. HADOOP_HOME=/opt/yarn-client
  3. INPUT_PATH=/test/input
  4. OUTPUT_PATH=/test/output
  5. echo "Clearing output path: $OUTPUT_PATH"
  6. $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH
  7. ${HADOOP_HOME}/bin/hadoop jar\
  8.    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\
  9.   -files mapper,reducer\
  10.   -input $INPUT_PATH\
  11.   -output $OUTPUT_PATH\
  12.   -mapper mapper\
  13.   -reducer reducer
复制代码
2. Shell版WordCount
(1)Mapper实现(mapper.sh)
  1. #! /bin/bash
  2. while read LINE; do
  3.   for word in $LINE
  4.   do
  5.     echo "$word 1"
  6.     # in streaming, we define counter by
  7.     # [reporter:counter:<group>,<counter>,<amount>]
  8.     # define a counter named counter_no, in group counter_group
  9.     # increase this counter by 1
  10.     # counter shoule be output through stderr
  11.     echo "reporter:counter:counter_group,counter_no,1" >&2
  12.     echo "reporter:counter:status,processing......" >&2
  13.     echo "This is log for testing, will be printed in stdout file" >&2
  14.   done
  15. done
复制代码
(2)Reducer实现(mapper.sh)
  1. #! /bin/bash
  2. count=0
  3. started=0
  4. word=""
  5. while read LINE;do
  6.   newword=`echo $LINE | cut -d ' '  -f 1`
  7.   if [ "$word" != "$newword" ];then
  8.     [ $started -ne 0 ] && echo "$word\t$count"
  9.     word=$newword
  10.     count=1
  11.     started=1
  12.   else
  13.     count=$(( $count + 1 ))
  14.   fi
  15. done
  16. echo "$word\t$count"
复制代码
(3)测试运行
测试以上两个程序:
  1. echo “dong xicheng is here now, talk to dong xicheng now” | sh mapper.sh | sort | sh reducer.sh
复制代码
注:上面这种测试方法会频繁打印以下字符串,可以先注释掉,这些字符串hadoop能够识别
  1. reporter:counter:counter_group,counter_no,1
  2. reporter:status:processing……
  3. This is log, will be printed in stdout file
复制代码
测试通过后,可通过以下脚本将作业提交到集群中(run_shell_mr.sh):
  1. #!/bin/bash
  2. HADOOP_HOME=/opt/yarn-client
  3. INPUT_PATH=/test/input
  4. OUTPUT_PATH=/test/output
  5. echo "Clearing output path: $OUTPUT_PATH"
  6. $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH
  7. ${HADOOP_HOME}/bin/hadoop jar\
  8.    ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\
  9.   -files mapper.sh,reducer.sh\
  10.   -input $INPUT_PATH\
  11.   -output $OUTPUT_PATH\
  12.   -mapper "sh mapper.sh"\
  13.   -reducer "sh reducer.sh"
复制代码
3. 程序说明
在Hadoop Streaming中,标准输入、标准输出和错误输出各有妙用,其中,标准输入和输出分别用于接受输入数据和输出处理结果,而错误输出的意义视内容而定:
(1)如果标准错误输出的内容为:reporter:counter:group,counter,amount,表示将名称为counter,所在组为group的hadoop counter值增加amount,hadoop第一次读到这个counter时,会创建它,之后查找counter表,增加对应counter值
(2)如果标准错误输出的内容为:reporter:status:message,则表示在界面或者终端上打印message信息,可以是一些状态提示信息
(3)如果采用错误输出的内容不是以上两种情况,则表示调试日志,Hadoop会将其重定向到stdout文件中。注:每个Task对应三个日志文件,分别是stdout、stderr和syslog,都是文本文件,可以在web界面上查看这三个日志文件内容,也可以登录到task所在节点上,到对应目录中查看。
另外,需要注意一点,默认Map Task输出的key和value分隔符是\t,Hadoop会在Map和Reduce阶段按照\t分离key和value,并对key排序,注意这点非常重要,当然,你可以使用stream.map.output.field.separator指定新的分隔符。





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条