坎蒂丝_Swan 发表于 2014-12-12 23:19:09

Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现

本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:42 编辑



问题导读
问题1:如何实现对报警器的分配操作?
问题2:拥有主控权的partition,会根据不同的情况实现哪些不同形式的报警器分配操作?



static/image/hrline/4.gif





      在前面的帖子中,我们分析过在/ceilometer/alarm/service.py中实现了类PartitionedAlarmService,它的主要功能是实现了分布式报警系统服务的启动和其他操作,当时我们只是从整体上分析了分布式报警系统的启动和实现,而分布式报警系统各个操作的具体实现则是在/ceilometer/alarm/partition/coordination.py中的类PartitionCoordinator中,所以这篇帖子将会详细地分析分布式报警系统的具体实现,即分析类PartitionCoordinator中各个方法的源码实现。

1 def _distribute(self, alarms, rebalance)

def _distribute(self, alarms, rebalance):
    """
    实现对报警器的分配操作;               
    如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
    通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
    如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
    通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
    """
    verb = 'assign' if rebalance else 'allocate'
         
    """
    根据rebalance的值,来确定所要执行的分配报警器的方法;
    如果需要对全部报警器重新分配操作,则调用self.coordination_rpc.assign;
    如果不需要对全部报警器重新分配操作,则调用self.coordination_rpc.allocate;
         
    self.coordination_rpc.assign:
    通过广播发送执行assign操作的通知给所有的Partition,
    在所有的Partition执行接收操作;
    在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
    则该Partition实现赋值本alarms的ID值到其本地;
    self.coordination_rpc.allocate:
    通过广播发送执行allocate操作的通知给所有的Partition,
    在所有的Partition执行接收操作;
    在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
    则该Partition实现添加本alarms的ID值到其本地;
    """

    method = (self.coordination_rpc.assign if rebalance
            else self.coordination_rpc.allocate)
    LOG.debug(_('triggering %s') % verb)
    LOG.debug(_('known evaluators %s') % self.reports)
         
    """
    计算每个evaluator上所要分配的报警器数目;
    """
    per_evaluator = int(math.ceil(len(alarms) /
                        float(len(self.reports) + 1)))
    LOG.debug(_('per evaluator allocation %s') % per_evaluator)
         
    """
    获取所有的evaluator;
    对所有的evaluator进行洗牌操作;
    """
    evaluators = self.reports.keys()
    random.shuffle(evaluators)
    offset = 0
         
    """
    遍历所有的evaluator;
    """
    for evaluator in evaluators:
      if self.oldest < self.this:
            LOG.warn(_('%(this)s bailing on distribution cycle '
                     'as older partition detected: %(older)s') %
                     dict(this=self.this, older=self.oldest))
            return False
   
      """
      从所有报警器集合中获取一个报警器;
      """
      allocation = alarms
            
      """
      调用之前确定的分配方法,实现对报警器的分配操作;
      self.coordination_rpc.assign:
      通过广播发送执行assign操作的通知给所有的Partition,
      在所有的Partition执行接收操作;
      在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
      则该Partition实现赋值本alarms的ID值到其本地;
      self.coordination_rpc.allocate:
      通过广播发送执行allocate操作的通知给所有的Partition,
      在所有的Partition执行接收操作;
      在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
      则该Partition实现添加本alarms的ID值到其本地;
      """
      if allocation:
            LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %
                      dict(verb=verb, alloc=allocation, eval=evaluator))
            method(evaluator.uuid, allocation)
            
      """
      为下一个报警器的获取做准备;
      """
      offset += per_evaluator
    LOG.debug(_('master taking %s for self') % alarms)
         
    """
    对于本Partition所分配的报警器的实现;
    """
    if rebalance:
      self.assignment = alarms
    else:
      self.assignment.extend(alarms)
    return True
方法小结:
实现对报警器的分配操作;

1.如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;


2.如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;

2 def _deletion_requires_rebalance(self, alarms)

