坎蒂丝_Swan 发表于 2014-12-13 19:06:31

Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动

本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:43 编辑


问题导读
问题1:类SingletonAlarmService的初始化操作完成了哪些内容?
问题2:PartitionedAlarmService类初始化操作完成了哪些内容?



static/image/hrline/4.gif





ceilometer-alarm-evaluator服务的初始化和启动



    本篇博客将解析服务组件ceilometer-alarm-evaluator的初始化和启动操作,这个服务组件即ceilometer报警服务;来看方法/ceilometer/cli.py----def alarm_evaluator,这个方法即实现了ceilometer-alarm-evaluator服务的初始化和启动操作。


def alarm_evaluator():
    """
    加载并启动SingletonAlarmService服务(单例报警服务);
    报警服务系统:SingletonAlarmService和PartitionedAlarmService;
    默认报警服务:ceilometer.alarm.service.SingletonAlarmService;
    如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;
    """
    service.prepare_service()
      
    """
    launch:加载并启动SingletonAlarmService服务,最终调用了服务的start方法实现服务的启动;
    配置文件定义了默认报警服务:ceilometer.alarm.service.SingletonAlarmService
    SingletonAlarmService:单例的报警服务;
    """
    eval_service = importutils.import_object(cfg.CONF.alarm.evaluation_service)
    os_service.launch(eval_service).wait(<span style="font-size:18px;"><span style="font-family:KaiTi_GB2312;">)</span></span>
方法小结:
    1.报警服务系统:
      SingletonAlarmService和PartitionedAlarmService;


    2.默认报警服务:
      ceilometer.alarm.service.SingletonAlarmService;


    3.如果要应用分布式报警系统,则需要在这里修改配置文件中的参数;


1 单例报警服务SingletonAlarmService的初始化和启动操作


1.1 SingletonAlarmService类初始化操作


    类SingletonAlarmService的初始化操作主要完成了两部分内容:

    * 加载命名空间ceilometer.alarm.evaluator中的所有插件;
      ceilometer.alarm.evaluator =
          threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
          combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;

* 建立线程池,用于后续报警器服务中若干操作的运行;

class SingletonAlarmService----def __init__
class SingletonAlarmService(AlarmService, os_service.Service):
    def __init__(self):
      super(SingletonAlarmService, self).__init__()
      """
      _load_evaluators:
      这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
      namespace = ceilometer.alarm.evaluator
            ceilometer.alarm.evaluator =
            threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
            combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      """
      self._load_evaluators()
      self.api_client = None
class Service----def __init__

class Service(object):
    def __init__(self, threads=1000):
      self.tg = threadgroup.ThreadGroup(threads)
      # signal that the service is done shutting itself down:
      self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object):
    EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"

    def _load_evaluators(self):
      """
      这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
      namespace = ceilometer.alarm.evaluator
      ceilometer.alarm.evaluator =
      threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
      combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      """
      self.evaluators = extension.ExtensionManager(
      namespace=self.EXTENSIONS_NAMESPACE,
      invoke_on_load=True,
      invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
      )
      # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
      self.supported_evaluators = [ext.name for ext in
                                     self.evaluators.extensions]
1.2 SingletonAlarmService类启动操作


    类的启动操作实现了单例报警器服务SingletonAlarmService的启动操作;


    按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms,方法self._evaluate_assigned_alarms实现获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;

class SingletonAlarmService----def start

def start(self):
    """
    单例报警器服务SingletonAlarmService的启动操作;
    按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
    获取alarm集合,针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;
    """
    super(SingletonAlarmService, self).start()
    if self.evaluators:
      """
      评估周期60s;
      """
      interval = cfg.CONF.alarm.evaluation_interval
            
      # add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
      # _evaluate_assigned_alarms:
      # 先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
      # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
      # 单一报警器状态的评估判定;
      # 联合报警器状态的评估判定;
      self.tg.add_timer(
            interval,
            self._evaluate_assigned_alarms,
            0)
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)
class AlarmService----def _evaluate_assigned_alarms

    这个方法实现获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;


    1.获取当前部分的所有报警器;


    2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),来实现单一报警器模式或者联合报警器模式的评估判定;


