分享

Spark技术实战之1 -- KafkaWordCount

xioaxu790 2014-10-19 11:30:48 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 40786
问题导读
1、怎样搭建Kafka集群?
2、parser中的表达式分别代表什么意思?
3、你觉得应该如何运行KafkaWordCount呢?





概要
Spark应用开发实践性非常强,很多时候可能都会将时间花费在环境的搭建和运行上,如果有一个比较好的指导将会大大的缩短应用开发流程。Spark Streaming中涉及到和许多第三方程序的整合,源码中的例子如何真正跑起来,文档不是很多也不详细。

本篇主要讲述如何运行KafkaWordCount,这个需要涉及Kafka集群的搭建,还是说的越仔细越好。

搭建Kafka集群
步骤1:下载kafka 0.8.1及解压
  1. wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
  2. tar zvxf kafka_2.10-0.8.1.1.tgz
  3. cd kafka_2.10-0.8.1.1
复制代码



步骤2:启动zookeeper
  1. bin/zookeeper-server-start.sh config/zookeeper.properties
复制代码


步骤3:修改配置文件config/server.properties,添加如下内容
  1. host.name=localhost
  2. # Hostname the broker will advertise to producers and consumers. If not set, it uses the
  3. # value for "host.name" if configured.  Otherwise, it will use the value returned from
  4. # java.net.InetAddress.getCanonicalHostName().
  5. advertised.host.name=localhost
复制代码


步骤4:启动Kafka server
  1. bin/kafka-server-start.sh config/server.properties
复制代码


步骤5:创建topic
  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test
复制代码


检验topic创建是否成功
  1. bin/kafka-topics.sh --list --zookeeper localhost:2181
复制代码

如果正常返回test

步骤6:打开producer,发送消息
  1. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  2. ##启动成功后,输入以下内容测试
  3. This is a message
  4. This is another message
复制代码


步骤7:打开consumer,接收消息
  1. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  2. ###启动成功后,如果一切正常将会显示producer端输入的内容
  3. This is a message
  4. This is another message
复制代码


运行KafkaWordCount
KafkaWordCount源文件位置 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
尽管里面有使用说明,见下文,但如果不是事先对Kafka有一定的了解的话,决然不知道这些参数是什么意思,也不知道该如何填写。
  1. /**
  2. * Consumes messages from one or more topics in Kafka and does wordcount.
  3. * Usage: KafkaWordCount   
  4. *    is a list of one or more zookeeper servers that make quorum
  5. *    is the name of kafka consumer group
  6. *    is a list of one or more kafka topics to consume from
  7. *    is the number of threads the kafka consumer should use
  8. *
  9. * Example:
  10. *    `$ bin/run-example \
  11. *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
  12. *      my-consumer-group topic1,topic2 1`
  13. */
  14. object KafkaWordCount {
  15.   def main(args: Array[String]) {
  16.     if (args.length < 4) {
  17.       System.err.println("Usage: KafkaWordCount    ")
  18.       System.exit(1)
  19.     }
  20.     StreamingExamples.setStreamingLogLevels()
  21.     val Array(zkQuorum, group, topics, numThreads) = args
  22.     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
  23.     val ssc =  new StreamingContext(sparkConf, Seconds(2))
  24.     ssc.checkpoint("checkpoint")
  25.     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
  26.     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
  27.     val words = lines.flatMap(_.split(" "))
  28.     val wordCounts = words.map(x => (x, 1L))
  29.       .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
  30.     wordCounts.print()
  31.     ssc.start()
  32.     ssc.awaitTermination()
  33.   }
  34. }
复制代码



讲清楚了写这篇博客的主要原因之后,来看一看该如何运行KafkaWordCount

步骤1:停止运行刚才的kafka-console-producer和kafka-console-consumer

步骤2:运行KafkaWordCountProducer
  1. bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
复制代码

解释一下参数的意思,localhost:9092表示producer的地址和端口, test表示topic,3表示每秒发多少条消息,5表示每条消息中有几个单词

步骤3:运行KafkaWordCount
  1. bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
  2. 解释一下参数, localhost:2181表示zookeeper的监听地址,test-consumer-group表示consumer-group的名称,必须和$KAFKA_HOME/config/consumer.properties中的group.id的配置内容一致,test表示topic,1表示线程数。
