分享

Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现

坎蒂丝_Swan 发表于 2014-12-12 23:19:09 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14000
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:42 编辑


问题导读
问题1:如何实现对报警器的分配操作?
问题2:拥有主控权的partition,会根据不同的情况实现哪些不同形式的报警器分配操作?









      在前面的帖子中,我们分析过在/ceilometer/alarm/service.py中实现了类PartitionedAlarmService,它的主要功能是实现了分布式报警系统服务的启动和其他操作,当时我们只是从整体上分析了分布式报警系统的启动和实现,而分布式报警系统各个操作的具体实现则是在/ceilometer/alarm/partition/coordination.py中的类PartitionCoordinator中,所以这篇帖子将会详细地分析分布式报警系统的具体实现,即分析类PartitionCoordinator中各个方法的源码实现。

1 def _distribute(self, alarms, rebalance)

  1. def _distribute(self, alarms, rebalance):  
  2.     """
  3.     实现对报警器的分配操作;               
  4.     如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
  5.     通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
  6.     如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
  7.     通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
  8.     """  
  9.     verb = 'assign' if rebalance else 'allocate'  
  10.          
  11.     """
  12.     根据rebalance的值,来确定所要执行的分配报警器的方法;
  13.     如果需要对全部报警器重新分配操作,则调用self.coordination_rpc.assign;
  14.     如果不需要对全部报警器重新分配操作,则调用self.coordination_rpc.allocate;
  15.          
  16.     self.coordination_rpc.assign:
  17.     通过广播发送执行assign操作的通知给所有的Partition,
  18.     在所有的Partition执行接收操作;
  19.     在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
  20.     则该Partition实现赋值本alarms的ID值到其本地;
  21.     self.coordination_rpc.allocate:
  22.     通过广播发送执行allocate操作的通知给所有的Partition,
  23.     在所有的Partition执行接收操作;
  24.     在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
  25.     则该Partition实现添加本alarms的ID值到其本地;
  26.     """  
  27.   
  28.     method = (self.coordination_rpc.assign if rebalance  
  29.               else self.coordination_rpc.allocate)  
  30.     LOG.debug(_('triggering %s') % verb)  
  31.     LOG.debug(_('known evaluators %s') % self.reports)  
  32.          
  33.     """
  34.     计算每个evaluator上所要分配的报警器数目;
  35.     """  
  36.     per_evaluator = int(math.ceil(len(alarms) /  
  37.                         float(len(self.reports) + 1)))  
  38.     LOG.debug(_('per evaluator allocation %s') % per_evaluator)  
  39.          
  40.     """
  41.     获取所有的evaluator;
  42.     对所有的evaluator进行洗牌操作;
  43.     """  
  44.     evaluators = self.reports.keys()  
  45.     random.shuffle(evaluators)  
  46.     offset = 0  
  47.          
  48.     """
  49.     遍历所有的evaluator;
  50.     """  
  51.     for evaluator in evaluators:  
  52.         if self.oldest < self.this:  
  53.             LOG.warn(_('%(this)s bailing on distribution cycle '  
  54.                        'as older partition detected: %(older)s') %  
  55.                      dict(this=self.this, older=self.oldest))  
  56.             return False  
  57.      
  58.         """
  59.         从所有报警器集合中获取一个报警器;
  60.         """  
  61.         allocation = alarms[offset:offset + per_evaluator]  
  62.               
  63.         """
  64.         调用之前确定的分配方法,实现对报警器的分配操作;
  65.         self.coordination_rpc.assign:
  66.         通过广播发送执行assign操作的通知给所有的Partition,
  67.         在所有的Partition执行接收操作;
  68.         在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
  69.         则该Partition实现赋值本alarms的ID值到其本地;
  70.         self.coordination_rpc.allocate:
  71.         通过广播发送执行allocate操作的通知给所有的Partition,
  72.         在所有的Partition执行接收操作;
  73.         在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
  74.         则该Partition实现添加本alarms的ID值到其本地;
  75.         """  
  76.         if allocation:  
  77.             LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %  
  78.                       dict(verb=verb, alloc=allocation, eval=evaluator))  
  79.             method(evaluator.uuid, allocation)  
  80.               
  81.         """
  82.         为下一个报警器的获取做准备;
  83.         """  
  84.         offset += per_evaluator  
  85.     LOG.debug(_('master taking %s for self') % alarms[offset:])  
  86.          
  87.     """
  88.     对于本Partition所分配的报警器的实现;
  89.     """  
  90.     if rebalance:  
  91.         self.assignment = alarms[offset:]  
  92.     else:  
  93.         self.assignment.extend(alarms[offset:])  
  94.     return True  
复制代码

方法小结:
实现对报警器的分配操作;

1.如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
  通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;


2.如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
  通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;

