分享

Ceilometer项目源码分析----ceilometer报警器服务的实现概览

坎蒂丝_Swan 发表于 2014-12-12 21:09:59 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13641
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:41 编辑

问题导航

问题1:  ceilometer的报警系统包括哪些内容?
问题2:/ceilometer/alarm/service.py中的类是如何发挥其重要作用的?








ceilometer报警器服务的实现概览

    ceilometer的报警系统包括比较多的内容,比如报警器状态的评估判定方式有两种,即联合报警器状态评估和单一报警器状态评估,而报警器系统的实现也有两种,即单例报警系统和分布式报警系统。而ceilometer的报警器实现都包含在/ceilometer/alarm中,在这篇帖子中,我们先来
分析/ceilometer/alarm/service.py,来从整体的高度把握报警系统的内容。

在/ceilometer/alarm/service.py中实现了四个类:

类AlarmService:报警服务实现的基类;

类SingletonAlarmService:单例报警系统服务;

类PartitionedAlarmService:分布式报警器系统服务;

类AlarmNotifierService:报警器被触发的通知服务实现;


1 类AlarmService:报警服务实现的基类

这个类是报警系统实现的基类,来看看其中实现的比较重要的方法;

1.1 def_evaluate_assigned_alarms(self)


  1. def _evaluate_assigned_alarms(self):  
  2.     """
  3.     获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
  4.     1.获取当前部分的所有报警器;
  5.     2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
  6.       来实现单一报警器模式或者联合报警器模式的评估判定;
  7.     """  
  8.     try:  
  9.         # 获取所有报警器列表;  
  10.         # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;  
  11.         alarms = self._assigned_alarms()  
  12.         LOG.info(_('initiating evaluation cycle on %d alarms') %  
  13.                  len(alarms))  
  14.               
  15.         # 遍历所有的报警器;  
  16.         # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;  
  17.         for alarm in alarms:  
  18.             self._evaluate_alarm(alarm)  
  19.     except Exception:  
  20.         LOG.exception(_('alarm evaluation cycle failed'))  
复制代码


方法小结:
获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;

1.2 def _evaluate_alarm(self, alarm)


  1. def _evaluate_alarm(self, alarm):  
  2.     """
  3.     根据alarm的type,将alarm分配给对应的evaluator进行处理;
  4.     单一报警器状态的评估判定;
  5.     联合报警器状态的评估判定;
  6.     """  
  7.     # self.supported_evaluators = [threshold,combination]  
  8.     if alarm.type not in self.supported_evaluators:  
  9.         LOG.debug(_('skipping alarm %s: type unsupported') %  
  10.                   alarm.alarm_id)  
  11.         return  
  12.   
  13.     LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)  
  14.          
  15.     # 两种类型报警器ThresholdEvaluator和CombinationEvaluator的evaluate实现;  
  16.     # 单一报警器状态的评估判定;  
  17.     # 联合报警器状态的评估判定;  
  18.     self.evaluators[alarm.type].obj.evaluate(alarm)  
复制代码


2 类SingletonAlarmService:单例的报警服务

这个类是单例报警系统服务的启动实现,主要实现了单例报警系统服务的启动操作;

2.1 def start(self)

  1. def start(self):  
  2.     """
  3.     单例报警器服务SingletonAlarmService的启动操作;
  4.     按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
  5.     获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
  6.     """  
  7.     super(SingletonAlarmService, self).start()  
  8.     if self.evaluators:  
  9.         """
  10.         评估周期60s;
  11.         """  
  12.         interval = cfg.CONF.alarm.evaluation_interval  
  13.               
  14.         # 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;  
  15.         # _evaluate_assigned_alarms:  
  16.         # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;  
  17.         # 针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;  
  18.         self.tg.add_timer(  
  19.             interval,  
  20.             self._evaluate_assigned_alarms,  
  21.             0)  
  22.     # Add a dummy thread to have wait() working  
  23.     self.tg.add_timer(604800, lambda: None)  
复制代码

方法小结:
单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;

2.2 def _assigned_alarms(self)


  1. def _assigned_alarms(self):  
  2.     """
  3.     通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
  4.     """  
  5.     return self._client.alarms.list(q=[{'field': 'enabled',  
  6.                                         'value': True}])  
复制代码

3 类PartitionedAlarmService:分布式报警器系统服务

