levycui 发表于 2016-11-29 10:40:21

Spark Job调度方式及资源分配策略介绍

问题导读:
1、如何理解在应用程序间、程序内调度?
2、集群的资源分配有哪些方式?
3、资源分配策略有哪些?
4、如何配置调度池属性?

static/image/hrline/3.gif

在应用程序间调度

当在集群上运行,一个 Spark 应用程序获得一系列独立的执行器(Executor) JVMs,执行器仅仅为应用程序运行 task 和存储数据。如果有多个用户需要分享你的集群,就依赖于集群管理器,这样可以有不同的选择来管理资源的分配。

最简单的选项,是在所有的集群管理器上都是可用的,就是对资源进行静态分区。使用这种方法,应用程序每一个应用程序能够得到最大数量的资源,并且能够在所有时间持有资源。这种方法被使用在了 Spark 的单机模式与 YARN 模式中,也和粗粒度的 Mesos 模式一样 。资源分配使用下面的方式配置,且依赖与集群的类型。

单例模式 : 默认情况下,应用程序提交到单机模式集群上,会使用 FIFO 次序来运行,并且每一个应用程序会试图使用所有可用的节点。

为了限制应用程序可使用的节点数目,你可以设置:

    spark.cores.max

或者改变应用程序默认没有设置的配置:

    spark.deploy.defaultCores

最后,除了控制 CPU 核数,可以使用下面的设置控制执行器内存的使用。

    spark.executor.memory

Mesos : 为了在 Mesos 上使用静态分区,需要设置 spark.mesos.coarse 为 true。就像在单机模式下,可以同过设置可选的 spark.cores.max 来限制每个应用程序的资源共享(几个应用程序共享一个执行器)。你也应该通过设置 spark.executor.memory 来控制执行器的内存。

YARN : 可以在 Spark YARN 客户端上使用 –num-executors 选项来控制会他需要为有多少个执行器分配集群资源(同 spark.executor.instances 配置参数)。 –executor-memeory(同 spark.executor.memory 配置参数) 和 –executor-cores(同 –executor-cores 配置参数) 来控制 每一个执行器的资源。

在 Mesos 中第二个可用的选项就是动态的分享 CPU 内核。在该模式下,每一个 Spark 应用程序仍然可以有一个固定并且独立的内存分配(spark.executor.memory),但是当一个应用程序没有在一个机器上运行 task, 其他的应用程序可能在他们的CPU核上运行 task。当你的应用程序拥有大量的不是一直活动的应用程序时,这个模式就十分有用,比如来自许多用户的 shell 回话。然而,他会带来一个无法预测的潜在风险,因为他可能会发生一个应用程序抢夺了一个正在工作的节点的CPU核的情况。如果使用这种模式,简单的使用 mesos:// URL 并且设置 spark.mesos.coarse 为 false。

注意没有一个模式支持应用程序间的内存分享,如果你想要使用分享数据,我们建议你运行一个单独的服务应用程序,他能通过使用相同的RDDs来服务多个请求。

动态的资源分配

Spark 支持一个机制,他能够基于工作量来动态的调整应用程序所占据的资源。也就是说如果你的应用程序不再使用资源并且不久后才需要再次请求他们,应用程序就会把资源归还给集群。这个特性在多个应用程序在你的集群上共享资源时,是非常有用的。

这个特性默认是关闭的,并且在所有粗粒度的集群管理器上可用,比如 单机模式,YARN,Mesos。

配置和开启

使用这个特性,需要做两件事。

你的引用程序必须设置 spark.dynamicAllocation.enabled 为 true。

你必须在同一个集群上的每一个 worker 节点上开启外部 shuffle 服务并且设置应用程序的 spark.shuffle.service.enabled 为 true。

外部 shuffle 服务的目的是为了执行器被移除的时候不会删除他们写的shuffle文件(一会详细讲解)。在集群管理器上开启这个服务的方式:


[*]在单机模式下,使用下面的配置:

    spark.shuffle.service.enabled

设置为 true。


[*]在 Mesos 粗粒度模式下,在所有的 slave 节点上运行 $SPARK_HOME/sbin/start-mesos-shuffle-service.sh 并设置 spark.shuffle.service.enabled 为 true。
[*]在 YARN 模式下,在每个 NodeManager 上开始 shuffle 服务:


[*]使用 YARN profile 编译 Spark。如果你有一个预先包装好的分布式(pre-packaged distribution,应该指的是官方编译好的 bin 包),就可以跳过该步骤。
[*]定位 spark--yarn-shuffle.jar。如果你自己编译的 Spark,他应该在 $SPARK_HOME/common/network-yarn/target/scala- 下面,否则就是就在 lib 下面。
[*]添加这个 jar 到你集群上的所有 NodeManager 的 classpath 中。
[*]在每个节点上的 yarn-site.xml ,添加 yarn.nodemanager.aux-services 设置为spark_shuffle,接着设置 yarn.nodemanager.aux-services.spark_shuffle.class为 org.apache.spark.network.yarn.YarnShuffleService
[*]重启你集群上的所有 NodeManager。