def _deletion_requires_rebalance(self, alarms):
    """
    通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
    如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
    """
    # 获取最新已经删除的报警器集合;
    deleted_alarms = self.last_alarms - set(alarms)
    LOG.debug(_('newly deleted alarms %s') % deleted_alarms)
         
    # 存储最新的已删除的报警器数据;
    self.deleted_alarms.update(deleted_alarms)
         
    # 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
    if len(self.deleted_alarms) > len(alarms) / 5:
      LOG.debug(_('alarm deletion activity requires rebalance'))
      self.deleted_alarms = set()
      return True
    return False
方法小结:
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;

3 def _is_master(self, interval)

def _is_master(self, interval):
    """
    确定当前的partition是否是主控角色;
    """
    now = timeutils.utcnow()
    if timeutils.delta_seconds(self.start, now) < interval * 2:
      LOG.debug(_('%s still warming up') % self.this)
      return False
         
    is_master = True
    for partition, last_heard in self.reports.items():
      delta = timeutils.delta_seconds(last_heard, now)
      LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') %
                  dict(report=partition, delta=delta))
      if delta > interval * 2:
            del self.reports
            self._record_oldest(partition, stale=True)
            LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') %
                      dict(this=self.this, stale=partition))
            self.presence_changed = True
      elif partition < self.this:
            is_master = False
            LOG.info(_('%(this)s sees older potential master: %(older)s')
                     % dict(this=self.this, older=partition))
    LOG.info(_('%(this)s is master?: %(is_master)s') %
             dict(this=self.this, is_master=is_master))
    return is_master
4 def _master_role(self, assuming, api_client)
def _master_role(self, assuming, api_client):
    """
    作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
    1.需要整个分布式报警系统的重平衡操作
      如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需   
      要整个分布式报警系统的重平衡操作);
      如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的   
      五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
      如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
    2.对部分报警器(新建立报警器)实现重平衡操作;
    3.对于其他情况,不需要进行报警器的分配操作;
    """
         
    """
    通过客户端获取当前所有partition的报警器;
    """
    alarms =
         
    """
    获取新建立的报警器;
    当前的报警器集合减去之前的报警器集合,得到新建立的报警器;
    """
    created_alarms = list(set(alarms) - self.last_alarms)
    LOG.debug(_('newly created alarms %s') % created_alarms)
         
    """
    通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
    如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执
    行rebalance操作;
    """
    sufficient_deletion = self._deletion_requires_rebalance(alarms)
         
    """
    如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要
    整个分布式报警系统的重平衡操作);
    如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五
    分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
    如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
    """
    # _distribute:实现对报警器的分配操作;
    if (assuming or sufficient_deletion or self.presence_changed):
      still_ahead = self._distribute(alarms, rebalance=True)
            
    """
    对于新建立报警器,实现对报警器分配操作,不需要进行所有报警器的重新分配操作;
    """
    elif created_alarms:
      still_ahead = self._distribute(list(created_alarms),
                                       rebalance=False)
         
    """
    对于其他情况,不需要进行报警器的分配操作;
    """
    else:
      still_ahead = self.this < self.oldest
         
    """
    实现更新报警器的集合;
    """
    self.last_alarms = set(alarms)
    LOG.info(_('%(this)s not overtaken as master? %(still_ahead)s') %
            ({'this': self.this, 'still_ahead': still_ahead}))
    return still_ahead
方法小结:
作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;

1.需要整个分布式报警系统的重平衡操作
如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要整个分布式报警系统的重平衡操作);
如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;

2.对部分报警器(新建立报警器)实现重平衡操作;

3.对于其他情况,不需要进行报警器的分配操作;

5 def check_mastership(self, eval_interval, api_client)