2 def _deletion_requires_rebalance(self, alarms)

  1. def _deletion_requires_rebalance(self, alarms):  
  2.     """
  3.     通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
  4.     如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
  5.     """  
  6.     # 获取最新已经删除的报警器集合;  
  7.     deleted_alarms = self.last_alarms - set(alarms)  
  8.     LOG.debug(_('newly deleted alarms %s') % deleted_alarms)  
  9.          
  10.     # 存储最新的已删除的报警器数据;  
  11.     self.deleted_alarms.update(deleted_alarms)  
  12.          
  13.     # 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;  
  14.     if len(self.deleted_alarms) > len(alarms) / 5:  
  15.         LOG.debug(_('alarm deletion activity requires rebalance'))  
  16.         self.deleted_alarms = set()  
  17.         return True  
  18.     return False  
复制代码

方法小结:
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;

3 def _is_master(self, interval)

  1. def _is_master(self, interval):  
  2.     """
  3.     确定当前的partition是否是主控角色;
  4.     """  
  5.     now = timeutils.utcnow()  
  6.     if timeutils.delta_seconds(self.start, now) < interval * 2:  
  7.         LOG.debug(_('%s still warming up') % self.this)  
  8.         return False  
  9.          
  10.     is_master = True  
  11.     for partition, last_heard in self.reports.items():  
  12.         delta = timeutils.delta_seconds(last_heard, now)  
  13.         LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') %  
  14.                   dict(report=partition, delta=delta))  
  15.         if delta > interval * 2:  
  16.             del self.reports[partition]  
  17.             self._record_oldest(partition, stale=True)  
  18.             LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') %  
  19.                       dict(this=self.this, stale=partition))  
  20.             self.presence_changed = True  
  21.         elif partition < self.this:  
  22.             is_master = False  
  23.             LOG.info(_('%(this)s sees older potential master: %(older)s')  
  24.                      % dict(this=self.this, older=partition))  
  25.     LOG.info(_('%(this)s is master?: %(is_master)s') %  
  26.              dict(this=self.this, is_master=is_master))  
  27.     return is_master  
复制代码

4 def _master_role(self, assuming, api_client)
  1. def _master_role(self, assuming, api_client):  
  2.     """
  3.     作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
  4.     1.需要整个分布式报警系统的重平衡操作
  5.       如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需   
  6.       要整个分布式报警系统的重平衡操作);
  7.       如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的   
  8.       五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
  9.       如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
  10.     2.对部分报警器(新建立报警器)实现重平衡操作;
  11.     3.对于其他情况,不需要进行报警器的分配操作;
  12.     """  
  13.          
  14.     """
  15.     通过客户端获取当前所有partition的报警器;
  16.     """  
  17.     alarms = [a.alarm_id for a in api_client.alarms.list()]  
  18.          
  19.     """
  20.     获取新建立的报警器;
  21.     当前的报警器集合减去之前的报警器集合,得到新建立的报警器;
  22.     """  
  23.     created_alarms = list(set(alarms) - self.last_alarms)  
  24.     LOG.debug(_('newly created alarms %s') % created_alarms)  
  25.          
  26.     """
  27.     通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
  28.     如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执
  29.     行rebalance操作;
  30.     """  
  31.     sufficient_deletion = self._deletion_requires_rebalance(alarms)  
  32.          
  33.     """
  34.     如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要
  35.     整个分布式报警系统的重平衡操作);
  36.     如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五
  37.     分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
  38.     如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
  39.     """  
  40.     # _distribute:实现对报警器的分配操作;  
  41.     if (assuming or sufficient_deletion or self.presence_changed):  
  42.         still_ahead = self._distribute(alarms, rebalance=True)  
  43.               
  44.     """
  45.     对于新建立报警器,实现对报警器分配操作,不需要进行所有报警器的重新分配操作;
  46.     """  
  47.     elif created_alarms:  
  48.         still_ahead = self._distribute(list(created_alarms),  
  49.                                        rebalance=False)  
  50.          
  51.     """
  52.     对于其他情况,不需要进行报警器的分配操作;
  53.     """  
  54.     else:  
  55.         still_ahead = self.this < self.oldest  
  56.          
  57.     """
  58.     实现更新报警器的集合;
  59.     """  
  60.     self.last_alarms = set(alarms)  
  61.     LOG.info(_('%(this)s not overtaken as master? %(still_ahead)s') %  
  62.             ({'this': self.this, 'still_ahead': still_ahead}))  
  63.     return still_ahead  
复制代码

方法小结:
作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;

1.需要整个分布式报警系统的重平衡操作
  如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要整个分布式报警系统的重平衡操作);
  如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
  如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;

2.对部分报警器(新建立报警器)实现重平衡操作;

3.对于其他情况,不需要进行报警器的分配操作;