所有其他的相关配置都是可选的,配置名字就为 spark.dynamicAllocation.* 和 spark.shuffle.service.* 两种。

资源分配策略(policy)

在高水平上看, 当执行器不再被使用,Spark 就将其放开,并且当需要执行器的时候,就获得执行器。当没有固定的方式去预测一个被移除的执行器会不会在不久后运行一个 task, 或者一个刚刚加入的新的执行器是否会被闲置的时候,我们需要一系列启发式的模式来决定什么时候移除或请求执行器。

请求策略

当一个可动态分配的 Spark 应用程序的task在等待调度的节骨眼上,就能够请求另外一个执行器。这个必要的条件意味着现存的执行器已经不足以同时执行所有已经被提交的但是还未完成task(意思应该就是现有执行器接受不了新的 task 了,需要再分配一个)。

Spark 轮次请求执行器. 当待处理(pending)的 task 超过了 spark.dynamicAllocation.schedulerBacklogTimeout 秒还未被处理,就会触发请求,接着如果待处理 task 的队列中还留有 task, 就每 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒触发一次请求。另外,每一轮所请求的执行器数目会比上一轮的数目成指数增长。举栗子来说,一个应用程序会在第一轮添加一个执行器,然后在随后的轮次中,所请求的执行器的数目就为2,4,8。

可以发现增长的底数是2。之所以使用这种方式是因为以下原因。首先,一个应用程序在刚开始时应该谨慎的请求执行器,以防万一发生了使用少量额外的执行器就能满足的情况。第二,应用程序应该能使用一种及时的方式逐渐提升资源利用率,以防万一发生实际需要许多执行器的情况。

移除策略

移除执行器的策略十分简单。当一个执行器被闲置超过了 spark.dynamicAllocation.executorIdleTimeout 秒,就会被移除。这里要注意,在许多情况下,移除的条件与请求条件是彼此对立的,就是说当一直都有待处理的 task 要被调度的时候,执行器就不会被闲置。

让执行器优雅的退役

在动态的分配之后,一个 Spark 执行器(非正常)退出了,那要么是因为失败了,要么是当与之联结的应用程序退出了。在所有的情况下,与执行器列联结的所有状态(state)就不在需要了,并且会被安全的丢弃掉。然而,在动态分配中,当一个执行器被明确的(正常的)移除,应用程序仍然会继续执行。如果应用程序试图存取被这个执行器写入或者存储的状态(state),他就不得不重新计算这个状态。事实上, Spark 需要一个机制来让执行器优雅的退役,也就是在执行器被移除之前保留他的状态。

这个机制对 shuffle 来说是非常重要的。在 Shuffle 过程中, Spark 执行器会先将本地的 map 输出写在磁盘上,然后当其他的执行器试图获取他的时候,再执行在这些文件上的服务。有时会有一些运行时间会比其伙伴运行久的而落伍 task,动态分配可能在 shuffle 完全完成之前移除一个运行完 task 的执行器,这时候被该执行器写入的 shuffle 文件就没有必要再重新计算了。

保留 shuffle 文件的办法就是使用一个外部的 shuffle 服务,在 Spark 1.2 中引入。这个服务会长久的运行在你集群上的每个节点上, 并且独立于你的 Spark 应用程序和他们的执行器。如果服务被激活了,Spark 执行器将会直接从服务上拿取 shuffle 文件,而不是彼此之间。着意味着任何被一个执行器写入的 shuffle 状态(state)会继续得到超出执行器生命期的服务。

除了写出 shuffle 文件,执行器也会缓存(cache)数据在磁盘或者内存里。然而,当一个执行器被移除了之后,所有的缓存数据就无法再存取了。为了缓和这种情况,默认情况下,包含缓存的执行器永远不会被移除。你可以使用 spark.dynamicAllocation.cachedExecutorIdleTimeout 来配置这个行为。在未来的版本中,可能通过一个与外部 shuffle 服务很相似的方法在堆外(off-heap )存储中保留缓存数据。

在应用程序内调度

在内部提供的一个 Spark 应用程序(SparkContext实例),如果多个并行 job 被提交成互相分离的线程,他们就能同时运行。在本小节,这里所说的 “job”,我们指的是 Spark action(例如 save,cllect)和任何需要运行 action 求值的task。 Spark 的调度是完全的线程安全并且通过它就能去激活可以服务多个请求的应用程序(例如多个用户的的查询操作)。

