分享

求教spark新的资源调度算法

ww102111 发表于 2016-12-26 17:39:58 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 10345
想增加个新的调度算法,模仿FIFO,FAIR算法,求教思路或者类似源码,多谢了

已有(8)人评论

跳转到指定楼层
langke93 发表于 2016-12-26 19:33:03


spark 调度算法,其实可以使用yarn.yarn自带的三种调度算法。

spark 的SchedulingAlgorithm 两种调度算法的优先级比较

FIFO:
  • --计算优先级的差。注意,在程序中,大部分时候是优先级的数字越小,它优先级越高
  • --如果优先级相同,那么stage编号越靠前,优先级越高
  • --如果优先级字段和stage id都相同,那么s2比s1更优先。(有这种情况?)

FAIR:
  • --没有达到最小资源的task比已经达到最小资源的task优先级高
  • --如果两个task都没达到最小资源,那么比较它们占用最小资源的比例。比例越小越优先
  • --否则比较占用权重资源的比例,比例越小越优先
  • --如果所有上述的比较都相同,那么名字小的优先(哈哈,名字很重要);
  • --名字相同,则s2优先级高。



[mw_shl_code=java,true]
/**
* An interface for sort algorithm
* FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm between Pools, and FIFO or FS within Pools
*/
private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)  --计算优先级的差。注意,在程序中,大部分时候是优先级的数字越小,它优先级越高[/mw_shl_code]



[mw_shl_code=java,true]if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)  --如果优先级相同,那么stage编号越靠前,优先级越高。
    }
    if (res < 0) {
      true
    } else {
      false   ---如果优先级字段和stage id都相同,那么s2比s1更优先。(有这种情况?)
    }
  }
}



private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare:Int = 0

    if (s1Needy && !s2Needy) {     ----没有达到最小资源的task比已经达到最小资源的task优先级高
      return true
    } else if (!s1Needy && s2Needy) {   ----没有达到最小资源的task比已经达到最小资源的task优先级高
      return false
    } else if (s1Needy && s2Needy) {   ---如果两个task都没达到最小资源,那么比较它们占用最小资源的比例。比例越小越优先
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {   ---否则比较占用权重资源的比例,比例越小越优先
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)  
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {----如果所有上述的比较都相同,那么名字小的优先(哈哈,名字很重要);名字相同,则s2优先级高。
      s1.name < s2.name
    }
  }
}[/mw_shl_code]


回复

使用道具 举报

xuanxufeng 发表于 2016-12-26 19:53:12
楼主其实说的应该是作业的调度推荐参考

spark internal - 作业调度
http://www.aboutyun.com/forum.php?mod=viewthread&tid=8727

其实也有资源的调度,推荐参考
Spark Job调度方式及资源分配策略介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20419



更多相关内容
Spark之任务调度
http://www.aboutyun.com/forum.php?mod=viewthread&tid=8548


Spark 作业调度--job执行方式介绍
http://www.aboutyun.com/forum.php?mod=viewthread&tid=7600




回复

使用道具 举报

ww102111 发表于 2016-12-26 19:54:31
那有没有别的新的调度方案呢
回复

使用道具 举报

nextuser 发表于 2016-12-26 19:57:30
这有相关内容spark的FAIR公平调度器



回复

使用道具 举报

nextuser 发表于 2016-12-26 19:59:03
本帖最后由 nextuser 于 2016-12-26 20:08 编辑
ww102111 发表于 2016-12-26 19:54
那有没有别的新的调度方案呢

调度方式一般都是固定的,楼主说的新的,具体是指什么?

默认调度算法FIFO 队列策略


计算能力调度算法Capacity Scheduler(Yahoo 开发)


公平份额调度算法Fair Scheduler(Facebook开发)



除这三个以外???【hadoop三个,spark两个,应该没有Capacity 】
回复

使用道具 举报

ww102111 发表于 2016-12-26 20:03:18
就是自己大概设计一种新型的调度策略,比如Hadoop的多种调度算法
回复

使用道具 举报

nextuser 发表于 2016-12-26 20:06:27
ww102111 发表于 2016-12-26 20:03
就是自己大概设计一种新型的调度策略,比如Hadoop的多种调度算法

spark里面没有看到可以自定义,不过hadoop里面是可以自定义调度策略的。也就是你自己写代码实现。hadoop有接口。spark不知道有没有,但是如果你懂源码,你也可以自己修改
回复

使用道具 举报

ww102111 发表于 2016-12-26 20:08:10
哦哦,那好吧,只是在一个论文中看到有人自己重写了一种算法,但是重新编译spark的源码,但只是说了思路。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条