分享

hbase Balancer 源码分析-负载均衡

pig2 2014-5-8 16:29:24 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 31535
本帖最后由 nettman 于 2014-5-27 22:30 编辑
问题导读:
hbase通过功能实现负载均衡的?
本文分析平衡的条件是是什么?





看源码很久了,终于开始动手写博客了,为什么是先写负载均衡呢,因为一个室友入职新公司了,然后他们遇到这方面的问题,某些机器的硬盘使用明显比别的机器要多,每次用hadoop做完负载均衡,很快又变回来了。

首先我们先看HMaster当中怎么初始化Balancer的,把集群的状态穿进去,设置master,然后执行初始化。
  1. //initialize load balancer
  2. this.balancer.setClusterStatus(getClusterStatus());
  3. this.balancer.setMasterServices(this);
  4. this.balancer.initialize();
复制代码

然后调用是在HMaster的balance()方法当中调用
  1. Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
  2.         this.assignmentManager.getRegionStates().getAssignmentsByTable();
  3. List<RegionPlan> plans = new ArrayList<RegionPlan>();
  4. //Give the balancer the current cluster state.
  5. this.balancer.setClusterStatus(getClusterStatus());
  6. //针对表来做平衡,返回平衡方案,针对全局,可能不是最优解
  7. for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
  8.     List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
  9.     if (partialPlans != null) plans.addAll(partialPlans);
  10. }
复制代码

可以看到它首先获取了当前的集群的分配情况,这个分配情况是根据表的 Map<TableName, Map<ServerName, List<HRegionInfo>>,然后遍历这个map的values,调用balancer.balanceCluster(assignments) 来生成一个partialPlans,生成RegionPlan(Region的移动计划) 。

  我们就可以切换到StochasticLoadBalancer当中了,这个是默认Balancer具体的实现了,也是最好的实现,下面就说说这玩意儿咋实现的。

  看一下注释,这个玩意儿吹得神乎其神的,它说它考虑到了这么多因素:
  1. Region LoadRegion的负载
  2. Table Load 表的负载
  3. Data Locality 数据本地性
  4. Memstore Sizes 内存Memstore的大小
  5. Storefile Sizes硬盘存储文件的大小
复制代码




 好,我们从balanceCluster开始看吧,一进来第一件事就是判断是否需要平衡
  1. //不需要平衡就退出
  2. if (!needsBalance(new ClusterLoadState(clusterState))) {
  3.    return null;
  4. }
复制代码
 平衡的条件是:负载最大值和最小值要在平均值(region数/server数)的+-slop值之间, 但是这个平均值是基于表的,因为我们传进去的参数clusterState就是基于表的。
  1. // Check if we even need to do any load balancing
  2. // HBASE-3681 check sloppiness first
  3. float average = cs.getLoadAverage(); // for logging
  4. //集群的负载最大值和最小值要在平均值的+-slop值之间
  5. int floor = (int) Math.floor(average * (1 - slop));
  6. int ceiling = (int) Math.ceil(average * (1 + slop));
  7. if (!(cs.getMinLoad() > ceiling || cs.getMaxLoad() < floor)) {
  8.     .....return false;
  9. }
  10. return true;
