hyj 发表于 2014-5-4 18:45:25

Spark 作业调度--job执行方式介绍

本帖最后由 hyj 于 2014-5-4 19:01 编辑

问题导读:
1.由不同线程提交的多个“jobs”(Spark actions)是否可以同时运行
2.线程的含义什么?
3.Java main函数是进程还是线程?
4.应用程序启用Spark独立部署模式,job是否并发执行?


static/image/hrline/4.gif
1.Spark是否并发分析
本文是针对spark作业执行方式是否并发解惑的一篇文章。spark有三种作业调度方式,那么他们的调度方式什么?这里在下文的基础上分析一下:
首先我们知道不同的用户,对于一个集群可能会同时不断的提交作业,那么这些job是怎么执行的,这里困惑了不少刚接触spark的同学。其实无论是那种部署方式,他们都是有可能并发执行的,也就是可能是同时执行的。
下面引用下文的内容:
由不同线程提交的多个“jobs”(Spark actions)可以同时运行
这里我产生了疑问,什么是线程,main()函数是进程还是线程。引用下面内容
main是一个线程也是一个进程,一个java程序启动后它就是一个进程,进程相当于一个空盒,它只提供资源装载的空间,具体的调度并不是由进程来完成的,而是由线程来完成的。一个java程序从main开始之后,进程启动,为整个程序提供各种资源,而此时将启动一个线程,这个线程就是主线程,它将调度资源,进行具体的操作。
对于不同用户提交的.jar是可以理解为一个线程的,因此从下文得知
由不同线程提交的多个“jobs”(Spark actions)可以同时运行
无论是那种部署方式,我们的spark程序都是可以并发执行的。详细可以看下文。




static/image/hrline/4.gif
2.Spark 作业调度


概述
Spark有几个在计算中调度资源的工具。
第一需要记得,正如集群模式概述中描述的那样,每个Spark应用中(SparkContext实例)都运行着一组独立的执行进程。Spark运行在的集群管理器提供了应用间调度的工具。
第二,在每个Spark应用中,由不同线程提交的多个“jobs”(Spark actions)可以同时运行。在处理网络请求的应用中这很常见,比如Shark服务器就以这种方式运行。Spark有一个调度均衡器在每个SparkContext中调度资源。

应用程序之间的调度
每个运行在集群上的Spark应用程序都能得到一个独立的JVM虚拟机,而JVM仅仅用于应用程序运行任务和存储数据。如果多用户需要共享你的集群,可以通过集群管理器配置不同的选项来分配资源。
在集群管理器中最简单有效的方式就是静态区分资源。使用此方法,每个应用程序在整个的生命周期中都可以得到一个最大数量的资源。这种方式被用于Spark的standalone和YARN模式中,同样也用于coarse-grained Mesos mode模式。根据集群的类型,可以通过下面的配置来分配资源。

1.Standalone mode:默认情况下,应用程序启用Spark独立部署模式,这种模式按照FIFO(先进先出)的顺序执行,每个应用程序都会尝试使用所有可用的节点。你可以通过spark设置来限制应用程序的节点数。例如:你可以启动一个有10核并长时间运行的服务器,并允许每个用户通过shells使用20核心。最后,除了控制核心外,每个应用程序的spark执行存储器可以控制自己的内存使用。

2.Mesos:在独立部署模式中,要在Mesos上使用静态分区,需要设置spark.mesos.coarse 系统属性为true,另外,可选项设置spark.cores.max可以限制每个应用程序的共享资源数。你也可以配置spark.executor.memory来控制执行器的内存。

3.YARN:num-workers选项用于在Spark YARN端分配集群上workers数量,尽管worker-memory和worker-cores可以控制每个worker的资源分配。


Mesos上的第二个可用选项是动态共享CPU内核。在这种模式下,每个Spark应用程序仍然分配有一个固定和独立的内存(通过spark.executor.memory来设置),当这个应用程序没有在机器上执行任务的时候,其他的应用程序就可能在这些内核上运行任务。当你期望大量但不是过度活跃应用程序的时候,这种模式是非常有用的,例如独立用户中的shell会话。然而,它却伴随着一个不可预知的潜在危险,这是因为当它需要执行任务的时候,在节点上需要耗费一段时间重新获得CPU核心资源。使用这种模式,不需要设置spark.mesos.coarse为true,只需要简单的使用amesos://URL。
请注意,所有的模式目前提供跨应用程序内存共享。如果你喜欢通过这种方式共享数据,我们推荐运行单一服务器的应用程序能够提供多个请求,可以通过查询相同的RDDs得到。例如,Shark JDBC服务器以这种方式进行SQL查询。在将来的版本中,内存中的存储系统,如 Tachyon将会提供另外的一种方式来共享RDDs。


