分享

Spark2.1性能调优及容错语义

fc013 2017-5-20 19:11:20 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 8885


问题导读:

1.怎样最小化每个批数据的处理时间?
2.怎样找出一个合适的batch间隔?
3.当SparkStreaming应用程序发生故障时,该如何应对?







性能调优

如果想要群集上的SparkStreaming应用程序中获得最佳性能,你需要进行一些优化操作。本节介绍了一些参数和配置,可以通过调整这些参数和配置以提高应用程序性能。在高层次上,你需要考虑两件事情:

1、通过有效利用集群资源减少每批数据的处理时间。
2、设置正确的batch size,使得每批数据的处理速度可以像接收速度那样快速(即数据处理与数据摄取保持一致)。
减少批处理时间

在Spark中可以进行一些优化操作,以最小化每个批数据的处理时间。这些在“优化指南”中有详细的讨论。本节重点介绍一些最重要的。

数据接收的并行级别

通过网络接收的数据(如Kafka,Flume,Socket等)首先被反序列化并存储在Spark中。如果数据接收成为系统中的瓶颈,则考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收单个数据流的接收器(在worker机器上运行)。因此,可以通过创建多个输入DStream并配置它们分别接收流数据的数据源的的不同分区来实现同时接收多个数据流。例如,原来单个输入DStream接收Kafka中两个topic的数据,现在可以创建两个输入DStream,每个只接收一个topic的数据。这将运行两个接收器,允许并行接收数据,从而提高总体吞吐量。这些多个DStream可以联合起来生成一个DStream。然后,应用于单个输入DStream的转换可以被应用于统一后的数据流。可以通过如下操作来实现:

[mw_shl_code=java,true]val numStreams=5  
val kafkaStreams=(1tonumStreams).map{i=>KafkaUtils.createStream(...)}  
val unifiedStream=streamingContext.union(kafkaStreams)  
unifiedStream.print()  [/mw_shl_code]

另一个你应当考虑的参数是接收器的块(block)间隔,这个参数由配置参数spark.streaming.blockInterval决定。对于大多数的接收器,接收到的数据首先被合并为一个数据块,然后存储到Spark的内存中。每个批次中的数据块的数目决定了在使用类似于map的转换操作处理接收的数据时的task数量。每个接收器中每批数据的任务数量将约为(批间隔/块间隔)。例如,200 ms的块间隔将每2秒批次创建10个任务。如果任务数量太少(即小于每台机器的cpu核心数),则无法充分利用所有可用的cpu内核来处理数据,所以效率会降低。如果想要增加给定批间隔的任务数量,请减少块间隔。但是,建议的块间隔的最小值约为50 ms,低于此值时任务启动的开销会占比过大。

使用多个输入流或接收器接收数据的替代方法是手动对输入数据流重新进行分区(使用inputStream.repartition(<数量的分区>))。在进一步的数据处理操作之前,spark会将接收到的batch数据分发到集群中指定数量的计算机上。

数据处理的并行度

在数据处理的任何阶段,如果启动的并行任务数量不够多,则集群资源就可能未得到充分利用。例如,对于诸如reduceByKey和reduceByKeyAndWindow之类的分布式聚合操作,默认并行任务数由spark.default.parallelism配置属性控制。您可以通过传递参数(参见PairDStreamFunctions文档),或者重新设置spark.default.parallelism配置属性的值以更改默认并行度。

数据序列化

可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化。

输入数据:在默认情况下,通过Receivers接收的输入数据会以StorageLevel.MEMORY_AND_DISK_SER_2持久化级别存储在executor的存储器中。也就是说,将数据序列化为字节形式以减少GC开销,并对数据进行复制以对executor故障进行容错。此外,数据首先会保留在内存中,并且只有在内存不足以容纳流计算所需的所有输入数据时才会溢出到磁盘。这个序列化过程显然有一定的开销 - 接收器必须先将接收的数据反序列化,然后使用Spark的序列化格式重新序列化它。

由流操作生成的持久RDD:通过流式计算生成的RDD可能被缓存在内存中。例如,窗口操作会将数据保留在内存中,因为它们将被多次处理。在Spark core 中使用StorageLevel.MEMORY_ONLY作为默认的持久化级别,但是在Spark Streaming中,默认情况下,通过流式计算生成的持久化RDD将通过StorageLevel.MEMORY_ONLY_SER(即序列化)级别进行持久化,以最大程度降低GC开销。