默认情况下, Spark调度器 使用 FIFO 策略调度 jobs,每一个 job 被分为多个“阶段(stage)”(例如 map 和 reduce 阶段(phases)),并且当第一个 job 的阶段们有 task运行,他会获取所有可用资源的优先权;接着第二个 job 得到优先权,依次类推。如果在队首的的 job不需要使用所有的集群,随后 job 就够立刻运行,然而如果在队首的 jobs 太大了,之后的 jobs 就可能被明显的延后一段时间。

从 Spark 0.8 开始,你也能够通过配置,让资源在 jobs 之间的公平分享(fair sharing)。在公平分享模式下, Spark 使用循环赛(round robin)策略分配 jobs 之间的 tasks,以至于所有的 job 对集群资源进行粗鲁的平等分享。这意味着当一个长的 job 正在运行的时候,一个短的 job 不用等待长的job的完成,能够开始立刻接收资源并且获得足够的响应时间。这个模式适合多用户时设置(注意这里与 FIFO 的区别是 job 运行并没有次序)。

为了激活公平调度器,简单的在配置一个 SparkContext 的时候设置 spark.scheduler.mode 属性为 “FATR” 。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

公平调度器池

公平调度器也支持将 jobs 分组进 池中,并且在每一池中设置不同的调度选项(比如 weight)。他可以用在为非常重要的 jobs 创建一个高优先的池,或者每一个用户的 jobs 分在一个池中,并且给予用户间的平等分享,而不是 jobs 之间的平等分享,不管他们有多少同时运行的job。这个是模仿 Hadoop 的公平调度。

如果没有任何的介入,新提交的 job 就进入默认的池中,但是 job 的池可以在提交他们的线程中添加 spark.scheduler.pool “本地属性” 给 SparkContext来设置。下面就是这样做的:

// 假定 sc 是你的 SparkContext 变量
sc.setLocalProperty("spark.scheduler.pool", "pool1")


当设置完这个本地变量,在当前线程中提交(在当前线程中调用 RDD.save, count, collect 等等)的所有的 job 将会使用这个名字的池。这个设置是单线程中的,这样就可以很简单的为相同用户提供一个运行多个 job 的线程。如果你想清理和你的线程联结的池,可以使用下面的方式:

sc.setLocalProperty("spark.scheduler.pool", null)

池的默认行为

默认情况下,每一个池会获得集群的一个平等分享(也等于默认的池中的每个作业的共享),但是在每一个池中,job 的运行是按照 FIFO 顺序。举个例子,如果你为每个用户创建一个池,这意味着每一个用户将会得到一个集群上的平等共享,并且每个用户的查询将会按顺序运行而不是晚来的查询会从该用户的先到的查询中获取资源。

配置池属性

也可以通过修改配置文件改变池的属性。每一个池支持三种属性:


[*]schedulingMode : 这个可以是 FIFO 或者 FAIR,来控制池中的 job 是使用队列还是公平的共享池中的资源。
[*]weight : 这是控制池在集群中的共享与其他池的关系。默认情况在,所有池的权重(weight)为1。如果你指定一个池的权重是2,那么这个池将会比其他活动的池获得两倍的资源。设置一个高的权重,比如1000,也可能实现池之间的优先级设置,这样每当权重为1000的池中有活动的任务,将总是第一个运行 task。
[*]minShare : 除了全局性的权重,每个池能按照管理员的想法设置一个最小的共享(比如 CPU 的核数)。公平调度总是试着先满足所有活动中的池的最小共享,然后再按照权重(weight)重新分配剩下的资源给各个池。由此,minShare 属性就成为了另一种能够确保一个池在没有给予高优先级的时候,总是能得到一些确定数量的资源(例如10个CPU 核)的方式。默认情况下,池的 minShare 是 0。

池属性可以通过创建一个 XML 文件来设置,与 conf/fairscheduler.xml.template 相似,然后通过在你的 SparkConf 中设置 spark.scheduler.allocation.file 来引入池设置。

conf.set("spark.scheduler.allocation.file", "/path/to/file")

XML 的格式很简单,一个 元素对应于一个池,在其中使用不同的元素来为设置各个池属性的值。去个例子:

<?xml version="1.0"?>
<allocations>
<pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
</pool>
<pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
</pool>
</allocations>

完整的例子在 conf/fairscheduler.xml.template 文件中。注意如果在 XML 中没有任何的池配置,每个池的所有属性就直接使用默认值(调度模式(scheduling mode) FIFO, 权重(weight) 1, 最小共享(minShare) 0)。

来源:CSDN
作者:Henvealf

页: [1]
查看完整版本: Spark Job调度方式及资源分配策略介绍