本帖最后由 pig2 于 2015-2-11 18:55 编辑
问题导读
1、如何持久化流数据到内存中?
2、什么是Checkpointing?
3、如何监控应用程序?
本文接前篇:
Spark中文手册-编程指南
Spark之一个快速的例子 Spark之基本概念
Spark之基本概念
Spark之基本概念(2)
(一)缓存或持久化
和RDD相似,DStreams也允许开发者持久化流数据到内存中。在DStream上使用persist()方法可以自动地持久化DStream中的RDD到内存中。如果DStream中的数据需要计算多次,这是非常有用的。像reduceByWindow和reduceByKeyAndWindow这种窗口操作、updateStateByKey这种基于状态的操作,持久化是默认的,不需要开发者调用persist()方法。
例如通过网络(如kafka,flume等)获取的输入数据流,默认的持久化策略是复制数据到两个不同的节点以容错。
注意,与RDD不同的是,DStreams默认持久化级别是存储序列化数据到内存中,这将在性能调优章节介绍。更多的信息请看rdd持久化
(二)Checkpointing
一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中, 以使系统从故障中恢复。
Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括 Configuration :创建Spark Streaming应用程序的配置信息 DStream operations :定义Streaming应用程序的操作集合 Incomplete batches:操作存在队列中的未完成的批 Data checkpointing :保存生成的RDD到可靠的存储系统中,这在有状态transformation(如结合跨多个批次的数据)中是必须的。在这样一个transformation中,生成的RDD依赖于之前 批的RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的transformation的中间RDD将会定时地存储到可靠存储系统中,以截断这个依赖链。
元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。
何时checkpoint
应用程序在下面两种情况下必须开启checkpoint
使用有状态的transformation。如果在应用程序中用到了updateStateByKey或者reduceByKeyAndWindow,checkpoint目录必需提供用以定期checkpoint RDD。 从运行应用程序的driver的故障中恢复过来。使用元数据checkpoint恢复处理信息。
注意,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。 这通常是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。
怎样配置Checkpointing
在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。着可以通过streamingContext.checkpoint(checkpointDirectory)方法来做。这运行你用之前介绍的 有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序。
当应用程序是第一次启动,新建一个StreamingContext,启动所有Stream,然后调用start()方法 当应用程序因为故障重新启动,它将会从checkpoint目录checkpoint数据重新创建StreamingContext
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination() 复制代码
如果checkpointDirectory存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用functionToCreateContext函数创建一个新的上下文,建立DStreams。 请看RecoverableNetworkWordCount例子 。
除了使用getOrCreate,开发者必须保证在故障发生时,driver处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。
注意,RDD的checkpointing有存储成本。这会导致批数据(包含的RDD被checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少 操作的吞吐量。相反,checkpointing太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的transformation需要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint 来设置。典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。
(三)部署应用程序
Requirements
运行一个Spark Streaming应用程序,有下面一些步骤
有管理器的集群-这是任何Spark应用程序都需要的需求,详见部署指南 将应用程序打为jar包-你必须编译你的应用程序为jar包。如果你用spark-submit 启动应用程序,你不需要将Spark和Spark Streaming打包进这个jar包。 如果你的应用程序用到了高级源(如kafka,flume),你需要将它们关联的外部artifact以及它们的依赖打包进需要部署的应用程序jar包中。例如,一个应用程序用到了TwitterUtils,那么就需要将spark-streaming-twitter_2.10 以及它的所有依赖打包到应用程序jar中。 为executors配置足够的内存-因为接收的数据必须存储在内存中,executors必须配置足够的内存用来保存接收的数据。注意,如果你正在做10分钟的窗口操作,系统的内存要至少能保存10分钟的数据。所以,应用程序的内存需求依赖于使用 它的操作。 配置checkpointing-如果stream应用程序需要checkpointing,然后一个与Hadoop API兼容的容错存储目录必须配置为检查点的目录,流应用程序将checkpoint信息写入该目录用于错误恢复。 配置应用程序driver的自动重启-为了自动从driver故障中恢复,运行流应用程序的部署设施必须能监控driver进程,如果失败了能够重启它。不同的集群管理器,有不同的工具得到该功能 Spark Standalone:一个Spark应用程序driver可以提交到Spark独立集群运行,也就是说driver运行在一个worker节点上。进一步来看,独立的集群管理器能够被指示用来监控driver,并且在driver失败(或者是由于非零的退出代码如exit(1), 或者由于运行driver的节点的故障)的情况下重启driver。 YARN:YARN为自动重启应用程序提供了类似的机制。 Mesos: Mesos可以用Marathon 提供该功能 配置write ahead logs-在Spark 1.2中,为了获得极强的容错保证,我们引入了一个新的实验性的特性-预写日志(write ahead logs)。如果该特性开启,从receiver获取的所有数据会将预写日志写入配置的checkpoint目录。 这可以防止driver故障丢失数据,从而保证零数据丢失。这个功能可以通过设置配置参数spark.streaming.receiver.writeAheadLogs.enable为true来开启。然而,这些较强的语义可能以receiver的接收吞吐量为代价。这可以通过 并行运行多个receiver增加吞吐量来解决。另外,当预写日志开启时,Spark中的复制数据的功能推荐不用,因为该日志已经存储在了一个副本在存储系统中。可以通过设置输入DStream的存储级别为StorageLevel.MEMORY_AND_DISK_SER获得该功能。
升级应用程序代码
如果运行的Spark Streaming应用程序需要升级,有两种可能的方法
启动升级的应用程序,使其与未升级的应用程序并行运行。一旦新的程序(与就程序接收相同的数据)已经准备就绪,旧的应用程序就可以关闭。这种方法支持将数据发送到两个不同的目的地(新程序一个,旧程序一个) 首先,平滑的关闭(StreamingContext.stop(...)或JavaStreamingContext.stop(...))现有的应用程序。在关闭之前,要保证已经接收的数据完全处理完。然后,就可以启动升级的应用程序,升级 的应用程序会接着旧应用程序的点开始处理。这种方法仅支持具有源端缓存功能的输入源(如flume,kafka),这是因为当旧的应用程序已经关闭,升级的应用程序还没有启动的时候,数据需要被缓存。
(四)监控应用程序
除了Spark的监控功能,Spark Streaming增加了一些专有的功能。应用StreamingContext的时候,Spark web UI
显示添加的Streaming菜单,用以显示运行的receivers(receivers是否是存活状态、接收的记录数、receiver错误等)和完成的批的统计信息(批处理时间、队列等待等待)。这可以用来监控 流应用程序的处理过程。
在WEB UI中的Processing Time和Scheduling Delay两个度量指标是非常重要的。第一个指标表示批数据处理的时间,第二个指标表示前面的批处理完毕之后,当前批在队列中的等待时间。如果 批处理时间比批间隔时间持续更长或者队列等待时间持续增加,这就预示系统无法以批数据产生的速度处理这些数据,整个处理过程滞后了。在这种情况下,考虑减少批处理时间。
Spark Streaming程序的处理过程也可以通过StreamingListener 接口来监控,这 个接口允许你获得receiver状态和处理时间。注意,这个接口是开发者API,它有可能在未来提供更多的信息。
未完待续
相关内容:
Spark中文手册1-编程指南
http://www.aboutyun.com/thread-11413-1-1.html
Spark中文手册2:Spark之一个快速的例子
http://www.aboutyun.com/thread-11484-1-1.html
Spark中文手册3:Spark之基本概念
http://www.aboutyun.com/thread-11502-1-1.html
Spark中文手册4:Spark之基本概念(2)
http://www.aboutyun.com/thread-11516-1-1.html
Spark中文手册6:Spark-sql由入门到精通
http://www.aboutyun.com/thread-11562-1-1.html
Spark中文手册7:Spark-sql由入门到精通【续】
http://www.aboutyun.com/thread-11575-1-1.html
Spark中文手册8:spark GraphX编程指南(1)
http://www.aboutyun.com/thread-11589-1-1.html
Spark中文手册9:spark GraphX编程指南(2)
http://www.aboutyun.com/thread-11601-1-1.html
Spark中文手册10:spark部署:提交应用程序及独立部署模式
http://www.aboutyun.com/thread-11615-1-1.html
Spark中文手册11:Spark 配置指南
http://www.aboutyun.com/thread-10652-1-1.html