应用中的调度
在给定的Spark应用(已实例化SparkContext)中,如果在不同线程中,多个并行的工作可以同时运行。我们所说的“工作”,其实是一个Spark动作(如保存,收集等)或是任何想需要评估动作的任务。Spark的任务调度员是多线程安全的,它也支持这个用例来使应用服务多个请求(多用户查询).
默认的,Spark调度员是按照FIFO(先进先出)的形式来运行工作的。每一个工作被分为多个“阶段”(如,map和reduce语句),对于所有可用的资源中第一个工作优先级最高,这个工作阶段中的任务会被启动,之后是第二个,依次类推。如果集群不需要队列头中的工作,后面的工作将被立刻启动,如果队列头的工作很大,后面的工作可能大大地推迟。


启动Spark0.8,它可以在两个作业之间配置共享调度。Spark负责在作业之间轮换分配调度,所以,所有的作业都能得到一个大致公平的共享的集群资源。这就意味着即使有一个很长的作业在运行,花费时间较少的作业在提交之后,仍然能够立即获得资源,并且能够有很好的响应,而不是需要等待那个很长的作业运行完之后才能运行。这种模式最适合多用户设置。
要启用公平作业调度,在创建一个SparkContext之前,需要简单的配置spark.scheduler.mode为FAIR:

System.setProperty("spark.scheduler.mode", "FAIR")
公平的调度池

公平调度可以支持在池中将工作分组,而且为不同的池可以设置不同的调度选项(如,权重)。这样可以很有用的为更多重要的工作创建一个“高优先级”池,举例,将每一个用户的工作一起分组,不管有多少并发工作也让每个用户平等的分享 ,以这种方式代替了平分给定的工作。这种方式是模仿Hadoop公平调度。
如果没有设置,新提交的工作将进入默认池中,但是工作池可以在线程中用spark.scheduler.pool来给SparkContent添加“本地属性”并提交。如下:

// 假设context是你SparkContext中的变量
context.setLocalProperty("spark.scheduler.pool", "pool1")
在设置了本地属性之后,所有的在这个线程(在这个线程中调用 RDD.save,count,collect等)的工作提交将会用这个池来命名。这样同一个用户可以让每个线程容易的执行多个工作。如果你想要清除线程相关的池,简单调用如下:
context.setLocalProperty("spark.scheduler.pool", null)
调度池中的默认行为
默认的,每个池都会平等的分享集群(在默认的池中每一个工作也是平等分享的),但在每一个池中,工作是按照FIFO(先进先出)顺序。比如,如果你给每一个用户创建一个池,这就意味着每一个用户都平等的分享一个集群,这样每一个查询都是按顺序查询的。

配置调度池
通过配置文件可以修改调度池的属性。每个调度池都支持3个属性。
schedulingMode:该属性的值可以是FIFO或者FAIR,用来控制作业在调度池中排队运行(默认情况下)或者公平分享调度池资源。
weight:控制调度池在集群之间的分配。默认情况下,所有调度池的weight值都是为1。例如:如果你指定了一个调度池的值为2,那么这个调度池就比其它调度池多获得2倍的资源。设置一个更高的weight值,例如1000,就可以实现线程池之间的优先权——实际上,weight值为1000的调度池无论什么时候作业被激活,它都总是能够最先运行。
minShare:除了一个整体的权重,如果管理员喜欢,可以给每个调度池指定一个最小的shares值(也就是CPU的核数目)。公平调度器通过权重重新分配资源之前总是试图满足所有活动调度池的最小share。在没有给定一个高优先级的其他集群中,minShare属性是另外的一种方式来确保调度池能够迅速的获得一定数量的资源(例如10核CPU),默认情况下,每个调度池的minShare值都为0。


