本帖最后由 pig2 于 2017-8-23 16:41 编辑
问题导读:
1. Spark 有哪几种运行方式?
2. 如何初始化StreamingContext?
3. 如何用scala 写正则表达式进行文本清洗?
4. 待解决的问题有哪些?
解决方案:
Spark运行模式
Spark的运行模式多种多样,在单机上可以以Local和伪分布式模式运行;部署在集群上时有Spark内建的Standalone模式及对于外部框架的支持,有Mesos模式和Spark On YARN模式,这两者主要区别是资源管理交由谁负责。
本地运行,不加任何配置,默认为Local模式。
Standalone模式中Spark集群中Master和Worker节点构成。用户程序通过与Master节点交互,申请所需资源,Worker节点负责具体Executor的启动运行。
Local Cluster即伪分布式模式,是基于Standalone模式实现的,启动Master(主进程)和Worker(工作进程)的位置全部在本地。
- YARN Standalone/YARN Cluster模式
通过Hadoop YARN框架来调度Spark应用所需资源。
Spark On YARN提交常用参数:
初始化StreamingContext
为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext对象可以用SparkConf对象创建。
- import org.apache.spark._
- import org.apache.spark.streaming._
- val conf = new SparkConf().setAppName(appName).setMaster(master)
- val ssc = new StreamingContext(conf, Seconds(1))
复制代码
appName表示你的应用程序显示在集群UI上的名字,master是一个Spark、Mesos、YARN集群URL或者一个特殊字符串“local”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码master,而是希望用spark-submit启动应用程序,并从spark-submit中得到master的值。对于本地测试或者单元测试,你可以传递“local”字符串在同一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过ssc.sparkContext访问这个SparkContext对象。
批时间片需要根据你的程序的潜在需求以及集群的可用资源来设定,你可以在性能调优那一节获取详细的信息。
可以利用已经存在的SparkContext对象创建StreamingContext对象。
- import org.apache.spark.streaming._
- val sc = ... // existing SparkContext
- val ssc = new StreamingContext(sc, Seconds(1))
复制代码
当一个上下文(context)定义之后,你必须按照以下几步进行操作
- 定义输入源;
- 准备好流计算指令;
- 利用streamingContext.start()方法接收和处理数据;
- 处理过程将一直持续,直到streamingContext.stop()方法被调用。
几点需要注意的地方:
- 一旦一个context已经启动,就不能有新的流算子建立或者是添加到context中。
- 一旦一个context已经停止,它就不能再重新启动
- 在JVM中,同一时间只能有一个StreamingContext处于活跃状态
- 在StreamingContext上调用stop()方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()的可选参数为false
- 一个SparkContext对象可以重复利用去创建多个StreamingContext对象,前提条件是前面的StreamingContext在后面StreamingContext创建之前关闭(不关闭SparkContext)。
scala正则表达式实现Local 模式 Spark 文本清洗
日志:
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 352 1057 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 352 1058 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1057 31
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1054 31
复制代码
实现:
- import org.apache.spark.SparkConf
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import scala.util.matching.Regex
- /**
- * Created by PeersLee on 2017/2/8.
- */
- object ExtractFromLocal {
- def main(args: Array[String]): Unit = {
- //创建SparkConf,设置AppName和Master
- // var:声明变量,val:声明常量
- val conf = new SparkConf().setAppName("ExtractFromLocal").setMaster("local")
- // 创建一个具有n个线程10秒间隔时间的本地StreamingContext
- val ssc = new StreamingContext(conf, Seconds(10))
- // 上面我们已经创建一个DStream(连续的数据流),这个ssc可以从TCP源(ip : localhost, port : 9999) 获取流数据,
- // 开始获取数据
- val hostname = "192.168.1.10"
- val port = 9999
- val lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_AND_DISK_SER)
- // 处理文本
- // 时间
- val timePat = new Regex("(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})")
- // ip
- val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
- // url
- val urlPat = new Regex("((http|https)://.*?)\\s")
- /*
- val nestedNumbers = List(List(1, 2), List(3, 4))
- nestedNumbers.flatMap(x => x.map(_ * 2))
- flatMap需要一个处理嵌套列表的函数,然后将结果串连起来。
- nestedNumbers.map((x: List[Int]) => x.map(_ * 2)).flatten
- */
- val words = lines.flatMap(x => List(timePat.findFirstIn(x.toString), ipPat.findFirstIn(x.toString), urlPat.findFirstIn(x.toString)))
- words.print()
- //开始执行,并在控制台等待
- ssc.start()
- ssc.awaitTermination()
- }
- }
复制代码- # peerslee @ peersleeLoveJiaLee in ~ [21:51:53] C:1
- $ nc -lk 9998
- 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com/thread-7746-1-1.html]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url=http://www.aboutyun.com]http://www.aboutyun.com[/url] [url=http://www.aboutyun.com]www.aboutyun.com[/url] 0 370 1054 31
复制代码
待解决的问题:
- 在intellij idea 使用 SBT 创建 Spark Streaming + kafka 项目,应该添加哪些依赖?
- 如何在intellij idea 上打包 scala spark 项目?我打包之后跑起来总是出这个问题...
|
|