复制代码
如果需要平衡的话,就开始计算开销了
  1. // Keep track of servers to iterate through them.
  2. Cluster cluster = new Cluster(clusterState, loads, regionFinder);
  3. //计算出来当前的开销   
  4. double currentCost = computeCost(cluster, Double.MAX_VALUE);
  5. double initCost = currentCost;
  6. double newCost = currentCost;
  7.  for (step = 0; step < computedMaxSteps; step++) {
  8.          //随机挑选一个"选号器"
  9.          int pickerIdx = RANDOM.nextInt(pickers.length);
  10.          RegionPicker p = pickers[pickerIdx];
  11.          //用选号器从集群当中随机跳出一对来,待处理的<server,region>对
  12.          Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
  13.          int leftServer = picks.getFirst().getFirst();
  14.          int leftRegion = picks.getFirst().getSecond();
  15.          int rightServer = picks.getSecond().getFirst();
  16.          int rightRegion = picks.getSecond().getSecond();
  17.          cluster.moveOrSwapRegion(leftServer,
  18.               rightServer,
  19.               leftRegion,
  20.               rightRegion);
  21.         //移动或者交换完之后,看看新的开销是否要继续
  22.          newCost = computeCost(cluster, currentCost);
  23.          // Should this be kept? 挺好,保存新状态
  24.          if (newCost < currentCost) {
  25.             currentCost = newCost;
  26.          } else {
  27.    // 操作不划算,就回退
  28.            cluster.moveOrSwapRegion(leftServer,
  29.                 rightServer,
  30.                 rightRegion,
  31.                 leftRegion);
  32.        }
  33.      if (initCost > currentCost) {
  34.          //找到了满意的平衡方案
  35.          List<RegionPlan> plans = createRegionPlans(cluster);
  36.          return plans;
  37.     }
复制代码

上面的被我清除了细枝末节之后的代码主体,okay,上面逻辑过程如下:

1. 生成一个虚拟的集群cluster,方便计算计算当前状态的开销,其中clusterState是表的状态,loads是整个集群的状态。
  1. // Keep track of servers to iterate through them.
  2. Cluster cluster = new Cluster(clusterState, loads, regionFinder);
  3. //计算出来当前的开销   
  4. double currentCost = computeCost(cluster, Double.MAX_VALUE);
  5. double initCost = currentCost;
  6. double newCost = currentCost;
复制代码
2. 然后循环computedMaxSteps次,随机从选出一个picker来计算平衡方案
  1. int pickerIdx = RANDOM.nextInt(pickers.length);
  2. RegionPicker p = pickers[pickerIdx];
  3. //用选号器从集群当中随机跳出一对来,待处理的<server,region>对
  4. Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
复制代码

picker是啥?这里面有三个,第一个是RandomRegionPicker是随机挑选region,这里就不详细介绍了,主要讨论后面两个;第二个LoadPicker是计算负载的,第三个主要是考虑本地性的。

给我感觉就很像ZF的摇号器一样,用哪种算法还要摇个号
  1. pickers = new RegionPicker[] {
  2.       new RandomRegionPicker(),
  3.       new LoadPicker(),
  4.       localityPicker
  5. };
复制代码
下面我们先看localityPicker的pick方法,这个方法是随机抽选出来一个server、region,找出region的其他本地机器,然后他们返回。
  1. @Override
  2.     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
  3.       if (this.masterServices == null) {
  4.         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
  5.             new Pair<Integer, Integer>(-1,-1),
  6.             new Pair<Integer, Integer>(-1,-1)
  7.         );
  8.       }
  9.       // Pick a random region server 随机选出一个server来
  10.       int thisServer = pickRandomServer(cluster);
  11.       // Pick a random region on this server 随机选出region
  12.       int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
  13.       if (thisRegion == -1) {
  14.         return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
  15.             new Pair<Integer, Integer>(-1,-1),
  16.             new Pair<Integer, Integer>(-1,-1)
  17.         );
  18.       }
  19.       // Pick the server with the highest locality 找出本地性最高的目标server
  20.       int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
  21.       // pick an region on the other server to potentially swap
  22.       int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
  23.       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
  24.           new Pair<Integer, Integer>(thisServer,thisRegion),
  25.           new Pair<Integer, Integer>(otherServer,otherRegion)
  26.       );
  27.     }
复制代码
okay,这个结束了,下面我们看看LoadPicker吧。
  1. @Override
  2.     Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
  3.       cluster.sortServersByRegionCount();
  4.       //先挑选出负载最高的server
  5.       int thisServer = pickMostLoadedServer(cluster, -1);
  6.       //再选出除了负载最高的server之外负载最低的server
  7.       int otherServer = pickLeastLoadedServer(cluster, thisServer);
  8.       Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
  9.       return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
  10.           new Pair<Integer, Integer>(thisServer, regions.getFirst()),
  11.           new Pair<Integer, Integer>(otherServer, regions.getSecond())
  12.       );
  13.     }
复制代码


这里的负载高和负载低是按照Server上面的region数来算的,而不是存储文件啥的,选出负载最高和负载最低的时候,又随机抽出region来返回了。

pick挑选的过程介绍完了,那么很明显,计算才是重头戏了,什么样的region会导致计算出来的分数高低呢?