可以通过XML文件来设置pool属性,和配置公平调度的xml模板文件一样,只需要设置spark.scheduler.allocation.file的属性:
System.setProperty("spark.scheduler.allocation.file", "/path/to/file")
对于每个池,XML文件的格式是一个简单的<pool>元素,可以在这个元素中设置各种不同元素。例如:
<?xml version="1.0"?>
<allocations>
<pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
</pool>
<pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
</pool>
</allocations>这个完整的例子也可以适用到对公平调度的xml模板文件配置。请注意,任何没有在xml文件中配置的池,都会有一个默认配置值(scheduling mode 值是FIFO,weight值为1,minShare值为0)。










关键先生 发表于 2015-12-4 21:55:55

楼主你好~
我现在有点迷糊,请看如下代码:
object JobScheduling {
    def main(args: Array) {

//      System.setProperty("spark.scheduler.mode", "FAIR")
      val conf = new SparkConf().setAppName("BaseLineDemo").setMaster("local[*]")
      val sc = new SparkContext(conf)

      val ssc = new StreamingContext(sc, Seconds(5))
      ssc.checkpoint("/tmp/checkpoint")

      // 通过TCP连接本机7788端口 获得输入流
      val logs = ssc.receiverStream(new UdpReceiver(9514, "UTF-8"))

      logs.foreachRDD(rdd =>{
            rdd.foreach(event => {
                // action-1
                println(s"---------------${event}")
                Thread.sleep(100)
            })
      })
      logs.foreachRDD(rdd =>{
            // action-2
            rdd.foreach(event => {
                println(s"+++++++++++++++${event}")
            })
      })

      ssc.start()
      ssc.awaitTermination()

    }
}

action-1action-2应该就是两个job。那么这两个job是串行执行的 还是并行执行的?

另:
    在每个Spark应用中,由不同线程提交的多个“jobs”(Spark actions)可以同时运行。
在spark中怎么使用多线程提交jobs? 使用 sparkContent.runJob 么? 能否给个例子??

nextuser 发表于 2015-12-5 22:47:49

关键先生 发表于 2015-12-4 21:55
楼主你好~
我现在有点迷糊,请看如下代码:
object JobScheduling {


spark internal - 作业调度
http://www.aboutyun.com/thread-8727-1-1.html




regan 发表于 2015-12-7 13:20:43

关键先生 发表于 2015-12-4 21:55
楼主你好~
我现在有点迷糊,请看如下代码:
object JobScheduling {


action-1,action-2的确是两个不同的job,当action触发时将会提交job,这两个job的运行将在调度模式下依次执行,即串先行执行。执行的顺序具体看你配置的job调度策略,默认为FIFO,也可以配置成公平调度策略,在公平调度策略中可配置job执行的权重,权重越大越优先执行。每一个job提交后将会划分stage,每个stage中会有多个任务,每个stage中的任务在worker节点在线程池中并行处理。


regan 发表于 2015-12-7 13:24:06

关键先生 发表于 2015-12-4 21:55
楼主你好~
我现在有点迷糊,请看如下代码:
object JobScheduling {


在多个线程中可以提交job,但是提交的job到spark集群中也是遵守调度规则的,即每一个job也是顺序执行的。

regan 发表于 2015-12-7 13:33:37

关键先生 发表于 2015-12-4 21:55
楼主你好~
我现在有点迷糊,请看如下代码:
object JobScheduling {


比如说你要在10个线程中提交Job,像下面一样
for(i <- 1 to 10){
val thread:Thread = new Thread(new Runnable {
    override def run(): Unit = {
      logs.foreachRDD(rdd =>{
      rdd.foreach(event => {
          // action-1
          println(s"---------------${event}")
          Thread.sleep(100)
      })
      })
    }
})
thread.start()
}
你可以把rdd的treasaction和actoin操作,包装一下,成为一个Runnable,在runnable中去写每一个线程要执行的作业,当actin触发的时候,每一个线程的actin操作都将触发job的提交,形成多个job的提交,提交的多个job会根据你的调度策略进行作业的调度。

邓立辉 发表于 2016-3-1 21:53:16

Spark Streaming可以用公平调度策略么?
比如某个时间窗口的数据处理耗时长,先达到了,但是因为耗时长,后来的时间窗口数据耗时短,还的等之前的处理完,才会处理短的。怎么让短的也立即被处理?
页: [1]
查看完整版本: Spark 作业调度--job执行方式介绍