分享

Spark Streaming 快速入门

本帖最后由 xuanxufeng 于 2016-8-11 18:03 编辑
问题导读

1.Spark Streaming 的作用是什么?
2.Spark Streaming工作原理是什么?
3.spark streaming 中的离散流是什么?





Spark Streaming 是为了支持实时数据流计算而在spark api 扩展实现的流式计算框架.

由于spark rdd 的特点,所以spark streaming 和其他流式计算框架概念上有些区别.

spark streaming 官方列举的数据源获取方式有kafka、flume、Twitter、 ZeroMQ、Kinesis等.当然,由于是spark,rdd获取数据的方式完全可以自行实现,所以数据获取方式是无限的.

简单工作原理如图所示.spark streaming 将数据流切成批数据交由spark 引擎处理.

streamingflow.png


在sbt的项目中要引入spark streaming 的包很简单,只需一句:
[mw_shl_code=bash,true]libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "1.3.1"
[/mw_shl_code]

spark streaming 和 spark一样都是从 context开始,只不过叫 StreamingContext,其可以通过sparkconf或者sparkcontext创建.
[mw_shl_code=bash,true]val ssc = new StreamingContext(conf, Seconds(1))

val ssc = new StreamingContext(sc, Seconds(1))[/mw_shl_code]
第二个参数,如例子中的Seconds(1) 是指以多长的时间段来切割数据流.

一个sparkcontext同时只能有一个spark streaming 计算关联

在JVM中,同一时间只能有一个StreamingContext处于活跃状态

离散流介绍

spark streaming 中的离散流实际是连续的按时间段划分的RDD流,英文叫DStreams.如下图:

streaming-dstream1.png

离散流的操作函数和spark的在形式上是一样的,在内部处理的时候,离散流会将所有的操作分别应用到流中的每一个RDD,比如官方的这幅图:

streaming-dstream-ops2.png

Window
spark streaming 中的Window 概念可理解为按一定时间长度将流多个RDD当作一个具有多个RDD数据的大RDD做处理的结果.如下图

streaming-dstream-window3.png

一般来说操作函数名带有window单词的基本都是有窗口操作的.比如:
TransformationMeaning
window(windowLength, slideInterval)基于源DStream产生的窗口化的批数据计算一个新的DStream
countByWindow(windowLength, slideInterval)返回流中元素的一个滑动窗口数
reduceByWindow(func, windowLength, slideInterval)返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.
countByValueAndWindow(windowLength, slideInterval, [numTasks])应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

window 都有两个必须指定的参数:
  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操作执行的时间间隔
这两个参数必须是源DStream的批时间间隔的倍数。比如如下代码:

[mw_shl_code=bash,true]// 每 10 秒计算一次过去30秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))[/mw_shl_code]

Checkpoint 介绍

spark streaming作为spark的流式计算扩展,一样具有RDD容错恢复机制。但是由于流式计算的特点,数据是无限流进来的,但是RDD容错机制会根据依赖重新计算丢失的RDD,如果依赖链太长,无疑是个噩梦。
举个例子,我们建立一个spark streaming应用用作老婆统计老公总共的花销,老公每次花销都会通知这个spark streaming应用。比如:
checkpoint-money.png
假如某一天,这个数据存储出现了问题,比如“女儿拆了硬盘”(这是不能的!),RDD数据丢失了,当然你说spark的容错机制很强大,2号丢了,从1号开始重新计算出2号就行了。
但是假如这个应用稳定运行了20年了呢,突然昨天的RDD丢失了,我不是要从20年前从头计算?这是多么可怕的噩梦啊。
所以Checkpoint技术就是spark streaming中解决上述噩梦的常用技术。
使用很简单,只需在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息即可:


[mw_shl_code=bash,true]val ssc = new StreamingContext(...)   // new context
val lines = ssc.socketTextStream(...) // create DStreams
    ...
ssc.checkpoint(checkpointDirectory)   // set checkpoint directory[/mw_shl_code]

当出现故障恢复时,spark streaming就会直接利用保存下来的checkpoint数据了.

可以通过dstream.checkpoint来设置checkpoint的间隔时间.

但是checkpoint有存储成本,我们不可能吧每个rdd都存下来,也不可能将间隔时间设的很长,所以checkpoint需要好好权衡.推荐来说,checkpoint的间隔时间最好为Dstream的间隔时间的5-10倍大小.



已有(1)人评论

跳转到指定楼层
wx_yZAaN6N2 发表于 2016-8-12 10:37:10
套马杆 套马杆汉子你威武雄壮!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条