复制代码




Spark技术实战之 -- PackratParsers实例
概要
通过一个简明的Demo程序来说明如何使用scala中的PackratParsers
  1. DemoApp
  2. import scala.util.parsing.combinator.PackratParsers
  3. import scala.util.parsing.combinator.syntactical._
  4. object Dotter extends StandardTokenParsers with PackratParsers {
  5.     //定义分割符
  6.     lexical.delimiters ++= List(".",";","+","-","*")
  7.     //合法的输入模式,支持加,减,乘
  8.     lazy val pgm : PackratParser[Int] = expr | minus|multiply
  9.     //定义模式加
  10.     lazy val expr :PackratParser[Int]= num~"+"~num ^^ {case n1~"+"~n2 => n1.toInt + n2.toInt}
  11.     //定义模式减
  12.     lazy val minus :PackratParser[Int]= num~"-"~num ^^ {case n1~"-"~n2 => n1.toInt - n2.toInt}
  13.     lazy val multiply :PackratParser[Int]= num~"*"~num ^^ {case n1~"*"~n2 => n1.toInt * n2.toInt}
  14.     lazy val num = numericLit
  15.     def parse(input: String) =
  16.     phrase(pgm)(new PackratReader(new lexical.Scanner(input))) match {
  17.       case Success(result, _) => println("Success!"); println(result);Some(result)
  18.       case n @ _ => println(n);println("bla"); None
  19.     }  
  20.     def main(args: Array[String]) {
  21.       //定义list,::表示添加,Nil表示list结束
  22.       val prg = "12*2"::"24-4"::"3+5"::Nil
  23.       prg.map(parse)
  24.     }
  25. }
复制代码


parser中的表达式说明
A<~B        只保留左侧内容 A<~B 只保留A
A~>B        只保留右侧内容 A~>B 只保留B
^^        根据匹配结果生成语法短语
^^^        将语法短语转换成为另外的值,注意与^^的区别
~        连接符 A &#771;B 表示模式匹配是B紧跟于A之后
|        或者 A|B 表示模式要么由A组成,要么由B组成


编译执行
将上述源码保存到文件dotter.scala。

编译
  1. scalac dotter.scala
复制代码


执行
  1. scala -cp . Dotter
复制代码



相关文章

Spark技术实战之1 -- KafkaWordCount
http://www.aboutyun.com/thread-9580-1-1.html

Spark技术实战之2 -- Spark Cassandra Connector的安装和使用
http://www.aboutyun.com/thread-9582-1-1.html


Spark技术实战之3 -- 利用Spark将json文件导入Cassandra
http://www.aboutyun.com/thread-9583-1-1.html


Apache Spark技术实战之4 -- SparkR的安装及使用
http://www.aboutyun.com/thread-10082-1-1.html

Apache Spark技术实战之5 -- spark-submit常见问题及其解决
http://www.aboutyun.com/thread-10083-1-1.html

Apache Spark技术实战之6 -- CassandraRDD高并发数据读取实现剖析
http://www.aboutyun.com/thread-10084-1-1.html












http://www.cnblogs.com/hseagle/p/3887507.html

已有(6)人评论

跳转到指定楼层
hb1984 发表于 2014-10-22 11:46:23
谢谢楼主分享。              
回复

使用道具 举报

hahaxixi 发表于 2014-11-18 09:23:23
谢谢楼主分享。              
回复

使用道具 举报

轩辕依梦Q 发表于 2015-9-1 15:09:19
mark一下,多谢楼主分享
回复

使用道具 举报

balrog 发表于 2015-11-22 16:40:16
很不错的讲解
回复

使用道具 举报

tang 发表于 2015-11-27 17:05:23
很强大的例子
回复

使用道具 举报

fasbit 发表于 2017-11-13 16:20:03
本帖最后由 fasbit 于 2017-11-13 16:22 编辑

bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount 10.1.210.50:2181 my-consumer-group replication-threadA,replication-threadB 1
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/11/13 16:16:23 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
17/11/13 16:16:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at org.apache.spark.examples.streaming.JavaKafkaWordCount.main(JavaKafkaWordCount.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 10 more

初接触spark,请教一下这里如何指定依赖的jar包,谢谢!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条