def _evaluate_assigned_alarms(self):
    """
    获取当前部分的所有报警器,对每一个报警器进行报警触发的评估判定;
    1.获取当前部分的所有报警器;
    2.针对每一个报警器,实现根据报警器模式的类型(threshold和combination),
      来实现单一报警器模式或者联合报警器模式的评估判定;
    """
    try:
      # 获取所有报警器列表;
      # 对于单例报警器:通过指定客户端通过http协议发送GET请求获取分配给当前部分的报警器;
      alarms = self._assigned_alarms()
      LOG.info(_('initiating evaluation cycle on %d alarms') %
               len(alarms))
            
      # 遍历所有的报警器;
      # 针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
      # 单一报警器状态的评估判定;
      # 联合报警器状态的评估判定;
      for alarm in alarms:
            self._evaluate_alarm(alarm)
    except Exception:
      LOG.exception(_('alarm evaluation cycle failed'))

2 分布式报警服务PartitionedAlarmService的初始化和启动操作

2.1 PartitionedAlarmService类初始化操作

    PartitionedAlarmService类初始化操作和SingletonAlarmService类初始化操作内容大致是相同的,同样主要完成了两部分内容:

    * 加载命名空间ceilometer.alarm.evaluator中的所有插件;
      ceilometer.alarm.evaluator =
          threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
          combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      即描述了报警器状态的评估判定的两种模式:联合报警器状态评估和单一报警器状态评估;

    * 建立线程池,用于后续报警器服务中若干操作的运行;

    * 初始化分布式报警协议实现类PartitionCoordinator;

class PartitionedAlarmService----def __init__

class PartitionedAlarmService(AlarmService, rpc_service.Service):
    """
    分布式报警器系统服务;
    """
    def __init__(self):
      super(PartitionedAlarmService, self).__init__(
            cfg.CONF.host,
            cfg.CONF.alarm.partition_rpc_topic,
            self
      )
      """
      加载命名空间ceilometer.alarm.evaluator中的所有插件;
      namespace = ceilometer.alarm.evaluator
            ceilometer.alarm.evaluator =
            threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
            combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      """
      self._load_evaluators()
      self.api_client = None
      self.partition_coordinator = coordination.PartitionCoordinator()
class Service----def __init__
class Service(object):
    def __init__(self, threads=1000):
      self.tg = threadgroup.ThreadGroup(threads)
      # signal that the service is done shutting itself down:
      self._done = event.Event()
class AlarmService----def _load_evaluators
class AlarmService(object):
    EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"

    def _load_evaluators(self):
      """
      这里得到的就是加载命名空间ceilometer.alarm.evaluator中的所有插件;
      namespace = ceilometer.alarm.evaluator
      ceilometer.alarm.evaluator =
      threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
      combination = ceilometer.alarm.evaluator.combination:CombinationEvaluator
      """
      self.evaluators = extension.ExtensionManager(
      namespace=self.EXTENSIONS_NAMESPACE,
      invoke_on_load=True,
      invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
      )
      # 这里得到的就是上面所加载的命名空间ceilometer.alarm.evaluator中的所有插件;
      self.supported_evaluators = [ext.name for ext in
                                     self.evaluators.extensions]
2.2 PartitionedAlarmService类启动操作

分布式报警器系统服务分布式报警器系统服务的启动和运行,按照一定的时间间隔周期性的执行以下操作:


1.实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);


2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
情况1:所有报警器都要实现重新分配操作;
情况2:只有新建立的报警器需要实现分配操作;


3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
单一报警器模式或者联合报警器模式的评估判定;

class PartitionedAlarmService----def start

def start(self):
    """
    分布式报警器系统服务分布式报警器系统服务的启动和运行;
    按照一定的时间间隔周期性的执行以下操作:
    1.实现广播当前partition的存在性的存在性到所有的partition
    (包括uuid和优先级信息);
    2.实现定期检测主控权角色;确定当前的partition是否是主控角色;
      如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
      情况1:所有报警器都要实现重新分配操作;
      情况2:只有新建立的报警器需要实现分配操作;
    3.获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
      针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
      单一报警器模式或者联合报警器模式的评估判定;
    """
    # 启动PartitionedAlarmService服务;
    super(PartitionedAlarmService, self).start()
    if self.evaluators:
      """
      报警评估周期60s;
      """
      eval_interval = cfg.CONF.alarm.evaluation_interval
            
      """
      self.tg = threadgroup.ThreadGroup(1000)
      按照一定时间间隔实现循环执行方法self.partition_coordinator.report_presence;
      通过方法fanout_cast实现广播当前partition的存在性的存在性到所有的partition(包括uuid和优先级信息);
      """
      self.tg.add_timer(
            eval_interval / 4, # 15s
            self.partition_coordinator.report_presence,
            0)
            
      """
      按照一定时间间隔实现循环执行方法self.partition_coordinator.check_mastership;
      self.partition_coordinator.check_mastership:
      实现定期检测主控权角色;
      确定当前的partition是否是主控角色;
      如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
      """
      self.tg.add_timer(
            eval_interval / 2, # 30s
            self.partition_coordinator.check_mastership,
            eval_interval,   # 60s
            # _client:构建或重新使用一个经过验证的API客户端;
            *)
            
      """
      add_timer:按照一定时间间隔实现循环执行方法self._evaluate_assigned_alarms;
      self._evaluate_assigned_alarms:
      先获取alarm集合,对每一个alarm,调用_evaluate_alarm方法;
      针对每一个报警器,实现根据报警器的类型(threshold和combination),来实现:
      单一报警器状态的评估判定;
      联合报警器状态的评估判定;
      """
      self.tg.add_timer(
            eval_interval,   # 60s
            # 执行报警器的评估操作;
            self._evaluate_assigned_alarms,
            eval_interval)
         
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)
class Service(service.Service)----def start

    因为这里实现的是分布式报警系统,涉及到节点服务间的消息通信,所以这里应用RPC消息队列实现服务间的通信,进行相关的初始化操作;

