分享

about云日志分析项目准备11:spark streaming 接收 flume 监控目录的日志文件

Oner 发表于 2017-3-18 18:45:50 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 3717
本帖最后由 Oner 于 2017-3-19 00:55 编辑


接上篇:
这一篇主要讲解从日志文件的流向,即从flume-->kafka-->spark streaming。

启动相关进程

首先,我们需要确保启动相应的进程。如果启动了可以忽略。

启动hdfs

登录master机器

  1. start-dfs.sh
复制代码

在master机器上使用 jps 命令查看相关进程
QQ截图20170312215509.jpg

master机器上出现NameNode和 SecondaryNameNode说明master节点启动正常。然后登录slave1和slave2机器,使用 jps 命令查看相关进程。
QQ截图20170312215741.jpg

slave1和slave2节点出现DataNode说明slave节点启动正常。


启动spark

登录master机器
  1. start-master.sh
复制代码
使用jps命令查看进程
QQ截图20170312220033.jpg

master节点出现 Master 进程说明spark的master节点启动成功。


然后在master机器上继续输入以下命令:
  1. start-slaves.sh
复制代码
QQ截图20170312220319.jpg


使用jps命令在master机器、slave1机器和slave2机器上
QQ截图20170312220645.jpg

如果每台机器上都能出现 Worker 进程,说明spark的worker节点启动成功。

启动zookeeper

分别登录master机器、slave1机器和slave2机器,输入以下命令:

  1. zkServer.sh start
复制代码

然后在每台机器上输入jps 命令查看相关进程。
QQ截图20170312221217.jpg

如果每台机上都出现 QuorumPeerMain 进程,说明 zookeeper 启动成功。

启动kafka

分别登录master机器、slave1机器和slave2机器,输入以下命令:

  1. cd $KAFKA_HOME
  2. kafka-server-start.sh -daemon ./config/server.properties
复制代码

启动之后,使用jps命令查看相关进程: QQ截图20170318235737.jpg

如果每台机上都出现 Kafka 进程,说明 Kafka 启动成功。


启动flume

参考 about云日志分析项目准备9:Flume安装和使用 这篇中的Flume使用一节。

我们登录master机器。然后按照参考内容,将 flume source的监控目录设置为 /data/aboutyunlog 目录,sink的输出设为 kafka 的 aboutyunlog 这个topic。

之后创建相关目录(如果没有的话)

再之后在kafka上创建 aboutyunlog 这个topic(如果没有的话)。
  1. kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
复制代码

完成上述步骤后,启动flume

  1. nohup flume-ng agent --conf-file /data/flume-1.6.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console &
复制代码

nohup 是一个可以用来启动后台进程的命令。

启动之后,使用jps命令查看相关进程:

QQ截图20170319000026.jpg

如果可以看到 Application 这个进程,说明 flume-ng 启动成功。

注意:我们只是在master上启动了flume,用来监控master机器上的/data/aboutyunlog目录。

然后我们将我们的示例日志写入aboutyunlog.example文件中。
  1. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1057 31
  2. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 352 1058 31
  3. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1057 31
  4. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
  5. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 370 1054 31
  6. 2017-02-05 09:42:04 GET /plugin.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 925 1072 140
  7. 2017-02-05 09:42:04 GET /uc_server/data/avatar/000/00/55/20_avatar_middle.jpg 58.211.2.60 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 http://www.aboutyun.com/thread-7746-1-1.html www.aboutyun.com 0 6430 1156 109
复制代码

将 aboutyunlog.example 文件移动到 flume 监控目录:
  1. mv aboutyunlog.example  /data/aboutyunlog/
复制代码
经过1分钟后,我们可以查看 /data/aboutyunlog目录,会发现flume已经将该文件标记为 “已完成” 状态,也就是说,flume已经将消息发送到了kafka的aboutyunlog这个topic下。
QQ截图20170319004726.jpg

编写 Streaming 代码

接下来我们在 idea 中编写使用spark streaming读取kafka 中 aboutyunlog这个topic的消息的代码。
  1. import kafka.api.OffsetRequest
  2. import kafka.message.MessageAndMetadata
  3. import kafka.serializer.StringDecoder
  4. import org.apache.log4j.{Level, Logger}
  5. import org.apache.spark.SparkConf
  6. import org.apache.spark.streaming.kafka.KafkaUtils
  7. import org.apache.spark.streaming.{Seconds, StreamingContext}

  8. /**
  9.   * Created by wangwei01 on 2017/3/12.
  10.   */
  11. object StreamingReadData {
  12.   Logger.getRootLogger.setLevel(Level.WARN)
  13.   def main(args: Array[String]): Unit = {

  14.     // 创建SparkConf对象,并指定AppName和Master
  15.     val conf = new SparkConf()
  16.           .setAppName("StreamingReadData")
  17.           .setMaster("local")

  18.     // 创建StreamingContext对象
  19.     val ssc = new StreamingContext(conf, Seconds(10))

  20. //    val zkServers = "master:2181,slave1:2181,slave2:2181"
  21.     // 注意:需要在本机的hosts文件中添加 master/slave1/slave2对应的ip
  22.     val brokers = "master:9092,slave1:9092,slave2:9092"

  23.     val topics = "aboutyunlog"
  24.     val groupId = "consumer_001"

  25.     val topicsSet = topics.split(",").toSet
  26.     val kafkaParams = Map[String, String](
  27.       "metadata.broker.list" -> brokers,
  28.       "group.id" -> groupId,
  29.       "auto.offset.reset" -> OffsetRequest.SmallestTimeString  // 说明每次程序启动,从kafka中最开始的第一条消息开始读取
  30.     )

  31.     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  32.       ssc, kafkaParams, topicsSet).map(_._2)

  33.     messages.print()

  34.     ssc.start()
  35.     ssc.awaitTermination()
  36.   }
  37. }
复制代码

然后启动 spark streaming 程序,我们会看到类似的输出:
QQ截图20170319005002.jpg

这说明我们的spark streaming 程序已经正确读取了从kafka中的消息。

没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条