本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:41 编辑
问题导航
问题1: ceilometer的报警系统包括哪些内容? 问题2:/ceilometer/alarm/service.py中的类是如何发挥其重要作用的?
ceilometer报警器服务的实现概览
ceilometer的报警系统包括比较多的内容,比如报警器状态的评估判定方式有两种,即联合报警器状态评估和单一报警器状态评估,而报警器系统的实现也有两种,即单例报警系统和分布式报警系统。而ceilometer的报警器实现都包含在/ceilometer/alarm中,在这篇帖子中,我们先来 分析/ceilometer/alarm/service.py,来从整体的高度把握报警系统的内容。
在/ceilometer/alarm/service.py中实现了四个类:
类AlarmService:报警服务实现的基类;
类SingletonAlarmService:单例报警系统服务;
类PartitionedAlarmService:分布式报警器系统服务;
类AlarmNotifierService:报警器被触发的通知服务实现;
1 类AlarmService:报警服务实现的基类
这个类是报警系统实现的基类,来看看其中实现的比较重要的方法;
1.1 def_evaluate_assigned_alarms(self)
- def _evaluate_assigned_alarms(self):
- """
- 获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
- 1.获取当前部分的所有报警器;
- 2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
- 来实现单一报警器模式或者联合报警器模式的评估判定;
- """
- try:
- # 获取所有报警器列表;
- # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
- alarms = self._assigned_alarms()
- LOG.info(_('initiating evaluation cycle on %d alarms') %
- len(alarms))
-
- # 遍历所有的报警器;
- # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
- for alarm in alarms:
- self._evaluate_alarm(alarm)
- except Exception:
- LOG.exception(_('alarm evaluation cycle failed'))
复制代码
方法小结:
获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
1.2 def _evaluate_alarm(self, alarm)
- def _evaluate_alarm(self, alarm):
- """
- 根据alarm的type,将alarm分配给对应的evaluator进行处理;
- 单一报警器状态的评估判定;
- 联合报警器状态的评估判定;
- """
- # self.supported_evaluators = [threshold,combination]
- if alarm.type not in self.supported_evaluators:
- LOG.debug(_('skipping alarm %s: type unsupported') %
- alarm.alarm_id)
- return
-
- LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
-
- # 两种类型报警器ThresholdEvaluator和CombinationEvaluator的evaluate实现;
- # 单一报警器状态的评估判定;
- # 联合报警器状态的评估判定;
- self.evaluators[alarm.type].obj.evaluate(alarm)
复制代码
2 类SingletonAlarmService:单例的报警服务
这个类是单例报警系统服务的启动实现,主要实现了单例报警系统服务的启动操作;
2.1 def start(self)
- def start(self):
- """
- 单例报警器服务SingletonAlarmService的启动操作;
- 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- 获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
- """
- super(SingletonAlarmService, self).start()
- if self.evaluators:
- """
- 评估周期60s;
- """
- interval = cfg.CONF.alarm.evaluation_interval
-
- # 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- # _evaluate_assigned_alarms:
- # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- # 针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
- self.tg.add_timer(
- interval,
- self._evaluate_assigned_alarms,
- 0)
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
复制代码
方法小结:
单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
2.2 def _assigned_alarms(self)
- def _assigned_alarms(self):
- """
- 通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
- """
- return self._client.alarms.list(q=[{'field': 'enabled',
- 'value': True}])
复制代码
3 类PartitionedAlarmService:分布式报警器系统服务
这个类是分布式报警系统服务的实现,主要实现了分布式报警系统服务的启动和其他操作;
3.1 def start(self)
- def start(self):
- """
- 分布式报警器系统服务分布式报警器系统服务的启动和运行;
- 按照一定的时间间隔周期性的执行以下操作:
- 1.实现广播当前partition的存在性的存在性到所有的partition
- (包括uuid和优先级信息);
- 2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
- 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
- 情况1:所有报警器都要实现重新分配操作;
- 情况2:只有新建立的报警器需要实现分配操作;
- 3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
- 单一报警器模式或者联合报警器模式的评估判定;
- """
- # 启动PartitionedAlarmService服务;
- super(PartitionedAlarmService, self).start()
- if self.evaluators:
- """
- 报警评估周期60s;
- """
- eval_interval = cfg.CONF.alarm.evaluation_interval
-
- """
- self.tg = threadgroup.ThreadGroup(1000)
- 按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
- 通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
- """
- self.tg.add_timer(
- eval_interval / 4, # 15s
- self.partition_coordinator.report_presence,
- 0)
-
- """
- 按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
- self.partition_coordinator.check_mastership:
- 实现定期检测主控权角色;
- 确定当前的partition是否是主控角色;
- 如果为拥有主控权的partition,则根据不同情况实现不同形式的报警器分配操作;
- """
- self.tg.add_timer(
- eval_interval / 2, # 30s
- self.partition_coordinator.check_mastership,
- eval_interval, # 60s
- # _client:构建或重新使用一个经过验证的API客户端;
- *[eval_interval, self._client])
-
- """
- add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- self._evaluate_assigned_alarms:
- 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- 针对每一个报警器,实现根据报警器类型(threshold和combination),来实现:
- 单一报警器状态的评估判定;
- 联合报警器状态的评估判定;
- """
- self.tg.add_timer(
- eval_interval, # 60s
- # 执行报警器的评估操作;
- self._evaluate_assigned_alarms,
- eval_interval)
-
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
复制代码
方法小结:
分布式报警器系统服务分布式报警器系统服务的启动和运行;
按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition
(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;
3.2 def _assigned_alarms(self)
- def _assigned_alarms(self):
- """
- 通过指定客户端获取分配给当前部分的报警器;
- """
- return self.partition_coordinator.assigned_alarms(self._client)
复制代码
3.3 def assign(self, context, data)
- def assign(self, context, data):
- """
- 接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
- 则接收报警器ID值,则赋值给assignment;
- 用于全部报警器的分配操作;
- """
- self.partition_coordinator.assign(data.get('uuid'),
- data.get('alarms'))
复制代码
3.4 def allocate(self, context, data)
- <span style="font-size:18px;">def allocate(self, context, data):
- """
- 接收所收到的报警器ID值,如果uuid和本Partition的uuid相匹配,
- 则接收报警器ID值,则存储到assignment中;
- 用于部分报警器的分配操作;
- """
- self.partition_coordinator.allocate(data.get('uuid'),
- data.get('alarms'))</span>
复制代码
4 类AlarmNotifierService:报警器被触发的通知服务实现
这个类所描述的是当报警器被触之后,进行一个报警器报警的通知操作的实现,来看看具体的方法;
4.1 def start(self)
- def start(self):
- """
- 服务的启动;
- 为RPC通信建立到信息总线的连接;
- 建立指定类型的消息消费者;
- 启动协程实现等待并消费处理队列中的消息;
- """
- super(AlarmNotifierService, self).start()
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
复制代码
4.2 def _handle_action(self, action, alarm_id, previous, current, reason, reason_data)
- def _handle_action(self, action, alarm_id, previous,
- current, reason, reason_data):
- """
- 获取系统所采用的消息通信方式;
- 通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;
-
-
- 采用两种方式之一实现相关报警器的通知:
- 通过HTTPS协议POST方法实现发送相关报警器的通知;
- 通过日志记录相关报警器的通知;
-
- # action:要通知的action的URL;
- # alarm_id:被触发的报警器的ID;
- # previous:报警器(触发)之前的状态;
- # current:报警器被触发后转换到的新状态;
- # reason:改变报警器状态的原因;
- """
- try:
- action = network_utils.urlsplit(action)
- except Exception:
- LOG.error(
- _("Unable to parse action %(action)s for alarm %(alarm_id)s"),
- {'action': action, 'alarm_id': alarm_id})
- return
-
- """
- 获取系统所采用的消息通信方式;
- ceilometer.alarm.notifier =
- log = ceilometer.alarm.notifier.log:LogAlarmNotifier
- test = ceilometer.alarm.notifier.test:TestAlarmNotifier
- http = ceilometer.alarm.notifier.rest:RestAlarmNotifier
- https = ceilometer.alarm.notifier.rest:RestAlarmNotifier
- """
- try:
- notifier = self.notifiers[action.scheme].obj
- except KeyError:
- scheme = action.scheme
- LOG.error(
- _("Action %(scheme)s for alarm %(alarm_id)s is unknown, "
- "cannot notify"),
- {'scheme': scheme, 'alarm_id': alarm_id})
- return
-
- """
- 通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),或者是通过日志记录相关报警器的通知;
- """
- try:
- LOG.debug(_("Notifying alarm %(id)s with action %(act)s") % (
- {'id': alarm_id, 'act': action}))
- notifier.notify(action, alarm_id, previous,
- current, reason, reason_data)
- except Exception:
- LOG.exception(_("Unable to notify alarm %s"), alarm_id)
- return
复制代码
方法小结:
获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(or)通过日志记录相关报警器被触发的通知;
4.3 def notify_alarm(self, context, data)
- def notify_alarm(self, context, data):
- """
- 遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
- 获取系统所采用的消息通信方式;
- 通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
-
- 通知内容中包括:
- alarm_id:被触发的报警器的ID;
- previous:报警器(触发)之前的状态;
- current:报警器被触发后转换到的新状态;
- reason:改变报警器状态的原因;
- """
- actions = data.get('actions')
- if not actions:def notify_alarm(self, context, data):
- """
- 遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
- 获取系统所采用的消息通信方式;
- 通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
-
- 通知内容中包括:
- alarm_id:被触发的报警器的ID;
- previous:报警器(触发)之前的状态;
- current:报警器被触发后转换到的新状态;
- reason:改变报警器状态的原因;
- """
- actions = data.get('actions')
- if not actions:
- LOG.error(_("Unable to notify for an alarm with no action"))
- return
-
- for action in actions:
- """
- 获取系统所采用的消息通信方式;
- 通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
- """
- self._handle_action(action,
- data.get('alarm_id'),
- data.get('previous'),
- data.get('current'),
- data.get('reason'),
- data.get('reason_data'))
复制代码
方法小结:
遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
获取系统所采用的消息通信方式;
通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
|