本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:43 编辑
问题导读
问题1:类SingletonAlarmService的初始化操作完成了哪些内容?
问题2:PartitionedAlarmService类初始化操作完成了哪些内容?
ceilometer-alarm-evaluator服务的初始化和启动
本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。
- def alarm_evaluator():
- """
- 加载并启动SingletonAlarmService服务(单例报警服务);
- 报警服务系统:SingletonAlarmService和PartitionedAlarmService;
- 默认报警服务:ceilometer.alarm.service.SingletonAlarmService;
- 如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
- """
- service.prepare_service()
-
- """
- launch:加载并启动SingletonAlarmService服务,最终调用了服务的start方法实现服务的启动;
- 配置文件定义了默认报警服务:ceilometer.alarm.service.SingletonAlarmService
- SingletonAlarmService:单例的报警服务;
- """
- eval_service = importutils.import_object(cfg.CONF.alarm.evaluation_service)
- os_service.launch(eval_service).wait(<span style="font-size:18px;"><span style="font-family:KaiTi_GB2312;">)</span></span>
复制代码
方法小结:
1.报警服务系统:
SingletonAlarmService和PartitionedAlarmService;
2.默认报警服务:
ceilometer.alarm.service.SingletonAlarmService;
3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
1 单例报警服务SingletonAlarmService的初始化和启动操作
1.1 SingletonAlarmService类初始化操作
类SingletonAlarmService的初始化操作主要完成了两部分内容:
* 加载命名空间ceilometer.alarm.evaluator中的所有插件;
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;
* 建立线程池,用于后续报警器服务中若干操作的运行;
class SingletonAlarmService----def __init__
- class SingletonAlarmService(AlarmService, os_service.Service):
- def __init__(self):
- super(SingletonAlarmService, self).__init__()
- """
- _load_evaluators:
- 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
- namespace = ceilometer.alarm.evaluator
- ceilometer.alarm.evaluator =
- threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
- combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
- """
- self._load_evaluators()
- self.api_client = None
复制代码
class Service----def __init__
- class Service(object):
- def __init__(self, threads=1000):
- self.tg = threadgroup.ThreadGroup(threads)
- # signal that the service is done shutting itself down:
- self._done = event.Event()
复制代码
class AlarmService----def _load_evaluators
- class AlarmService(object):
- EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
-
- def _load_evaluators(self):
- """
- 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
- namespace = ceilometer.alarm.evaluator
- ceilometer.alarm.evaluator =
- threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
- combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
- """
- self.evaluators = extension.ExtensionManager(
- namespace=self.EXTENSIONS_NAMESPACE,
- invoke_on_load=True,
- invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
- )
- # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
- self.supported_evaluators = [ext.name for ext in
- self.evaluators.extensions]
复制代码
1.2 SingletonAlarmService类启动操作
类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
class SingletonAlarmService----def start
- def start(self):
- """
- 单例报警器服务SingletonAlarmService的启动操作;
- 按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- 获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
- """
- super(SingletonAlarmService, self).start()
- if self.evaluators:
- """
- 评估周期60s;
- """
- interval = cfg.CONF.alarm.evaluation_interval
-
- # add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- # _evaluate_assigned_alarms:
- # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
- # 单一报警器状态的评估判定;
- # 联合报警器状态的评估判定;
- self.tg.add_timer(
- interval,
- self._evaluate_assigned_alarms,
- 0)
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
复制代码
class AlarmService----def _evaluate_assigned_alarms
这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
- def _evaluate_assigned_alarms(self):
- """
- 获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
- 1.获取当前部分的所有报警器;
- 2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
- 来实现单一报警器模式或者联合报警器模式的评估判定;
- """
- try:
- # 获取所有报警器列表;
- # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
- alarms = self._assigned_alarms()
- LOG.info(_('initiating evaluation cycle on %d alarms') %
- len(alarms))
-
- # 遍历所有的报警器;
- # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
- # 单一报警器状态的评估判定;
- # 联合报警器状态的评估判定;
- for alarm in alarms:
- self._evaluate_alarm(alarm)
- except Exception:
- LOG.exception(_('alarm evaluation cycle failed'))
复制代码
2 分布式报警服务PartitionedAlarmService的初始化和启动操作
2.1 PartitionedAlarmService类初始化操作
PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容:
* 加载命名空间ceilometer.alarm.evaluator中的所有插件;
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;
* 建立线程池,用于后续报警器服务中若干操作的运行;
* 初始化分布式报警协议实现类PartitionCoordinator;
class PartitionedAlarmService----def __init__
- class PartitionedAlarmService(AlarmService, rpc_service.Service):
- """
- 分布式报警器系统服务;
- """
- def __init__(self):
- super(PartitionedAlarmService, self).__init__(
- cfg.CONF.host,
- cfg.CONF.alarm.partition_rpc_topic,
- self
- )
- """
- 加载命名空间ceilometer.alarm.evaluator中的所有插件;
- namespace = ceilometer.alarm.evaluator
- ceilometer.alarm.evaluator =
- threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
- combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
- """
- self._load_evaluators()
- self.api_client = None
- self.partition_coordinator = coordination.PartitionCoordinator()
复制代码
class Service----def __init__
- class Service(object):
- def __init__(self, threads=1000):
- self.tg = threadgroup.ThreadGroup(threads)
- # signal that the service is done shutting itself down:
- self._done = event.Event()
复制代码
class AlarmService----def _load_evaluators
- class AlarmService(object):
- EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
-
- def _load_evaluators(self):
- """
- 这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
- namespace = ceilometer.alarm.evaluator
- ceilometer.alarm.evaluator =
- threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
- combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
- """
- self.evaluators = extension.ExtensionManager(
- namespace=self.EXTENSIONS_NAMESPACE,
- invoke_on_load=True,
- invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
- )
- # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
- self.supported_evaluators = [ext.name for ext in
- self.evaluators.extensions]
复制代码
2.2 PartitionedAlarmService类启动操作
分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;
class PartitionedAlarmService----def start
- def start(self):
- """
- 分布式报警器系统服务分布式报警器系统服务的启动和运行;
- 按照一定的时间间隔周期性的执行以下操作:
- 1.实现广播当前partition的存在性的存在性到所有的partition
- (包括uuid和优先级信息);
- 2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
- 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
- 情况1:所有报警器都要实现重新分配操作;
- 情况2:只有新建立的报警器需要实现分配操作;
- 3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
- 单一报警器模式或者联合报警器模式的评估判定;
- """
- # 启动PartitionedAlarmService服务;
- super(PartitionedAlarmService, self).start()
- if self.evaluators:
- """
- 报警评估周期60s;
- """
- eval_interval = cfg.CONF.alarm.evaluation_interval
-
- """
- self.tg = threadgroup.ThreadGroup(1000)
- 按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
- 通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
- """
- self.tg.add_timer(
- eval_interval / 4, # 15s
- self.partition_coordinator.report_presence,
- 0)
-
- """
- 按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
- self.partition_coordinator.check_mastership:
- 实现定期检测主控权角色;
- 确定当前的partition是否是主控角色;
- 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
- """
- self.tg.add_timer(
- eval_interval / 2, # 30s
- self.partition_coordinator.check_mastership,
- eval_interval, # 60s
- # _client:构建或重新使用一个经过验证的API客户端;
- *[eval_interval, self._client])
-
- """
- add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
- self._evaluate_assigned_alarms:
- 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
- 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
- 单一报警器状态的评估判定;
- 联合报警器状态的评估判定;
- """
- self.tg.add_timer(
- eval_interval, # 60s
- # 执行报警器的评估操作;
- self._evaluate_assigned_alarms,
- eval_interval)
-
- # Add a dummy thread to have wait() working
- self.tg.add_timer(604800, lambda: None)
复制代码
class Service(service.Service)----def start
因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;
- class Service(service.Service):
- def start(self):
- """
- 为RPC通信建立到信息总线的连接;
- 1.建立指定类型的消息消费者;
- 2.执行方法initialize_service_hook;
- 3.启动协程实现等待并消费处理队列中的消息;
- """
- super(Service, self).start()
-
- """
- 为RPC通信建立到信息总线的连接;
- 建立一个新的连接,或者从连接池中获取一个;
- """
- self.conn = rpc.create_connection(new=True)
- LOG.debug(_("Creating Consumer connection for Service %s") %
- self.topic)
-
- """
- RpcDispatcher:RPC消息调度类;
- """
- dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
- self.serializer)
-
- # Share this same connection for these Consumers
- """
- create_consumer:建立指定类型的消息消费者(fanout or topic);
- 1.创建以服务的topic为路由键的消费者;
- 2.创建以服务的topic和本机名为路由键的消费者
- (基于topic&host,可用来接收定向消息);
- 3.fanout直接投递消息,不进行匹配,速度最快
- (fanout类型,可用于接收广播消息);
- """
- self.conn.create_consumer(self.topic, dispatcher, fanout=False)
- node_topic = '%s.%s' % (self.topic, self.host)
- self.conn.create_consumer(node_topic, dispatcher, fanout=False)
- self.conn.create_consumer(self.topic, dispatcher, fanout=True)
-
- # Hook to allow the manager to do other initializations after
- # the rpc connection is created.
- """
- 在消息消费进程启动前,必须先声明消费者;
- 建立一个'topic'类型的消息消费者;
- 根据消费者类(TopicConsumer)和消息队列名称
- (pool_name: ceilometer.collector.metering)
- 以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
- """
- if callable(getattr(self.manager, 'initialize_service_hook', None)):
- self.manager.initialize_service_hook(self)
-
- """
- 启动消费者线程;
- consume_in_thread用evelent.spawn创建一个协程一直运行;
- 等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
- 启动协程实现等待并消费处理队列中的消息;
- """
- self.conn.consume_in_thread()
复制代码
class PartitionedAlarmService----def initialize_service_hook
- class PartitionedAlarmService(AlarmService, rpc_service.Service):
- def initialize_service_hook(self, service):
- LOG.debug(_('initialize_service_hooks'))
-
- # create_worker:建立一个'topic'类型的消息消费者;
- # 指定主题topic(alarm_partition_coordination)和队列名称pool_name(ceilometer.alarm.alarm_partition_coordination);
- self.conn.create_worker(
- # alarm_partition_coordination
- cfg.CONF.alarm.partition_rpc_topic,
- rpc_dispatcher.RpcDispatcher([self]),
- # ceilometer.alarm.alarm_partition_coordination
- 'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,
- )
复制代码
至此,ceilometer-alarm-evaluator服务的初始化和启动操作分析完成。
附录:python的多重父类继承
在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先;
我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:
- class A(object):
- def start(self):
- print 'A-start'
-
- class B0(A):
- def start(self):
- super(B0,self).start()
- print 'B0-start'
-
- class B1(A):
- def start(self):
- super(B1,self).start()
- print 'B1-start'
-
- class C(B0,B1):
- def start(self):
- super(C,self).start()
- print 'C-start'
-
- TEST = C()
- TEST.start()
-
- 输出为:
- A-start
- B1-start
- B0-start
- C-start
复制代码
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
|