在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。有关更多详细信息,请参阅“Spark调优指南”。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅配置指南中的Kryo相关配置)。

在streaming应用程序需要保留的数据量不大的特定情况下,可以将数据(上述的两种类型都包括)作为反序列化对象持久化,而不会导致过多的GC开销。例如,如果您使用数秒钟作为批次间隔并且没有窗口操作,那么可以通过显示地设置相应的存储级别来禁用数据的序列化。这将减少序列化的CPU开销,潜在地提高性能,而且不需要太多的GC开销。

任务启动开销

如果每秒启动的任务数量很多(例如大于50个),那么向slave节点发送任务的开销可能很大,并且将难以实现亚秒级延迟。您可以通过以下更改减少开销:

执行模式:在standalone或粗粒度Mesos模式下运行Spark可以获得比细粒度的Mesos模式更短的任务启动时间。有关详细信息,请参阅“Spark on Mesos”指南。

这些更改可能会将批处理时间缩短数百毫秒,从而允许亚秒级的batch size可行。

设置恰当的batch间隔

为了使群集上的SparkStreaming应用程序能够稳定运行,系统处理数据的速度应该能够大于等于数据接收的速度。换句话说,每个batch的处理速度应该像每个batch的生成一样快速。通过监视streaming用户界面中显示的batch处理时间,可以检查批处理时间是否小于批处理间隔。

根据流式计算的性质,在一组固定的集群资源上,所使用的批次间隔会对应用程序所能维持的数据处理速率有很大的影响。例如,在之前的WordCountNetwork示例中,对于特定数据速率,系统能够每2秒跟踪报告单词计数(即,2秒的批次间隔),但不能每500毫秒报告一次。因此,需要谨慎的设置批次间隔,以确保生产系统可以维持预期的数据处理速率。