3. 重点在计算函数上 computeCost(cluster, Double.MAX_VALUE) 结果这个函数也超级简单,哈哈
  1. protected double computeCost(Cluster cluster, double previousCost) {
  2.     double total = 0;
  3.    
  4.     for (CostFunction c:costFunctions) {
  5.       if (c.getMultiplier() <= 0) {
  6.         continue;
  7.       }
  8.       total += c.getMultiplier() * c.cost(cluster);
  9.       if (total > previousCost) {
  10.         return total;
  11.       }
  12.     }
  13.     return total;
  14.   }
复制代码

遍历CostFunction,拿cost的加权平均和计算出来。

那costFunction里面都有啥呢?localityCost又出现了,看来本地性是一个很大的考虑的情况。
  1. costFunctions = new CostFunction[]{
  2.       new RegionCountSkewCostFunction(conf),
  3.       new MoveCostFunction(conf),
  4.       localityCost,
  5.       new TableSkewCostFunction(conf),
  6.       regionLoadFunctions[0],
  7.       regionLoadFunctions[1],
  8.       regionLoadFunctions[2],
  9.       regionLoadFunctions[3],
  10. };
  11.  regionLoadFunctions = new CostFromRegionLoadFunction[] {
  12.         new ReadRequestCostFunction(conf),
  13.         new WriteRequestCostFunction(conf),
  14.         new MemstoreSizeCostFunction(conf),
  15.         new StoreFileCostFunction(conf)
  16.      };
复制代码
可以看出来,里面真正看中硬盘内容大小的,只有一个StoreFileCostFunction,cost的计算方式有些区别,但都是一个0-1之间的数字,下面给出里面5个函数都用过的cost的函数。
  1. //cost函数
  2. double max = ((count - 1) * mean) + (total - mean);
  3. for (double n : stats) {
  4.         double diff = Math.abs(mean - n);
  5.         totalCost += diff;
  6. }
  7. double scaled =  scale(0, max, totalCost);
  8. return scaled;
  9. //scale函数
  10. protected double scale(double min, double max, double value) {
  11.       if (max == 0 || value == 0) {
  12.         return 0;
  13.       }
  14.       return Math.max(0d, Math.min(1d, (value - min) / max));
  15. }
复制代码


经过分析吧,我觉得影响里面最后cost最大的是它的权重,下面给一下,这些function的默认权重。

RegionCountSkewCostFunction hbase.master.balancer.stochastic.regionCountCost ,默认值500

MoveCostFunction hbase.master.balancer.stochastic.moveCost,默认值是100

localityCost hbase.master.balancer.stochastic.localityCost,默认值是25

TableSkewCostFunction hbase.master.balancer.stochastic.tableSkewCost,默认值是35

ReadRequestCostFunction hbase.master.balancer.stochastic.readRequestCost,默认值是5

WriteRequestCostFunction hbase.master.balancer.stochastic.writeRequestCost,默认值是5

MemstoreSizeCostFunction hbase.master.balancer.stochastic.memstoreSizeCost,默认值是5

StoreFileCostFunction hbase.master.balancer.stochastic.storefileSizeCost,默认值是5

Storefile的默认值是5,那么低。。。可以试着提高一下这个参数,使它在计算cost消耗的时候,产生更加正向的意义,效果不好说。
4. 根据虚拟的集群状态生成RegionPlan,这里就不说了
  1. List<RegionPlan> plans = createRegionPlans(cluster);
复制代码
 源码的分析完毕,要想减少存储内容分布不均匀,可以试着考虑增加一个picker,这样又不会缺少对其他条件的考虑,具体可以参考LoadPicker,复制它的实现再写一个,在pickMostLoadedServer和pickLeastLoadedServer这两个方法里面把考虑的条件改一下,以前的条件是Integer[] servers = cluster.serverIndicesSortedByRegionCount; 通过这个来查找一下负载最高和最低的server,那么现在我们要在Cluster里面增加一个Server ---> StoreFile大小的关系映射集合,但是这里面没有,只有regionLoads,RegionLoad这个类有一个方法getStorefileSizeMB可以获得StoreFile的大小,我们通过里面的region和server的映射regionIndexToServerIndex来最后计算出来这个映射关系即可,这个计算映射关系个过程放在Cluster的构造函数里面。




岑玉海



已有(1)人评论

跳转到指定楼层
llp 发表于 2017-5-8 21:09:56
请问大神负载均衡策略是放在hbase的哪个模块,有没有体现均衡策略的细节图呢
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条