这个类是分布式报警系统服务的实现,主要实现了分布式报警系统服务的启动和其他操作;

3.1 def start(self)

  1. def start(self):  
  2.     """
  3.     分布式报警器系统服务分布式报警器系统服务的启动和运行;
  4.     按照一定的时间间隔周期性的执行以下操作:
  5.     1.实现广播当前partition的存在性的存在性到所有的partition
  6.     (包括uuid和优先级信息);
  7.     2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
  8.       如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  9.       情况1:所有报警器都要实现重新分配操作;
  10.       情况2:只有新建立的报警器需要实现分配操作;
  11.     3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  12.       针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
  13.       单一报警器模式或者联合报警器模式的评估判定;
  14.     """  
  15.     # 启动PartitionedAlarmService服务;  
  16.     super(PartitionedAlarmService, self).start()  
  17.     if self.evaluators:  
  18.         """
  19.         报警评估周期60s;
  20.         """  
  21.         eval_interval = cfg.CONF.alarm.evaluation_interval  
  22.               
  23.         """
  24.         self.tg = threadgroup.ThreadGroup(1000)
  25.         按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
  26.         通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
  27.         """  
  28.         self.tg.add_timer(  
  29.             eval_interval / 4, # 15s  
  30.             self.partition_coordinator.report_presence,  
  31.             0)  
  32.               
  33.         """
  34.         按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
  35.         self.partition_coordinator.check_mastership:
  36.         实现定期检测主控权角色;
  37.         确定当前的partition是否是主控角色;
  38.         如果为拥有主控权的partition,则根据不同情况实现不同形式的报警器分配操作;
  39.         """  
  40.         self.tg.add_timer(  
  41.             eval_interval / 2, # 30s  
  42.             self.partition_coordinator.check_mastership,  
  43.             eval_interval,     # 60s  
  44.             # _client:构建或重新使用一个经过验证的API客户端;  
  45.             *[eval_interval, self._client])  
  46.               
  47.         """
  48.         add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
  49.         self._evaluate_assigned_alarms:
  50.         先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  51.         针对每一个报警器,实现根据报警器类型(threshold和combination),来实现:
  52.         单一报警器状态的评估判定;
  53.         联合报警器状态的评估判定;
  54.         """  
  55.         self.tg.add_timer(  
  56.             eval_interval,     # 60s  
  57.             # 执行报警器的评估操作;  
  58.             self._evaluate_assigned_alarms,  
  59.             eval_interval)  
  60.          
  61.     # Add a dummy thread to have wait() working  
  62.     self.tg.add_timer(604800, lambda: None)  
复制代码

方法小结:
分布式报警器系统服务分布式报警器系统服务的启动和运行;
按照一定的时间间隔周期性的执行以下操作:

1.实现广播当前partition的存在性的存在性到所有的partition
(包括uuid和优先级信息);

2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
  如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
  情况1:所有报警器都要实现重新分配操作;
  情况2:只有新建立的报警器需要实现分配操作;

3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
  针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
  单一报警器模式或者联合报警器模式的评估判定;

3.2 def _assigned_alarms(self)


  1. def _assigned_alarms(self):  
  2.     """
  3.     通过指定客户端获取分配给当前部分的报警器;
  4.     """  
  5.     return self.partition_coordinator.assigned_alarms(self._client)  
复制代码

3.3 def assign(self, context, data)

  1. def assign(self, context, data):  
  2.     """
  3.     接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
  4.     则接收报警器ID值,则赋值给assignment;
  5.     用于全部报警器的分配操作;
  6.     """  
  7.     self.partition_coordinator.assign(data.get('uuid'),  
  8.                                       data.get('alarms'))  
复制代码

3.4 def allocate(self, context, data)

  1. <span style="font-size:18px;">def allocate(self, context, data):  
  2.     """
  3.     接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
  4.     则接收报警器ID值,则存储到assignment中;
  5.     用于部分报警器的分配操作;
  6.     """  
  7.     self.partition_coordinator.allocate(data.get('uuid'),  
  8.                                         data.get('alarms'))</span>  
复制代码

4 类AlarmNotifierService:报警器被触发的通知服务实现


这个类所描述的是当报警器被触之后,进行一个报警器报警的通知操作的实现,来看看具体的方法;

