本帖最后由 Oner 于 2017-3-19 00:55 编辑
接上篇:
这一篇主要讲解从日志文件的流向,即从flume-->kafka-->spark streaming。
启动相关进程
首先,我们需要确保启动相应的进程。如果启动了可以忽略。
启动hdfs
登录master机器
在master机器上使用 jps 命令查看相关进程
master机器上出现NameNode和 SecondaryNameNode说明master节点启动正常。然后登录slave1和slave2机器,使用 jps 命令查看相关进程。
slave1和slave2节点出现DataNode说明slave节点启动正常。
启动spark
登录master机器
使用jps命令查看进程
master节点出现 Master 进程说明spark的master节点启动成功。
然后在master机器上继续输入以下命令:
使用jps命令在master机器、slave1机器和slave2机器上
如果每台机器上都能出现 Worker 进程,说明spark的worker节点启动成功。
启动zookeeper
分别登录master机器、slave1机器和slave2机器,输入以下命令:
然后在每台机器上输入jps 命令查看相关进程。
如果每台机上都出现 QuorumPeerMain 进程,说明 zookeeper 启动成功。
启动kafka
分别登录master机器、slave1机器和slave2机器,输入以下命令:
- cd $KAFKA_HOME
- kafka-server-start.sh -daemon ./config/server.properties
复制代码
启动之后,使用jps命令查看相关进程:
如果每台机上都出现 Kafka 进程,说明 Kafka 启动成功。
启动flume
参考 about云日志分析项目准备9:Flume安装和使用 这篇中的Flume使用一节。
我们登录master机器。然后按照参考内容,将 flume source的监控目录设置为 /data/aboutyunlog 目录,sink的输出设为 kafka 的 aboutyunlog 这个topic。
之后创建相关目录(如果没有的话)
再之后在kafka上创建 aboutyunlog 这个topic(如果没有的话)。
- kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
复制代码
完成上述步骤后,启动flume
- nohup flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console &
复制代码
nohup 是一个可以用来启动后台进程的命令。
启动之后,使用jps命令查看相关进程:
如果可以看到 Application 这个进程,说明 flume-ng 启动成功。
注意:我们只是在master上启动了flume,用来监控master机器上的/data/aboutyunlog目录。
然后我们将我们的示例日志写入aboutyunlog.example文件中。
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1057 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1058 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1057 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
- 2017-02-05 09:42:04 GET /plugin.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 925 1072 140
- 2017-02-05 09:42:04 GET /uc_server/data/avatar/000/00/55/20_avatar_middle.jpg 58.211.2.60 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 6430 1156 109
复制代码
将 aboutyunlog.example 文件移动到 flume 监控目录:
- mv aboutyunlog.example /data/aboutyunlog/
复制代码 经过1分钟后,我们可以查看 /data/aboutyunlog目录,会发现flume已经将该文件标记为 “已完成” 状态,也就是说,flume已经将消息发送到了kafka的aboutyunlog这个topic下。
编写 Streaming 代码
接下来我们在 idea 中编写使用spark streaming读取kafka 中 aboutyunlog这个topic的消息的代码。
- import kafka.api.OffsetRequest
- import kafka.message.MessageAndMetadata
- import kafka.serializer.StringDecoder
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.kafka.KafkaUtils
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- /**
- * Created by wangwei01 on 2017/3/12.
- */
- object StreamingReadData {
- Logger.getRootLogger.setLevel(Level.WARN)
- def main(args: Array[String]): Unit = {
- // 创建SparkConf对象,并指定AppName和Master
- val conf = new SparkConf()
- .setAppName("StreamingReadData")
- .setMaster("local")
- // 创建StreamingContext对象
- val ssc = new StreamingContext(conf, Seconds(10))
- // val zkServers = "master:2181,slave1:2181,slave2:2181"
- // 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
- val brokers = "master:9092,slave1:9092,slave2:9092"
- val topics = "aboutyunlog"
- val groupId = "consumer_001"
- val topicsSet = topics.split(",").toSet
- val kafkaParams = Map[String, String](
- "metadata.broker.list" -> brokers,
- "group.id" -> groupId,
- "auto.offset.reset" -> OffsetRequest.SmallestTimeString // 说明每次程序启动,从kafka中最开始的第一条消息开始读取
- )
- val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topicsSet).map(_._2)
- messages.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
复制代码
然后启动 spark streaming 程序,我们会看到类似的输出:
这说明我们的spark streaming 程序已经正确读取了从kafka中的消息。
|
|