5 def check_mastership(self, eval_interval, api_client)

  1. def check_mastership(self, eval_interval, api_client):  
  2.     """
  3.     实现定期检测主控权角色;
  4.     确定当前的partition是否是主控角色;
  5.     如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  6.     """  
  7.     LOG.debug(_('%s checking mastership status') % self.this)  
  8.     try:  
  9.         assuming = not self.is_master  
  10.               
  11.         """
  12.         _is_master:确定当前的partition是否是主控角色;
  13.         _master_role:作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
  14.         """  
  15.         self.is_master = (self._is_master(eval_interval) and  
  16.                           self._master_role(assuming, api_client))  
  17.         self.presence_changed = False  
  18.     except Exception:  
  19.         LOG.exception(_('mastership check failed'))  
复制代码


方法小结:
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;

6 def presence(self, uuid, priority)

  1. def presence(self, uuid, priority):  
  2.     """
  3.     实现接收某一个Partition广播的其存在性信息;
  4.     """  
  5.     report = PartitionIdentity(uuid, priority)  
  6.     if report != self.this:  
  7.         if report not in self.reports:  
  8.             self.presence_changed = True  
  9.         self._record_oldest(report)  
  10.         self.reports[report] = timeutils.utcnow()  
  11.         LOG.debug(_('%(this)s knows about %(reports)s') %  
  12.                   dict(this=self.this, reports=self.reports))  
复制代码

7 def report_presence(self)

  1. def report_presence(self):  
  2.     """
  3.     通过方法fanout_cast实现广播当前partition的存在性
  4.     (包括uuid和优先级信息)通知给所有的Partition;
  5.     """  
  6.     LOG.debug(_('%s reporting presence') % self.this)  
  7.          
  8.     """
  9.     广播当前partition的存在性(包括uuid和优先级信息)通知给所有的Partition;
  10.     """  
  11.     try:  
  12.         self.coordination_rpc.presence(self.this.uuid, self.this.priority)  
  13.     except Exception:  
  14.         LOG.exception(_('presence reporting failed'))  
复制代码

8 def assigned_alarms(self, api_client)

  1. def assigned_alarms(self, api_client):  
  2.     """
  3.     通过指定客户端获取分配给当前partition的报警器;
  4.     """  
  5.     if not self.assignment:  
  6.         LOG.debug(_('%s has no assigned alarms to evaluate') % self.this)  
  7.         return []  
  8.   
  9.     try:  
  10.         LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') %  
  11.                   dict(this=self.this, alarms=self.assignment))  
  12.         return [a for a in api_client.alarms.list(q=[{'field': 'enabled',  
  13.                                                       'value': True}])  
  14.                 if a.alarm_id in self.assignment]  
  15.     except Exception:  
  16.         LOG.exception(_('assignment retrieval failed'))  
  17.         return []  
复制代码


好了,分布式报警系统的源码具体实现解析完成,下面对分布式报警系统做总体的小结:


    对于PartitionedAlarmService,它通过rpc实现了一套多个evaluator进程之间的协作协议(PartitionCoordinator),使得可以通过水平扩展来不断增大alarm service的处理能力,这样不仅实现了一个简单的负载均衡,还实现了高可用。下面我们就重点来说一下PartitionCoordinator这个协议。

    PartitionCoordinator允许启动多个ceilometer-alarm-evaluator进程,这多个进程之间的关系是互相协作的关系,他们中最早启动的进程会被选为master进程,master进程主要做的事情就是给其他进程分配alarm,每个进程都在周期性的执行三个任务:

    1.通过rpc,向其它进程广播自己的状态,来告知其他进程,我是活着的,每个进程中都保存有其他进程的最后活跃时间;

    2.争抢master,每个进程都会不断的更新自己所维护的其它进程的状态列表,根据这个状态列表,来判断是否应该由自己来当master,判断一个进程是否是master的条件只有一个,那就是看谁启动的早;

    3.检查本进程负责的alarm,会去调用ceilometer的api,来获取该alarm的监控指标对应的监控数据,然后进行判断,发送报警等;

    当一个进程被确定为master之后,如果它不挂掉,那么它的master是不会被抢走的,该进程就会一直在履行master的职责:

    1.当有新的alarm被创建时,master会将这些新创建的alarm平均的分配给其它worker进程,如果不能平均分配的,剩下的零头就由master自己来负责;

    2.当有新的evaluator进程添加进来,或者是现有的evaluator进程被kill掉,那么master就会重新洗牌一次,把所有的alarm再平均的分配给现有的evaluator进程;

    3.当master挂掉咋办呢?那么就会由第二个最早启动的进程接替master的位置,然后重新洗牌

通过这个协议,就实现了一个简单的分布式alarm服务;


    好了,到此为止,分布式报警系统的具体源码实现分析和总结工作到此为止~~~~谢谢~~~~






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动





欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条