Last edited by nettman on 2014-5-25 19:43
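Hadoop Streaming is the multi-language programming tool that ships with Hadoop. With it, you can write MapReduce programs in any language. This post walks through a few Hadoop Streaming programming examples. Keep the following questions in mind while reading:
2. How do you define a custom Hadoop Counter in Hadoop Streaming?
3. How do you set a custom status message in Hadoop Streaming to tell the user how far the job has progressed?
4. How do you print debug logs in Hadoop Streaming, and where can you see them?
5. How do you process binary files, not just text files, with Hadoop Streaming?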
1. C++ WordCount
(1) Mapper implementation (mapper.cpp)
- #include <iostream>
- #include <string>
- using namespace std;
- int main() {
-     string key;
-     // Each whitespace-separated token read from stdin is one word
-     while (cin >> key) {
-         // Emit <word, 1>, separated by a tab
-         cout << key << "\t" << "1" << endl;
-         // Increase counter counter_no in group counter_group by 1
-         cerr << "reporter:counter:counter_group,counter_no,1\n";
-         // Display status
-         cerr << "reporter:status:processing......\n";
-         // Debug log: stderr lines without the reporter: prefix go to the task's stderr log
-         cerr << "This is a debug log line written to stderr\n";
-     }
-     return 0;
- }
(2) Reducer implementation (reducer.cpp)
- #include <iostream>
- #include <string>
- using namespace std;
- int main() { // the reducer runs as a standalone process, so it needs a main function
-     string cur_key, last_key;
-     long value = 0, sum = 0;
-     bool started = false; // stays false when the input is empty
-     // Read the sorted map output, one "key<TAB>value" record at a time
-     while (cin >> cur_key >> value) {
-         if (started && cur_key != last_key) { // a new key begins
-             cout << last_key << "\t" << sum << endl;
-             sum = 0;
-         }
-         last_key = cur_key;
-         started = true;
-         sum += value; // accumulate the values that share a key
-     }
-     if (started) { // flush the last key
-         cout << last_key << "\t" << sum << endl;
-     }
-     return 0;
- }
Compile the two programs:
- g++ -o mapper mapper.cpp
- g++ -o reducer reducer.cpp
Test them locally:
- echo "dong xicheng is here now, talk to dong xicheng now" | ./mapper | sort | ./reducer
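Note: this local test prints the following lines over and over (once per word), because the mapper writes them to stderr. You can comment them out while testing locally. On the cluster, Hadoop recognizes the reporter: lines and acts on them; the last line is an ordinary log message.
- reporter:counter:counter_group,counter_no,1
- reporter:status:processing......
- This is a debug log line written to stderr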
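The local pipeline test can also be sketched as a single in-memory function, which makes the expected result easy to check. This is only an illustration: local_word_count is a hypothetical helper, not part of Hadoop or of the programs above, and it assumes the same whitespace tokenization as the mapper.

```cpp
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper (not part of Hadoop): mimics `mapper | sort | reducer`
// in memory for a single input string.
std::map<std::string, long> local_word_count(const std::string& text) {
    std::map<std::string, long> counts;  // std::map keeps keys ordered, like `sort`
    std::istringstream in(text);
    std::string word;
    while (in >> word) {  // same whitespace tokenization as the mapper
        ++counts[word];   // same per-key accumulation as the reducer
    }
    return counts;
}
```

For the test sentence above, "now," and "now" come out as two different keys, because punctuation is not stripped when splitting on whitespace.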
Once the test passes, submit the job to the cluster with the following script (run_cpp_mr.sh):
- #!/bin/bash
- HADOOP_HOME=/opt/yarn-client
- INPUT_PATH=/test/input
- OUTPUT_PATH=/test/output
- echo "Clearing output path: $OUTPUT_PATH"
- # -rm -r replaces the deprecated -rmr
- "$HADOOP_HOME"/bin/hadoop fs -rm -r "$OUTPUT_PATH"
- "$HADOOP_HOME"/bin/hadoop jar \
-     "$HADOOP_HOME"/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-     -files mapper,reducer \
-     -input "$INPUT_PATH" \
-     -output "$OUTPUT_PATH" \
-     -mapper mapper \
-     -reducer reducer
2. Shell WordCount
(1) Mapper implementation (mapper.sh)
- #!/bin/bash
- while read -r LINE; do
-     for word in $LINE
-     do
-         # Emit <word, 1>, separated by a tab (the default key/value separator)
-         printf '%s\t1\n' "$word"
-         # In streaming, a counter is updated with a line of the form
-         # reporter:counter:<group>,<counter>,<amount>
-         # Here: increase counter_no in group counter_group by 1.
-         # Counter lines must be written to stderr.
-         echo "reporter:counter:counter_group,counter_no,1" >&2
-         # Status lines use the form reporter:status:<message>
-         echo "reporter:status:processing......" >&2
-         echo "This is a debug log line written to stderr" >&2
-     done
- done
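(2) Reducer implementation (reducer.sh)
- #!/bin/bash
- count=0
- started=0
- word=""
- # read splits each record on whitespace: first the word, then its count
- while read -r newword value; do
-     if [ "$word" != "$newword" ]; then
-         # A new key begins: flush the previous one
-         [ "$started" -ne 0 ] && printf '%s\t%s\n' "$word" "$count"
-         word=$newword
-         count=$value
-         started=1
-     else
-         count=$(( $count + $value ))
-     fi
- done
- # Flush the last key, unless the input was empty
- if [ "$started" -ne 0 ]; then
-     printf '%s\t%s\n' "$word" "$count"
- fi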
Test the two scripts locally:
- echo "dong xicheng is here now, talk to dong xicheng now" | sh mapper.sh | sort | sh reducer.sh
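Note: this local test prints the following lines over and over (once per word), because the mapper writes them to stderr. You can comment them out while testing locally. On the cluster, Hadoop recognizes the reporter: lines and acts on them; the last line is an ordinary log message.
- reporter:counter:counter_group,counter_no,1
- reporter:status:processing......
- This is a debug log line written to stderr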
Once the test passes, submit the job to the cluster with the following script (run_shell_mr.sh):
- #!/bin/bash
- HADOOP_HOME=/opt/yarn-client
- INPUT_PATH=/test/input
- OUTPUT_PATH=/test/output
- echo "Clearing output path: $OUTPUT_PATH"
- # -rm -r replaces the deprecated -rmr
- "$HADOOP_HOME"/bin/hadoop fs -rm -r "$OUTPUT_PATH"
- "$HADOOP_HOME"/bin/hadoop jar \
-     "$HADOOP_HOME"/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
-     -files mapper.sh,reducer.sh \
-     -input "$INPUT_PATH" \
-     -output "$OUTPUT_PATH" \
-     -mapper "sh mapper.sh" \
-     -reducer "sh reducer.sh"
3. Program notes
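In Hadoop Streaming, standard input, standard output, and standard error each have their own use. Standard input receives the input data and standard output carries the processing results, while the meaning of standard error depends on its content: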
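(1) If a line written to standard error has the form reporter:counter:group,counter,amount, the Hadoop counter named counter in the group named group is increased by amount. The first time Hadoop reads this counter it creates it; after that it looks the counter up in the counter table and increases its value.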
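The reporter line formats used in the examples above can be wrapped in small helpers so the prefix is never mistyped. This is a minimal sketch: counter_line and status_line are hypothetical names introduced here for illustration, and only the two line formats themselves come from the post.

```cpp
#include <string>

// Hypothetical helper: builds "reporter:counter:<group>,<counter>,<amount>".
std::string counter_line(const std::string& group,
                         const std::string& counter,
                         long amount) {
    return "reporter:counter:" + group + "," + counter + "," +
           std::to_string(amount) + "\n";
}

// Hypothetical helper: builds "reporter:status:<message>".
std::string status_line(const std::string& message) {
    return "reporter:status:" + message + "\n";
}
```

A mapper would write the result to stderr, for example std::cerr << counter_line("counter_group", "counter_no", 1); (with <iostream> included).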
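One more point deserves attention: by default, the key/value separator in Map Task output is \t. Hadoop splits each line into key and value at \t in both the Map and Reduce phases and sorts by key, so getting the separator right matters a great deal. You can specify a different separator with stream.map.output.field.separator.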
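To make the separator rule concrete, here is a sketch of how a line can be split into key and value at the first separator. split_key_value is a hypothetical helper that only imitates the default behavior as I understand it (everything before the first tab is the key, the rest is the value, and a line without a tab is all key); it is not Hadoop's own code.

```cpp
#include <string>
#include <utility>

// Hypothetical helper: splits a record at the FIRST separator only.
// Returns {key, value}; when no separator is present, the whole line
// is the key and the value is empty.
std::pair<std::string, std::string> split_key_value(const std::string& line,
                                                    char sep = '\t') {
    const std::string::size_type pos = line.find(sep);
    if (pos == std::string::npos) {
        return {line, ""};
    }
    return {line.substr(0, pos), line.substr(pos + 1)};
}
```

Under this rule a mapper that prints "dong 1" with a space produces the key "dong 1" with an empty value, which is why the examples emit a tab between the word and its count.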