def check_mastership(self, eval_interval, api_client):
    """
    实现定期检测主控权角色;
    确定当前的partition是否是主控角色;
    如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
    """
    LOG.debug(_('%s checking mastership status') % self.this)
    try:
      assuming = not self.is_master
            
      """
      _is_master:确定当前的partition是否是主控角色;
      _master_role:作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
      """
      self.is_master = (self._is_master(eval_interval) and
                        self._master_role(assuming, api_client))
      self.presence_changed = False
    except Exception:
      LOG.exception(_('mastership check failed'))

方法小结:
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;

6 def presence(self, uuid, priority)

def presence(self, uuid, priority):
    """
    实现接收某一个Partition广播的其存在性信息;
    """
    report = PartitionIdentity(uuid, priority)
    if report != self.this:
      if report not in self.reports:
            self.presence_changed = True
      self._record_oldest(report)
      self.reports = timeutils.utcnow()
      LOG.debug(_('%(this)s knows about %(reports)s') %
                  dict(this=self.this, reports=self.reports))
7 def report_presence(self)
def report_presence(self):
    """
    通过方法fanout_cast实现广播当前partition的存在性
    (包括uuid和优先级信息)通知给所有的Partition;
    """
    LOG.debug(_('%s reporting presence') % self.this)
         
    """
    广播当前partition的存在性(包括uuid和优先级信息)通知给所有的Partition;
    """
    try:
      self.coordination_rpc.presence(self.this.uuid, self.this.priority)
    except Exception:
      LOG.exception(_('presence reporting failed'))
8 def assigned_alarms(self, api_client)

def assigned_alarms(self, api_client):
    """
    通过指定客户端获取分配给当前partition的报警器;
    """
    if not self.assignment:
      LOG.debug(_('%s has no assigned alarms to evaluate') % self.this)
      return []

    try:
      LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') %
                  dict(this=self.this, alarms=self.assignment))
      return [a for a in api_client.alarms.list(q=[{'field': 'enabled',
                                                      'value': True}])
                if a.alarm_id in self.assignment]
    except Exception:
      LOG.exception(_('assignment retrieval failed'))
      return []


好了,分布式报警系统的源码具体实现解析完成,下面对分布式报警系统做总体的小结:


    对于PartitionedAlarmService,它通过rpc实现了一套多个evaluator进程之间的协作协议(PartitionCoordinator),使得可以通过水平扩展来不断增大alarm service的处理能力,这样不仅实现了一个简单的负载均衡,还实现了高可用。下面我们就重点来说一下PartitionCoordinator这个协议。

    PartitionCoordinator允许启动多个ceilometer-alarm-evaluator进程,这多个进程之间的关系是互相协作的关系,他们中最早启动的进程会被选为master进程,master进程主要做的事情就是给其他进程分配alarm,每个进程都在周期性的执行三个任务:

    1.通过rpc,向其它进程广播自己的状态,来告知其他进程,我是活着的,每个进程中都保存有其他进程的最后活跃时间;

    2.争抢master,每个进程都会不断的更新自己所维护的其它进程的状态列表,根据这个状态列表,来判断是否应该由自己来当master,判断一个进程是否是master的条件只有一个,那就是看谁启动的早;

    3.检查本进程负责的alarm,会去调用ceilometer的api,来获取该alarm的监控指标对应的监控数据,然后进行判断,发送报警等;

    当一个进程被确定为master之后,如果它不挂掉,那么它的master是不会被抢走的,该进程就会一直在履行master的职责:

    1.当有新的alarm被创建时,master会将这些新创建的alarm平均的分配给其它worker进程,如果不能平均分配的,剩下的零头就由master自己来负责;

    2.当有新的evaluator进程添加进来,或者是现有的evaluator进程被kill掉,那么master就会重新洗牌一次,把所有的alarm再平均的分配给现有的evaluator进程;

    3.当master挂掉咋办呢?那么就会由第二个最早启动的进程接替master的位置,然后重新洗牌

通过这个协议,就实现了一个简单的分布式alarm服务;


    好了,到此为止,分布式报警系统的具体源码实现分析和总结工作到此为止~~~~谢谢~~~~






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动





页: [1]
查看完整版本: Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现