分享

Strom:pluggable scheduler 自己写的自定义分配任务

pig2 发表于 2014-5-22 00:17:36 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 12081
问题导读:
1.Storm提供的一个插拔式调度策略的扩展,是指什么?
2.插拔式扩展设计的目的是什么?
3.出现superviso分配不到任务,该如何解决?






Storm提供的一个插拔式调度策略的扩展,在加入新的topology后,并不会完全替代系统自带的分配策略。它仅仅是扩展,不能替代。而且按照设计,它的出现是为了应付一些极端情况,比如,slot不够用。在这些情况下,storm默认的调度策略,无法很好的实施。然后,才会执行自定义的调度策略。新加入的topology启动后,系统默认的调度策略还在起作用,所以,我们制定的spout/bolt的实体可能已经被分配了,也可能我们制定的supervisor机器的slot已经全部用光了,只有这些都可以满足的时候,我们才能进行一定程度的调度。
为了打破以上的限制,我写的方法是,首先将所有的已经分配的任务全部释放掉。然后,按照逻辑进行任务的分配。剩下的任务使用默认的调度策略,但是,总是出现一台supervisor上面分配不到任务。
最后的解决方法,是把topology中所有的任务,全部自定义分配。然后将无任务可分配的topology交给默认调度策略来分配(这一步不能省,否则,会按照系统默认的调度策略进行调度。这说明调度策略,除了进行任务的分配还进行了其他的配置)。这样的结果,才能将任务按照逻辑成功分配。当然,这样会有很多的问题,比如,在自定义调度策略分配完之前,不能接收任何tuple。调度策略的粒度,是线程级别。
自己写的代码如下:实现了,将spout定向配置到其他某个supervisor上,然后,将所有的blot线程平均分摊到所有的supervisor。
集群环境:4个supervisor,两个blot,并行度各为10.
如下代码仅供参考,不要随便移植走,如要移植走还要解决一切线程分配的算法问题。

  1. package storm;
  2. import java.util.ArrayList;
  3. import java.util.Collection;
  4. import java.util.Iterator;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Set;
  8. import backtype.storm.scheduler.Cluster;
  9. import backtype.storm.scheduler.EvenScheduler;
  10. import backtype.storm.scheduler.ExecutorDetails;
  11. import backtype.storm.scheduler.IScheduler;
  12. import backtype.storm.scheduler.SchedulerAssignment;
  13. import backtype.storm.scheduler.SupervisorDetails;
  14. import backtype.storm.scheduler.Topologies;
  15. import backtype.storm.scheduler.TopologyDetails;
  16. import backtype.storm.scheduler.WorkerSlot;
  17. public class DemoScheduler implements IScheduler {
  18.     public void prepare(Map conf) {}
  19.     private int flag=0;
  20.     private void Myschedule(Topologies topologies, Cluster cluster)
  21.     {
  22.            
  23.              SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologies.getByName("special-topology").getId());
  24.          if (currentAssignment != null) {
  25.                  System.out.println("MY:current assignments: " + currentAssignment.getExecutorToSlot());
  26.          } else {
  27.                  System.out.println("My:current assignments: {}");
  28.          }
  29.          
  30.          SupervisorDetails specialSupervisor= GetSupervisorDetailsByName(cluster,"special-slave3");
  31.          if(specialSupervisor!=null)
  32.          {
  33.          List<WorkerSlot>  availableSlots = cluster.getAvailableSlots(specialSupervisor);
  34.          System.out.println("availableSlotsNum:"+availableSlots.size());
  35.          System.out.println("availableSlotsNum List:"+availableSlots);
  36.       
  37.          TopologyDetails topology = topologies.getByName("special-topology");
  38.         
  39.          Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);
  40.          List<ExecutorDetails> executors = componentToExecutors.get("1");
  41.          List<ExecutorDetails> executors2 = componentToExecutors.get("2");
  42.         
  43.          Map<String, SupervisorDetails> AllSupervisors= cluster.getSupervisors();
  44.          Collection<SupervisorDetails>AllSuperVaule= AllSupervisors.values();
  45.         
  46.          SupervisorDetails[] superArray=new SupervisorDetails[AllSuperVaule.size()];
  47.          AllSuperVaule.toArray(superArray);
  48.          
  49.          ArrayList<ExecutorDetails> AllExecutor=new ArrayList<ExecutorDetails>();
  50.          
  51.          for(int i=0;i<executors.size();i++)
  52.          {
  53.                  AllExecutor.add(executors.get(i));
  54.                  AllExecutor.add(executors2.get(i));
  55.          }
  56.          
  57.          System.out.println("AllExecutor size:"+AllExecutor.size()+" "+superArray.length);
  58.         for(int i=0;i<superArray.length;i++)
  59.          {
  60.                  List<ExecutorDetails> temp=AllExecutor.subList(i*5, i*5+5);
  61.                  
  62.                  List<WorkerSlot>  availableSlotsInner = cluster.getAvailableSlots(superArray[i]);
  63.                  cluster.assign(availableSlotsInner .get(0), topology.getId(), temp);
  64.                  System.out.println("Assiment:"+temp+"to"+i);
  65.                  
  66.          }
  67.          
  68.        //  cluster.assign(availableSlots.get(1), topology.getId(), executors);
  69.         // cluster.assign(availableSlots.get(2), topology.getId(), executors2);
  70.         
  71.          }
  72.          else
  73.          {
  74.                  System.out.println("special-slave3 is not exits!!!");
  75.          }
  76.     }
  77.     private SupervisorDetails  GetSupervisorDetailsByName(Cluster cluster,String SupervisorName)
  78.     {
  79.         // find out the our "special-supervisor" from the supervisor metadata
  80.         Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
  81.         SupervisorDetails specialSupervisor = null;
  82.         for (SupervisorDetails supervisor : supervisors) {
  83.             Map meta = (Map) supervisor.getSchedulerMeta();
  84.             if (meta.get("name").equals(SupervisorName)) {
  85.                 specialSupervisor = supervisor;
  86.                 break;
  87.             }
  88.         }
  89.         
  90.         return specialSupervisor;
  91.     }
  92.     public void schedule(Topologies topologies, Cluster cluster) {
  93.            
  94.             System.out.println("DemoScheduler: begin scheduling");
  95.         // Gets the topology which we want to schedule
  96.             TopologyDetails topology = topologies.getByName("special-topology");
  97.         // make sure the special topology is submitted,
  98.         if (topology != null) {
  99.                 System.out.println("special-topology is  not null!!!");
  100.                
  101. if(flag==0)
  102. {
  103.       
  104.             boolean needsScheduling = cluster.needsScheduling(topology);
  105.            // cluster.n
  106.             if (needsScheduling) {
  107.                     System.out.println("Our special topology DOES NOT NEED scheduling.");
  108.             } else {
  109.                     System.out.println("Our special topology needs scheduling.");
  110.                 // find out all the needs-scheduling components of this topology
  111.                     
  112.                     
  113.                     
  114.                     Collection<SupervisorDetails> Tempsupervisors = cluster.getSupervisors().values();//d
  115.                     
  116.                     
  117.                     for (SupervisorDetails supervisor : Tempsupervisors) {
  118.                     
  119.                            
  120.                            
  121.                                List<WorkerSlot> availableSlots = cluster.getAvailableSlots(supervisor);
  122.                                //
  123.                               int Availablenum =availableSlots.size();
  124.                               String suName=supervisor.getHost();
  125.                               
  126.                               System.out.println("before:HostName:"+suName+" AvailableNum:"+Availablenum);
  127.                               
  128.                                if(!availableSlots.isEmpty())
  129.                                {
  130.                                        for (Integer port : cluster.getUsedPorts(supervisor)) {
  131.                                    cluster.freeSlot(new WorkerSlot(supervisor.getId(), port));
  132.                                }
  133.                                }
  134.                                List<WorkerSlot> availableSlots2 = cluster.getAvailableSlots(supervisor);
  135.                               
  136.                                int Availablenum2 =availableSlots2.size();
  137.                               
  138.                                
  139.                                System.out.println("after:HostName:"+suName+" AvailableNum:"+Availablenum2);
  140.                         
  141.                         }
  142.                     
  143.                     
  144.                     
  145.                     
  146.                     
  147.                 Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology);
  148.                
  149.                 System.out.println("needs scheduling(component->executor): " + componentToExecutors);
  150.                 System.out.println("needs scheduling(executor->compoenents): " + cluster.getNeedsSchedulingExecutorToComponents(topology));
  151.                 SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologies.getByName("special-topology").getId());
  152.                 if (currentAssignment != null) {
  153.                         System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());
  154.                 } else {
  155.                         System.out.println("current assignments: {}");
  156.                 }
  157.                
  158.               
  159.                
  160.                
  161.                 if (!componentToExecutors.containsKey("spout")) {
  162.                         System.out.println("Our special-spout DOES NOT NEED scheduling.");
  163.                 } else {
  164.                     System.out.println("Our special-spout needs scheduling.");
  165.                     List<ExecutorDetails> executors = componentToExecutors.get("spout");
  166.                     
  167.                     
  168.                     
  169.                     
  170.                     // find out the our "special-supervisor" from the supervisor metadata
  171.                     Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values();
  172.                     SupervisorDetails specialSupervisor = null;
  173.                     for (SupervisorDetails supervisor : supervisors) {
  174.                         Map meta = (Map) supervisor.getSchedulerMeta();
  175.                         if (meta.get("name").equals("special-slave2")) {
  176.                             specialSupervisor = supervisor;
  177.                             break;
  178.                         }
  179.                     }
  180.                     // found the special supervisor
  181.                     if (specialSupervisor != null) {
  182.                             System.out.println("Found the special-supervisor");
  183.                         List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
  184.                         
  185.                         // if there is no available slots on this supervisor, free some.
  186.                         // TODO for simplicity, we free all the used slots on the supervisor.
  187.                         if (availableSlots.isEmpty() && !executors.isEmpty()) {
  188.                             for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
  189.                                 cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
  190.                             }
  191.                         }
  192.                         // re-get the aviableSlots
  193.                         availableSlots = cluster.getAvailableSlots(specialSupervisor);
  194.                         // since it is just a demo, to keep things simple, we assign all the
  195.                         // executors into one slot.
  196.                         cluster.assign(availableSlots.get(0), topology.getId(), executors);
  197.                         Myschedule(topologies, cluster);
  198.                         flag=1;
  199.                         System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]");
  200.                     } else {
  201.                             System.out.println("There is no supervisor named special-supervisor!!!");
  202.                     }
  203.                 }
  204.             }
  205.             
  206.             
  207.         }//end flag==0
  208. else
  209. {
  210.         System.out.println(" only do once :"+flag);
  211.         }
  212.     }//end special=null
  213.       
  214.         // let system's even scheduler handle the rest scheduling work
  215.         // you can also use your own other scheduler here, this is what
  216.         // makes storm's scheduler composable.
  217.                 System.out.println("using the default system Schedule!!!");
  218.         new EvenScheduler().schedule(topologies, cluster);
  219.         
  220.       
  221.       
  222.     }
  223.            
  224.     }//end class
复制代码




已有(1)人评论

跳转到指定楼层
chencheng06 发表于 2014-5-23 14:12:17
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条