4.1 def start(self)


  1. def start(self):  
  2.     """
  3.     服务的启动;  
  4.     为RPC通信建立到信息总线的连接;
  5.     建立指定类型的消息消费者;   
  6.     启动协程实现等待并消费处理队列中的消息;
  7.     """  
  8.     super(AlarmNotifierService, self).start()  
  9.     # Add a dummy thread to have wait() working  
  10.     self.tg.add_timer(604800, lambda: None)  
复制代码

4.2 def _handle_action(self, action, alarm_id, previous, current, reason, reason_data)

  1. def _handle_action(self, action, alarm_id, previous,  
  2.                        current, reason, reason_data):  
  3.     """
  4.     获取系统所采用的消息通信方式;
  5.     通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;
  6.          
  7.          
  8.     采用两种方式之一实现相关报警器的通知:
  9.     通过HTTPS协议POST方法实现发送相关报警器的通知;
  10.     通过日志记录相关报警器的通知;
  11.      
  12.     # action:要通知的action的URL;
  13.     # alarm_id:被触发的报警器的ID;
  14.     # previous:报警器(触发)之前的状态;
  15.     # current:报警器被触发后转换到的新状态;
  16.     # reason:改变报警器状态的原因;
  17.     """  
  18.     try:  
  19.         action = network_utils.urlsplit(action)  
  20.     except Exception:  
  21.         LOG.error(  
  22.             _("Unable to parse action %(action)s for alarm %(alarm_id)s"),  
  23.             {'action': action, 'alarm_id': alarm_id})  
  24.         return  
  25.   
  26.     """
  27.     获取系统所采用的消息通信方式;
  28.     ceilometer.alarm.notifier =
  29.     log = ceilometer.alarm.notifier.log:LogAlarmNotifier
  30.     test = ceilometer.alarm.notifier.test:TestAlarmNotifier
  31.     http = ceilometer.alarm.notifier.rest:RestAlarmNotifier
  32.     https = ceilometer.alarm.notifier.rest:RestAlarmNotifier
  33.     """  
  34.     try:  
  35.         notifier = self.notifiers[action.scheme].obj  
  36.     except KeyError:  
  37.         scheme = action.scheme  
  38.         LOG.error(  
  39.             _("Action %(scheme)s for alarm %(alarm_id)s is unknown, "  
  40.               "cannot notify"),  
  41.             {'scheme': scheme, 'alarm_id': alarm_id})  
  42.         return  
  43.   
  44.     """
  45.     通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),或者是通过日志记录相关报警器的通知;
  46.     """  
  47.     try:  
  48.         LOG.debug(_("Notifying alarm %(id)s with action %(act)s") % (  
  49.                   {'id': alarm_id, 'act': action}))  
  50.         notifier.notify(action, alarm_id, previous,  
  51.                         current, reason, reason_data)  
  52.     except Exception:  
  53.         LOG.exception(_("Unable to notify alarm %s"), alarm_id)  
  54.         return  
复制代码

方法小结:

获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;

4.3 def notify_alarm(self, context, data)


  1. def notify_alarm(self, context, data):  
  2.     """
  3.     遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
  4.     获取系统所采用的消息通信方式;
  5.     通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
  6.     通知内容中包括:
  7.     alarm_id:被触发的报警器的ID;
  8.     previous:报警器(触发)之前的状态;
  9.     current:报警器被触发后转换到的新状态;
  10.     reason:改变报警器状态的原因;
  11.     """  
  12.     actions = data.get('actions')  
  13.     if not actions:def notify_alarm(self, context, data):  
  14.     """
  15.     遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
  16.     获取系统所采用的消息通信方式;
  17.     通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
  18.     通知内容中包括:
  19.     alarm_id:被触发的报警器的ID;
  20.     previous:报警器(触发)之前的状态;
  21.     current:报警器被触发后转换到的新状态;
  22.     reason:改变报警器状态的原因;
  23.     """  
  24.     actions = data.get('actions')  
  25.     if not actions:  
  26.         LOG.error(_("Unable to notify for an alarm with no action"))  
  27.         return  
  28.   
  29.     for action in actions:  
  30.         """
  31.         获取系统所采用的消息通信方式;
  32.         通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
  33.         """  
  34.         self._handle_action(action,  
  35.                             data.get('alarm_id'),  
  36.                             data.get('previous'),  
  37.                             data.get('current'),  
  38.                             data.get('reason'),  
  39.                             data.get('reason_data'))  
复制代码

方法小结:
遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条