Developing Hadoop MapReduce Programs in Scala

pig2, posted 2014-7-15 16:47:32
Guiding questions:
1. How do you package the program with a script?
2. How do you test the Scala code?





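Yesterday I set up a fairly complete Hadoop environment and studied several of its configuration parameters in detail.
By the evening the whole environment was running and the web monitoring pages were working as well, so I wrote a word count test program in Scala.
Scala can call Java code directly, so the only work needed is to configure the compile environment and to package scala-library.jar together with the application. After a little experimentation I arrived at a configuration that runs.
The compile and packaging steps are given here for reference.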

Compile script: its main job is to set the correct classpath.

    #!/bin/bash
    # fast scala compiler for hadoop
    # accepts every option that fsc supports, except '-classpath'
    class_path=.:$JAVA_HOME/lib/tools.jar:$HADOOP_HOME/hadoop-2-core.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-cli-2.0-SNAPSHOT.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-codec-1.3.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-httpclient-3.0.1.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-logging-api-1.0.4.jar
    class_path=${class_path}:$HADOOP_HOME/lib/commons-net-1.4.1.jar
    class_path=${class_path}:$HADOOP_HOME/lib/derbyclient.jar
    class_path=${class_path}:$HADOOP_HOME/lib/derby.jar
    class_path=${class_path}:$HADOOP_HOME/lib/hadoop-2-baidu-sos.jar
    class_path=${class_path}:$HADOOP_HOME/lib/hsqldb-1.8.0.10.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jets3t-0.6.1.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-6.1.14.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-util-6.1.14.jar
    class_path=${class_path}:$HADOOP_HOME/lib/json-org.jar
    class_path=${class_path}:$HADOOP_HOME/lib/junit-3.8.1.jar
    class_path=${class_path}:$HADOOP_HOME/lib/kfs-0.2.2.jar
    class_path=${class_path}:$HADOOP_HOME/lib/log4j-1.2.15.jar
    class_path=${class_path}:$HADOOP_HOME/lib/oro-2.0.8.jar
    class_path=${class_path}:$HADOOP_HOME/lib/servlet-api.jar
    class_path=${class_path}:$HADOOP_HOME/lib/slf4j-api-1.4.3.jar
    class_path=${class_path}:$HADOOP_HOME/lib/slf4j-log4j12-1.4.3.jar
    class_path=${class_path}:$HADOOP_HOME/lib/xmlenc-0.52.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/commons-el.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jasper-compiler.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jasper-runtime.jar
    class_path=${class_path}:$HADOOP_HOME/lib/jetty-ext/jsp-api.jar
    # quote the classpath so paths containing spaces survive word splitting
    fsc -classpath "${class_path}" "$@"
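
The compile script above lists each jar by hand, so it has to be edited whenever the jars shipped with Hadoop change. A minimal sketch of an alternative, assuming that every jar under the given directories belongs on the classpath (which may pull in more jars than the hand-written list). The function name `build_classpath` is hypothetical and not part of the original script.

```shell
#!/bin/bash
# Hypothetical helper (not from the original post): print a colon-separated
# classpath that starts with "." and contains every .jar found, recursively,
# under the directories given as arguments. Missing directories are skipped.
build_classpath() {
    printf '.'
    for dir in "$@"; do
        [ -d "$dir" ] || continue
        # sort so the resulting order is reproducible between runs
        find "$dir" -name '*.jar' | sort | while IFS= read -r jar; do
            printf ':%s' "$jar"
        done
    done
    printf '\n'
}
```

With this in place, the last line of the compile script could become something like `fsc -classpath "$(build_classpath "$HADOOP_HOME/lib"):$JAVA_HOME/lib/tools.jar:$HADOOP_HOME/hadoop-2-core.jar" "$@"`. Searching `$HADOOP_HOME/lib` recursively also covers the `jetty-ext` subdirectory used above.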

The following script packages everything into a single .jar file:
    #!/bin/bash
    # package up a jar file to submit for scala
    # jar_file is given without the .jar extension; the script appends it
    if [ $# -ne 2 ]; then
        echo "Usage: $(basename "$0") jar_file classes_dir"
        exit 1
    fi
    jarfile=$1
    classes_dir=$2
    if [ -e "${classes_dir}/lib" ]; then
        echo "adding libraries: "
        ls "${classes_dir}/lib"
    else
        mkdir "${classes_dir}/lib"
    fi
    # bundle the Scala runtime under lib/; build the jar only if the copy succeeds
    cp "$SCALA_HOME/lib/scala-library.jar" "${classes_dir}/lib/" &&
    jar -cvf "${jarfile}.jar" -C "${classes_dir}" .
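
Hadoop puts jars found under `lib/` inside a job jar on the job's classpath, which is why the packaging script copies scala-library.jar into `lib/` before calling `jar`. A small sketch of a check that could be run on the result before submitting it, assuming the JDK `jar` tool is on the PATH. The function name `check_job_jar` is hypothetical.

```shell
#!/bin/bash
# Hypothetical helper (not from the original post): verify that a job jar
# contains the bundled Scala runtime and at least one compiled class.
check_job_jar() {
    jar_file=$1
    # "jar tf" prints one archive entry per line
    listing=$(jar tf "$jar_file") || return 1
    if ! printf '%s\n' "$listing" | grep -Eq '^(\./)?lib/scala-library\.jar$'; then
        echo "missing lib/scala-library.jar in $jar_file" >&2
        return 1
    fi
    if ! printf '%s\n' "$listing" | grep -q '\.class$'; then
        echo "no .class files in $jar_file" >&2
        return 1
    fi
    return 0
}
```

If the check passes, the job can be submitted with `hadoop jar`, for example `hadoop jar wordcount.jar net.liangkun.WordCount <input> <output>`. The jar name `wordcount.jar` is only an example, and the input and output paths are placeholders.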


Finally, the Scala code used for testing:



    package net.liangkun

    import java.io.IOException
    import java.util._

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf._
    import org.apache.hadoop.io._
    import org.apache.hadoop.mapred._
    import org.apache.hadoop.util._

    // Emits (word, 1) for every whitespace-separated token in a line.
    class Map extends MapReduceBase
    with Mapper[LongWritable, Text, Text, IntWritable] {
        private val one = new IntWritable(1)
        private val word = new Text()

        def map(key: LongWritable, value: Text,
            output: OutputCollector[Text, IntWritable],
            reporter: Reporter
        ): Unit = {
            val line = value.toString
            val tokenizer = new StringTokenizer(line)
            while (tokenizer.hasMoreTokens) {
                word.set(tokenizer.nextToken())
                output.collect(word, one)
            }
        }
    }

    // Sums the counts for each word; also used as the combiner.
    class Reduce extends MapReduceBase
    with Reducer[Text, IntWritable, Text, IntWritable] {
        // Iterator here is java.util.Iterator, brought in by the java.util._ import
        def reduce(key: Text, values: Iterator[IntWritable],
            output: OutputCollector[Text, IntWritable],
            reporter: Reporter
        ): Unit = {
            // tail-recursive sum over the remaining values
            def count(sum: Int, vs: Iterator[IntWritable]): Int =
                if (vs.hasNext)
                    count(sum + vs.next().get, vs)
                else
                    sum
            output.collect(key, new IntWritable(count(0, values)))
        }
    }

    object WordCount {
        // args(0) is the input path, args(1) is the output path
        def main(args: Array[String]): Unit = {
            val conf = new JobConf(this.getClass)
            conf.setJobName("WordCount")
            conf.setOutputKeyClass(classOf[Text])
            conf.setOutputValueClass(classOf[IntWritable])
            conf.setMapperClass(classOf[Map])
            conf.setCombinerClass(classOf[Reduce])
            conf.setReducerClass(classOf[Reduce])
            conf.setInputFormat(classOf[TextInputFormat])
            conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
            FileInputFormat.setInputPaths(conf, new Path(args(0)))
            FileOutputFormat.setOutputPath(conf, new Path(args(1)))
            JobClient.runJob(conf)
        }
    }
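
One way to check the word count job above is to compute the same counts locally on a small input and compare them with what the job wrote. A minimal sketch, assuming `awk` is available and that the input only uses spaces, tabs, and newlines as separators (`StringTokenizer` also splits on carriage return and form feed by default). The function name `local_word_count` is hypothetical.

```shell
#!/bin/bash
# Hypothetical helper (not from the original post): read text on stdin and
# print "word<TAB>count" for every whitespace-separated word, sorted by line
# in byte order so the result can be diffed against sorted job output.
local_word_count() {
    awk '{ for (i = 1; i <= NF; i++) counts[$i]++ }
         END { for (w in counts) printf "%s\t%d\n", w, counts[w] }' | LC_ALL=C sort
}
```

For example, `local_word_count < input.txt` could be compared against the job output read back with `hadoop fs -cat <output>/part-* | LC_ALL=C sort`, where `input.txt` and `<output>` are placeholders. `TextOutputFormat` separates key and value with a tab by default, which is why the helper prints the same layout.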










Comments (2)

小小布衣, posted 2014-10-30 09:05:17
I have been learning Scala recently.