class Service(service.Service):
    def start(self):
      """
      为RPC通信建立到信息总线的连接;
      1.建立指定类型的消息消费者;      
      2.执行方法initialize_service_hook;
      3.启动协程实现等待并消费处理队列中的消息;
      """
      super(Service, self).start()

      """
      为RPC通信建立到信息总线的连接;
      建立一个新的连接,或者从连接池中获取一个;
      """
      self.conn = rpc.create_connection(new=True)
      LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)

      """
      RpcDispatcher:RPC消息调度类;
      """
      dispatcher = rpc_dispatcher.RpcDispatcher(,
                                                self.serializer)

      # Share this same connection for these Consumers
      """
      create_consumer:建立指定类型的消息消费者(fanout or topic);
      1.创建以服务的topic为路由键的消费者;
      2.创建以服务的topic和本机名为路由键的消费者
          (基于topic&host,可用来接收定向消息);
      3.fanout直接投递消息,不进行匹配,速度最快
          (fanout类型,可用于接收广播消息);
      """
      self.conn.create_consumer(self.topic, dispatcher, fanout=False)
      node_topic = '%s.%s' % (self.topic, self.host)
      self.conn.create_consumer(node_topic, dispatcher, fanout=False)
      self.conn.create_consumer(self.topic, dispatcher, fanout=True)

      # Hook to allow the manager to do other initializations after
      # the rpc connection is created.
      """
      在消息消费进程启动前,必须先声明消费者;
      建立一个'topic'类型的消息消费者;
      根据消费者类(TopicConsumer)和消息队列名称
      (pool_name:ceilometer.collector.metering)
      以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
      """
      if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

      """
      启动消费者线程;
      consume_in_thread用evelent.spawn创建一个协程一直运行;
      等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
      启动协程实现等待并消费处理队列中的消息;
      """
      self.conn.consume_in_thread()

class PartitionedAlarmService----def initialize_service_hook

class PartitionedAlarmService(AlarmService, rpc_service.Service):
    def initialize_service_hook(self, service):
      LOG.debug(_('initialize_service_hooks'))
         
      # create_worker:建立一个'topic'类型的消息消费者;
      # 指定主题topic(alarm_partition_coordination)和队列名称pool_name(ceilometer.alarm.alarm_partition_coordination);
      self.conn.create_worker(
            # alarm_partition_coordination
            cfg.CONF.alarm.partition_rpc_topic,
            rpc_dispatcher.RpcDispatcher(),
            # ceilometer.alarm.alarm_partition_coordination
            'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,
      )至此,ceilometer-alarm-evaluator服务的初始化和启动操作分析完成。


附录:python的多重父类继承


    在上面的分析中,我们看到在服务初始化和启动的过程中,若干方法都是多重父类继承,这里需要注意的是父类方法的搜索顺序;实际上python经典类的父类方法搜索顺序是深度优先,而python新式类的父类方法搜索顺序是广度优先;

    我写了一个小小的测试程序,来看新式类的父类方法搜索顺序:

class A(object):
    def start(self):
      print 'A-start'

class B0(A):
    def start(self):
      super(B0,self).start()
      print 'B0-start'

class B1(A):
    def start(self):
      super(B1,self).start()
      print 'B1-start'

class C(B0,B1):
    def start(self):
      super(C,self).start()
      print 'C-start'

TEST = C()
TEST.start()

输出为:
A-start
B1-start
B0-start
C-start






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动




页: [1]
查看完整版本: Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动