Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:43 编辑问题导读
问题1:类SingletonAlarmService的初始化操作完成了哪些内容?
问题2:PartitionedAlarmService类初始化操作完成了哪些内容?
static/image/hrline/4.gif
ceilometer-alarm-evaluator服务的初始化和启动
本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。
def alarm_evaluator():
"""
加载并启动SingletonAlarmService服务(单例报警服务);
报警服务系统:SingletonAlarmService和PartitionedAlarmService;
默认报警服务:ceilometer.alarm.service.SingletonAlarmService;
如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
"""
service.prepare_service()
"""
launch:加载并启动SingletonAlarmService服务,最终调用了服务的start方法实现服务的启动;
配置文件定义了默认报警服务:ceilometer.alarm.service.SingletonAlarmService
SingletonAlarmService:单例的报警服务;
"""
eval_service = importutils.import_object(cfg.CONF.alarm.evaluation_service)
os_service.launch(eval_service).wait(<span style="font-size:18px;"><span style="font-family:KaiTi_GB2312;">)</span></span>
方法小结:
1.报警服务系统:
SingletonAlarmService和PartitionedAlarmService;
2.默认报警服务:
ceilometer.alarm.service.SingletonAlarmService;
3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
1 单例报警服务SingletonAlarmService的初始化和启动操作
1.1 SingletonAlarmService类初始化操作
类SingletonAlarmService的初始化操作主要完成了两部分内容:
* 加载命名空间ceilometer.alarm.evaluator中的所有插件;
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;
* 建立线程池,用于后续报警器服务中若干操作的运行;
class SingletonAlarmService----def __init__
class SingletonAlarmService(AlarmService, os_service.Service):
def __init__(self):
super(SingletonAlarmService, self).__init__()
"""
_load_evaluators:
这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
namespace = ceilometer.alarm.evaluator
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
"""
self._load_evaluators()
self.api_client = None
class Service----def __init__
class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object):
EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
def _load_evaluators(self):
"""
这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
namespace = ceilometer.alarm.evaluator
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
"""
self.evaluators = extension.ExtensionManager(
namespace=self.EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
)
# 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
self.supported_evaluators = [ext.name for ext in
self.evaluators.extensions]
1.2 SingletonAlarmService类启动操作
类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
class SingletonAlarmService----def start
def start(self):
"""
单例报警器服务SingletonAlarmService的启动操作;
按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
"""
super(SingletonAlarmService, self).start()
if self.evaluators:
"""
评估周期60s;
"""
interval = cfg.CONF.alarm.evaluation_interval
# add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
# _evaluate_assigned_alarms:
# 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
# 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
# 单一报警器状态的评估判定;
# 联合报警器状态的评估判定;
self.tg.add_timer(
interval,
self._evaluate_assigned_alarms,
0)
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
class AlarmService----def _evaluate_assigned_alarms
这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
def _evaluate_assigned_alarms(self):
"""
获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
1.获取当前部分的所有报警器;
2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
来实现单一报警器模式或者联合报警器模式的评估判定;
"""
try:
# 获取所有报警器列表;
# 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
alarms = self._assigned_alarms()
LOG.info(_('initiating evaluation cycle on %d alarms') %
len(alarms))
# 遍历所有的报警器;
# 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
# 单一报警器状态的评估判定;
# 联合报警器状态的评估判定;
for alarm in alarms:
self._evaluate_alarm(alarm)
except Exception:
LOG.exception(_('alarm evaluation cycle failed'))
2 分布式报警服务PartitionedAlarmService的初始化和启动操作
2.1 PartitionedAlarmService类初始化操作
PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容:
* 加载命名空间ceilometer.alarm.evaluator中的所有插件;
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;
* 建立线程池,用于后续报警器服务中若干操作的运行;
* 初始化分布式报警协议实现类PartitionCoordinator;
class PartitionedAlarmService----def __init__
class PartitionedAlarmService(AlarmService, rpc_service.Service):
"""
分布式报警器系统服务;
"""
def __init__(self):
super(PartitionedAlarmService, self).__init__(
cfg.CONF.host,
cfg.CONF.alarm.partition_rpc_topic,
self
)
"""
加载命名空间ceilometer.alarm.evaluator中的所有插件;
namespace = ceilometer.alarm.evaluator
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
"""
self._load_evaluators()
self.api_client = None
self.partition_coordinator = coordination.PartitionCoordinator()
class Service----def __init__
class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object):
EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
def _load_evaluators(self):
"""
这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
namespace = ceilometer.alarm.evaluator
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
"""
self.evaluators = extension.ExtensionManager(
namespace=self.EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
)
# 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
self.supported_evaluators = [ext.name for ext in
self.evaluators.extensions]
2.2 PartitionedAlarmService类启动操作
分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;
class PartitionedAlarmService----def start
def start(self):
"""
分布式报警器系统服务分布式报警器系统服务的启动和运行;
按照一定的时间间隔周期性的执行以下操作:
1.实现广播当前partition的存在性的存在性到所有的partition
(包括uuid和优先级信息);
2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;
3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;
"""
# 启动PartitionedAlarmService服务;
super(PartitionedAlarmService, self).start()
if self.evaluators:
"""
报警评估周期60s;
"""
eval_interval = cfg.CONF.alarm.evaluation_interval
"""
self.tg = threadgroup.ThreadGroup(1000)
按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
"""
self.tg.add_timer(
eval_interval / 4, # 15s
self.partition_coordinator.report_presence,
0)
"""
按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
self.partition_coordinator.check_mastership:
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
"""
self.tg.add_timer(
eval_interval / 2, # 30s
self.partition_coordinator.check_mastership,
eval_interval, # 60s
# _client:构建或重新使用一个经过验证的API客户端;
*)
"""
add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
self._evaluate_assigned_alarms:
先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器状态的评估判定;
联合报警器状态的评估判定;
"""
self.tg.add_timer(
eval_interval, # 60s
# 执行报警器的评估操作;
self._evaluate_assigned_alarms,
eval_interval)
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
class Service(service.Service)----def start
因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;
class Service(service.Service):
def start(self):
"""
为RPC通信建立到信息总线的连接;
1.建立指定类型的消息消费者;
2.执行方法initialize_service_hook;
3.启动协程实现等待并消费处理队列中的消息;
"""
super(Service, self).start()
"""
为RPC通信建立到信息总线的连接;
建立一个新的连接,或者从连接池中获取一个;
"""
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
"""
RpcDispatcher:RPC消息调度类;
"""
dispatcher = rpc_dispatcher.RpcDispatcher(,
self.serializer)
# Share this same connection for these Consumers
"""
create_consumer:建立指定类型的消息消费者(fanout or topic);
1.创建以服务的topic为路由键的消费者;
2.创建以服务的topic和本机名为路由键的消费者
(基于topic&host,可用来接收定向消息);
3.fanout直接投递消息,不进行匹配,速度最快
(fanout类型,可用于接收广播消息);
"""
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
"""
在消息消费进程启动前,必须先声明消费者;
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称
(pool_name:ceilometer.collector.metering)
以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
"""
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
"""
启动消费者线程;
consume_in_thread用evelent.spawn创建一个协程一直运行;
等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
启动协程实现等待并消费处理队列中的消息;
"""
self.conn.consume_in_thread()
class PartitionedAlarmService----def initialize_service_hook
class PartitionedAlarmService(AlarmService, rpc_service.Service):
def initialize_service_hook(self, service):
LOG.debug(_('initialize_service_hooks'))
# create_worker:建立一个'topic'类型的消息消费者;
# 指定主题topic(alarm_partition_coordination)和队列名称pool_name(ceilometer.alarm.alarm_partition_coordination);
self.conn.create_worker(
# alarm_partition_coordination
cfg.CONF.alarm.partition_rpc_topic,
rpc_dispatcher.RpcDispatcher(),
# ceilometer.alarm.alarm_partition_coordination
'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,
)至此,ceilometer-alarm-evaluator服务的初始化和启动操作分析完成。
附录:python的多重父类继承
在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先;
我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:
class A(object):
def start(self):
print 'A-start'
class B0(A):
def start(self):
super(B0,self).start()
print 'B0-start'
class B1(A):
def start(self):
super(B1,self).start()
print 'B1-start'
class C(B0,B1):
def start(self):
super(C,self).start()
print 'C-start'
TEST = C()
TEST.start()
输出为:
A-start
B1-start
B0-start
C-start
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
页:
[1]