要找出一个合适的批次间隔,一个不错的方法是先以保守的批次间隔(例如5-10秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,您可以检查每个处理批次遇到的端到端的延迟(或查找Spark驱动程序log4j日志中的“总延迟”,或者使用StreamingListener接口)。如果延迟一直与批次间隔大小相当,那么系统是稳定的。否则,如果延迟持续增加,则意味着系统数据处理速度跟不上数据接收速度,因此此时系统是不稳定的。一旦系统稳定下来,您可以尝试逐步增加数据接收速率或减少每批数据的大小。注意,由于数据速率增加而导致的短暂的延迟增长可能是正常情况,只要延迟能够降回一个低值即可(即:小于batch size)。

内存调优

关于Spark应用内存占用和GC调优已经在调优指南(Tuning Guide)中有详细的讨论。强烈建议你阅读那篇文档。本节中,我们仅讨论几个专门用于SparkStreaming的调优参数。

Spark Streaming应用需求的集群内存资源严重依赖于具体使用的tranformation算子。例如,如果想要对最近10分钟的数据进行窗口操作,那么你的集群需要有足够的内存保留10分钟的数据;或者如果你需要使用updateStateByKey操纵大量的key,也需要大量的内存资源。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。

一般情况下,由于streaming接收器接收到的数据会以StorageLevel.MEMORY_AND_DISK_SER_2 的持久化级别存储到spark中,内存容不下的数据会被溢出到磁盘。数据溢出到磁盘上会大大降低streaming应用的性能,因此建议根据streaming应用处理的数据量,务必提供充足的内存。最好仔细的查看内存使用量并进行相应的估算。

内存调优的另一个方向是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现由于JVM垃圾回收而引起的大量暂停。

以下是一些能够帮助你调整内存占用和GC开销的参数:

DStream的持久化级别(Persistence Level of DStreams): 前面数据序列化(Data Serialization)这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相比于非序列化数据,这样可以减少内存占用和GC开销。启用Kryo序列化,还可以进一步减少序列化后的数据大小和内存占用量。如果需要进一步减少内存占用,可以开启数据压缩(通过spark.rdd.compress这个配置设定),随之而来的是额外的CPU开销。

清除旧数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都会被自动清理。SparkStreaming会根据所使用的transformation操作来清理旧数据。例如,你正在使用长度为10分钟的窗口操作,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以通过设置streamingContext.remember 参数以保留更长时间段的数据(比如:你可能会需要交互式地查询更早的数据)。

CMS垃圾回收器(CMS Garbage Collector): 为了使GC暂停的时间能始终保持在一个较低的水平,强烈建议您使用“并行标记-扫描”垃圾回收器(CMS, concurrentmark-and-sweep GC)。虽然已知CMS GC会降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个更加恒定的水平上。请确保在驱动器(通过spark-submit中的–driver-Java-options设置)和执行器(使用spark.executor.extraJavaOptions配置参数)上都设置了CMS GC。

其他提示: 如果还想进一步减少GC开销,可以尝试以下建议:

使用OFF_HEAP存储级别来持久化RDD。详见Spark编程指南(Spark Programming Guide)

使用更多但是堆内存更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。

请记住以下要点

·  一个DStream仅与一个接收器相关联。为了实现并行接受数据,你需要创建多个接收器,即多个DStream。一个接收器在运行在一个executor中,这个接收器会占据一个cpu核心。请确保在接收器占据一个核心后还有足够的内核进行数据处理,即spark.cores.max应考虑接收器的核心占用。接收器以循环方式分配给executor。

·  当从流数据源接收数据时,接收器创建数据块。每个blockInterval(以毫秒为单位)生成一个新的数据块,所以在每个batchInterval(批间隔)会生成N个数据块,N=batchInterval / blockInterval。这些块由当前executor的BlockManager分发给其他executor的BlockManager。之后,为了更进一步的数据处理,在驱动程序上运行的网络输入跟踪器会接到有关数据块位置信息的通知。

·  在驱动程序中为在batchInterval期间创建的数据块创建一个RDD。在batchInterval期间生成的每个数据块对应RDD的各个分区。Spark为每个分区生成一个任务。 如果blockInterval 和 batchinterval相等则意味着仅创建单个分区,并且可能在本地进行处理。

·  除非非本地计划启动,否则无论设置怎样的块间隔,数据块上定义的map任务都将直接在数据块所在的executor(一个是接收数据块的executor,另一个是块被复制到的executor)中进行处理。更大的块间隔意味着单个数据块的变大, 如果spark.locality.wait参数的值设置的比较大则会增加数据块在本地节点被处理的几率。你需要在这两个参数(块间隔和spark.locality.wait)之间找到平衡,以确保在本地节点处理较大的数据块。

·  除了依赖于调整batchInterval和blockInterval的值来实现数据的高效处理,您还可以通过调用inputDstream.repartition(n)来对数据重新分区以实现这一点。该操作会重新shuffle RDD中的数据并创建n个分区。虽然shuffle会有一定的开销,但是可以实现并行度的增加。RDD的处理操作作为一个job由driver的作业调度器(jobscheduler)进行调度。在一个特定的时间点,只有一个job是处于活跃状态。所以,如果一个job正在执行,那么其他job将会排队等待。

·  如果你有两个DStream,那么将会生成两个RDD,并且将创建两个被依次执行的作业。为了避免这种情况,你可以对两个dstream执行union操作,该操作会将dstream的两个RDD联合成一个unionRDD。然后这个unionRDD会仅生成一个job。但RDD的分区数量和原来相同。

·  如果批处理时间超过批量间隔,那么接收器的内存将被逐渐耗尽,最终会抛出异常(最可能是BlockNotFoundException)。目前还没有暂停数据接收器的办法。您可以使用SparkConf配置spark.streaming.receiver.maxRate参数来限制接收器的接受速率。
容错语义

在本节中,我们将讨论当SparkStreaming应用程序发生故障时,该如何应对。

背景

在了解SparkStreaming提供的容错语义之前,我们先回顾一下Spark RDD的基本容错语义。

1、RDD是一个不可变的,确定性的可重新计算的分布式数据集。 每个RDD会记住自己的谱系以及创建自身的确定性操作,它的父RDD同样是可容错的。

2、如果RDD的任何分区由于worker节点故障而丢失,则可以使用谱系信息以及原始容错数据集重新计算出该分区。

3、假设所有的RDD转换都是确定的,无论Spark集群出现什么故障,最后一个输出操作产出的RDD中的数据将永远是一样的。

Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源获取的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据(除非使用fileStream),这时Streaming的数据源就没有那么可靠了。此时要使生成的RDD同样是可容错的,接受的数据就必须复制到集群中不同worker节点上的多个Spark executor(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据:

接收并保存了副本的数据 –数据不会因为单个worker节点故障而丢失,因为有副本!

接收但尚未来得及创建副本 –因为数据并没有副本,恢复数据的唯一方法是从数据源重新获取。

此外,还有两种需要考虑的故障类型:

Worker节点故障 – 任何运行executor的worker节点都有可能发生故障,而一旦故障发生,故障节点内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。

Driver节点故障 – 如果运行Spark Streaming应用的driver节点发生故障,那么很显然SparkContext对象就会丢失,然后所有executor及其内存数据也会丢失。

有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。

定义

流式数据处理系统的可靠性语义通常是通过系统可以处理每个记录的次数来定义的。系统可以在所有可能的操作情形下提供三种类型的保证(无论出现何种故障)

1、最多一次:每个记录将被处理一次或不处理。

2、至少一次:每个记录将被处理一次或多次。 这比“最多一次”更强,因为它确保不会丢失任何数据。但可能有重复处理。

3、完全一次:每个记录将被精确处理一次 - 不会丢失数据,并且不会多次处理数据。 这显然是三者中最强的保证。

基础语义

广义上来说,任何流式处理系统都会包含以下三个数据处理步骤:

1、数据接收(Receiving the data): 使用接收器或其他方式从数据源接受数据。

2、数据转换(Transforming the data): 使用DStream和RDD transformation算子对接收到的数据进行转换操作。

2、数据推送(Pushing out the data): 将转换后最终数据推送到文件系统,数据库或仪表板等外部系统。

如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在每个步骤中都提供 “精确一次”保证:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:

1、数据接收: 不同数据源提供的保证不同,下一节再详细讨论。

2、数据转换: 归功于RDD提供的保障,所有的数据都会被“精确一次”处理。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。

3、数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。

接收数据语义

不同的输入源提供不同的数据可靠性保证,从“至少一次”到“精确一次”,继续阅读以了解更多。

从文件接收数据

如果所有输入数据已经存在于像HDFS这样的可容错文件系统中,Spark Streaming总是可以从任何故障中恢复并处理所有数据。这给出了“精确一次”的语义,这意味着无论发生什么故障,所有的数据将被精确地处理一次。

从接收器接收数据

对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。如前面所述,spark Streaming主要有两种类型的接收器:

可靠接收器 – 这类接收器会在接收数据并保存好副本后,向可靠数据源发送确认信息。如果接收器发生故障,就不会发送缓存(已接收但尚未保存副本)数据的确认信息。因此,一旦接收器重启,数据源会重新发送未确认的数据,从而不会有任何数据丢失。

不可靠接收器 – 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。

对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点发生故障,对于可靠接收器来说,不会有数据丢失。而对于不可靠接收器,已接收但尚未保存副本的数据可能会丢失。如果driver节点发生故障,除了正在缓存的数据之外,其他的已经接收且已经保存了内存副本的数据也都会丢失,这将会影响有状态算子的计算结果。

为了避免丢失已经收到且保存副本的数据,从 spark 1.2 开始引入了WAL(write aheadlogs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(write aheadlogs enabled),就能实现数据零丢失。并且这时候,还能提供“至少一次”的语义保证。

下表总结了发生故障时的各种语义:

部署场景
Worker故障
Driver故障
Spark 1.1及以前版本 或者
Spark 1.2及以后版本,且未开启WAL
若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据;
若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义
若使用不可靠接收器,缓存数据会丢失;
对于两种接收器,已保存数据会丢失,且不提供语义保证
Spark 1.2及以后版本,并启用WAL
若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证
若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证

从Kafka Direct API接收数据

从Spark 1.3开始,我们引入了一个新的Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。更详细的说明见:KafkaIntegration Guide

输出算子的语义

输出算子(如foreachRDD)提供“至少一次”语义保证,也就是说,如果worker发生故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式:

幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。

事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式:

1、用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。

2、基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。

[mw_shl_code=scala,true]dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}[/mw_shl_code]

接下来

1、附加指南
·        Kafka Integration Guide
·        Custom Receiver Guide
1、 第三方的DStream数据源:Third Party Projects
2、 Scala API文档:
·        StreamingContext and DStream
3、 更多示例:Scala and Java and Python
4、 介绍Spark streaming的论文视频




来源:csdn
作者:_旧时光_


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条