分享

about云日志分析项目准备10-3:Spark Local模式之Log文本清洗

本帖最后由 pig2 于 2017-8-23 16:41 编辑
问题导读:
1. Spark 有哪几种运行方式?
2. 如何初始化StreamingContext?
3. 如何用scala 写正则表达式进行文本清洗?
4. 待解决的问题有哪些?




解决方案:


Spark运行模式

Spark的运行模式多种多样,在单机上可以以Local和伪分布式模式运行;部署在集群上时有Spark内建的Standalone模式及对于外部框架的支持,有Mesos模式和Spark On YARN模式,这两者主要区别是资源管理交由谁负责。

  • Local模式

本地运行,不加任何配置,默认为Local模式。


  • Standalone模式

Standalone模式中Spark集群中Master和Worker节点构成。用户程序通过与Master节点交互,申请所需资源,Worker节点负责具体Executor的启动运行。


  • Local Cluster模式

Local Cluster即伪分布式模式,是基于Standalone模式实现的,启动Master(主进程)和Worker(工作进程)的位置全部在本地。
  • YARN Standalone/YARN Cluster模式

通过Hadoop YARN框架来调度Spark应用所需资源。




Spark On YARN提交常用参数:

2017-03-02_151256.jpg

初始化StreamingContext

为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext对象可以用SparkConf对象创建。


  1. import org.apache.spark._
  2. import org.apache.spark.streaming._
  3. val conf = new SparkConf().setAppName(appName).setMaster(master)
  4. val ssc = new StreamingContext(conf, Seconds(1))
复制代码


appName表示你的应用程序显示在集群UI上的名字,master是一个Spark、Mesos、YARN集群URL或者一个特殊字符串“local”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码master,而是希望用spark-submit启动应用程序,并从spark-submit中得到master的值。对于本地测试或者单元测试,你可以传递“local”字符串在同一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过ssc.sparkContext访问这个SparkContext对象。

批时间片需要根据你的程序的潜在需求以及集群的可用资源来设定,你可以在性能调优那一节获取详细的信息。

可以利用已经存在的SparkContext对象创建StreamingContext对象。

  1. import org.apache.spark.streaming._
  2. val sc = ...                // existing SparkContext
  3. val ssc = new StreamingContext(sc, Seconds(1))
复制代码


当一个上下文(context)定义之后,你必须按照以下几步进行操作

  • 定义输入源;
  • 准备好流计算指令;
  • 利用streamingContext.start()方法接收和处理数据;
  • 处理过程将一直持续,直到streamingContext.stop()方法被调用。

几点需要注意的地方:

  • 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。
  • 一旦一个context已经停止,它就不能再重新启动
  • 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
  • 在StreamingContext上调用stop()方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false
  • 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。




scala正则表达式实现Local 模式 Spark 文本清洗

日志:
  1. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 352 1057 31
  2. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 352 1058 31
  3. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1057 31
  4. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1054 31
复制代码


实现:

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.storage.StorageLevel
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}

  4. import scala.util.matching.Regex

  5. /**
  6.   * Created by PeersLee on 2017/2/8.
  7.   */
  8. object ExtractFromLocal {
  9.         def main(args: Array[String]): Unit = {
  10.                 //创建SparkConf,设置AppName和Master
  11.                 // var:声明变量,val:声明常量
  12.                 val conf = new SparkConf().setAppName("ExtractFromLocal").setMaster("local")
  13.                 // 创建一个具有n个线程10秒间隔时间的本地StreamingContext
  14.                 val ssc = new StreamingContext(conf, Seconds(10))
  15.                 // 上面我们已经创建一个DStream(连续的数据流),这个ssc可以从TCP源(ip : localhost, port : 9999) 获取流数据,
  16.                 // 开始获取数据
  17.                 val hostname = "192.168.1.10"
  18.                 val port = 9999
  19.                 val lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_AND_DISK_SER)
  20.                 // 处理文本
  21.                 // 时间
  22.                 val timePat = new Regex("(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})")
  23.                 // ip
  24.                 val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
  25.                 // url
  26.                 val urlPat = new Regex("((http|https)://.*?)\\s")
  27.                 /*
  28.                 val nestedNumbers = List(List(1, 2), List(3, 4))
  29.                 nestedNumbers.flatMap(x => x.map(_ * 2))
  30.                 flatMap需要一个处理嵌套列表的函数,然后将结果串连起来。
  31.                  nestedNumbers.map((x: List[Int]) => x.map(_ * 2)).flatten
  32.                  */
  33.                  val words = lines.flatMap(x => List(timePat.findFirstIn(x.toString),  ipPat.findFirstIn(x.toString), urlPat.findFirstIn(x.toString)))
  34.                  words.print()
  35.                 //开始执行,并在控制台等待
  36.                 ssc.start()
  37.                 ssc.awaitTermination()
  38.         }
  39. }
复制代码
  1. # peerslee @ peersleeLoveJiaLee in ~ [21:51:53] C:1
  2. $ nc -lk 9998
  3. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]http://www.aboutyun.com[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1054 31
复制代码


2017-03-09 21-58-32 的屏幕截图.jpg

待解决的问题:

  • 在intellij idea 使用 SBT 创建 Spark Streaming + kafka 项目,应该添加哪些依赖?
  • 如何在intellij idea 上打包 scala spark 项目?我打包之后跑起来总是出这个问题...
2017-03-02_153742.jpg

没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条