分享

Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动

坎蒂丝_Swan 发表于 2014-12-13 18:30:47 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 16879
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:43 编辑

问题导读
1:组件服务ceilometer-alarm-notifier所实现的功能是什么?如何实现的?
2:当报警器被触发之后,方法notify_alarm实现报警器触发后的发送通知操作的具体哪些方法?








ceilometer-alarm-notifier服务的初始化和启动

    在I版的ceilometer的各个服务组件启动是在/ceilometer/cli.py中实现的,而在J版中,是在实体包/celiometer/cmd中实现的。结合配置文件setup.cfg中的内容,可见主要有以下几个服务组件:

    console_scripts=

    ceilometer-api= ceilometer.cli:api

    ceilometer-agent-central= ceilometer.cli:agent_central

    ceilometer-agent-compute= ceilometer.cli:agent_compute

    ceilometer-agent-notification= ceilometer.cli:agent_notification

    ceilometer-send-sample= ceilometer.cli:send_sample

    ceilometer-dbsync= ceilometer.cli:storage_dbsync

    ceilometer-expirer= ceilometer.cli:storage_expirer

    ceilometer-collector= ceilometer.cli:collector_service

    ceilometer-alarm-evaluator= ceilometer.cli:alarm_evaluator

    ceilometer-alarm-notifier= ceilometer.cli:alarm_notifier

    以下几篇博客,分别来关注一下比较重要服务组件的初始化和启动过程。首先来看ceilometer-alarm-notifier的初始化和启动过程,这个组件服务所实现的功能是,当报警器被触发之后,发送相关的通知操作。

    来看方法/ceilometer/cli.py----def alarm_notifier,这个方法即实现了ceilometer-alarm-notifier的初始化和启动操作;

  1. def alarm_notifier():  
  2.     """
  3.     初始化并启动AlarmNotifierService服务(报警器被触发的通知服务);
  4.     """  
  5.     service.prepare_service()  
  6.       
  7.     """
  8.     launch:加载并启动指定的服务,最终调用了服务的start方法实现服务的启动;
  9.     AlarmNotifierService:报警器被触发的通知服务实现;
  10.     AlarmNotifierService():获取类AlarmNotifierService的实例化对象,并实现加载EXTENSIONS_NAMESPACE对应的所有插件;
  11.     """  
  12.     os_service.launch(alarm_service.AlarmNotifierService(  
复制代码


方法小结:
    1 实现AlarmNotifierService类的初始化操作,在类的初始化过程中会加载命名空间ceilometer.alarm.notifier所定义的所有插件,确定所有实现通知操作的实现方式;

    2 launch方法最终会调用类AlarmNotifierService的start方法,实现组件服务的启动操作;

1 服务ceilometer-alarm-notifier的初始化

  1. class AlarmNotifierService(rpc_service.Service):  
  2.     """
  3.     报警器被触发的通知服务实现;
  4.     """  
  5.     EXTENSIONS_NAMESPACE = "ceilometer.alarm.notifier"  
  6.   
  7.     def __init__(self, host, topic):  
  8.         """
  9.         host = cfg.CONF.host
  10.         topic = 'ceilometer.alarm'
  11.          
  12.         获取类AlarmNotifierService的实例化对象;
  13.         并实现加载EXTENSIONS_NAMESPACE对应的所有插件;
  14.         ceilometer.alarm.notifier =
  15.             log = ceilometer.alarm.notifier.log:LogAlarmNotifier
  16.             test = ceilometer.alarm.notifier.test:TestAlarmNotifier
  17.             http = ceilometer.alarm.notifier.rest:RestAlarmNotifier
  18.             https = ceilometer.alarm.notifier.rest:RestAlarmNotifier
  19.         """  
  20.         super(AlarmNotifierService, self).__init__(host, topic, self)  
  21.        self.notifiers = extension.ExtensionManager(self.EXTENSIONS_NAMESPACE,                                   invoke_on_load=True)  
复制代码


方法小结:
    实现AlarmNotifierService类的初始化操作,在类的初始化过程中会加载命名空间ceilometer.alarm.notifier所定义的所有插件,确定所有实现通知操作的实现方式;

    ceilometer.alarm.notifier=
    log= ceilometer.alarm.notifier.log:LogAlarmNotifier
    test= ceilometer.alarm.notifier.test:TestAlarmNotifier
    http= ceilometer.alarm.notifier.rest:RestAlarmNotifier
    https= ceilometer.alarm.notifier.rest:RestAlarmNotifier
    定义了通知操作的实现方式,即记录报警器触发信息到日志和通过http/https协议实现发送报警器触发信息。

2 服务ceilometer-alarm-notifier的启动

class AlarmNotifierService----def start

  1. class AlarmNotifierService(rpc_service.Service):  
  2.     def start(self):  
  3.         """               
  4.         服务的启动;
  5.         为RPC通信建立到信息总线的连接;
  6.         1.建立指定类型的消息消费者;        
  7.         2.执行方法initialize_service_hook,建立一个'topic'类型的消息消费者;
  8.         3.启动协程实现等待并消费处理队列中的消息;
  9.         """  
  10.         super(AlarmNotifierService, self).start()  
  11.         # Add a dummy thread to have wait() working  
  12.         self.tg.add_timer(604800, lambda: None)  
复制代码


方法小结:
    服务的启动,为RPC通信建立到信息总线的连接;

    1.建立指定类型的消息消费者;

    2.执行方法initialize_service_hook,建立一个'topic'类型的消息消费者;

    3.启动协程实现等待并消费处理队列中的消息;

class Service----def start

  1. class Service(service.Service):  
  2.     def start(self):  
  3.         """
  4.         为RPC通信建立到信息总线的连接;
  5.         1.建立指定类型的消息消费者;        
  6.         2.执行方法initialize_service_hook;
  7.         3.启动协程实现等待并消费处理队列中的消息;
  8.         """  
  9.         super(Service, self).start()  
  10.   
  11.         """
  12.         为RPC通信建立到信息总线的连接;
  13.         建立一个新的连接,或者从连接池中获取一个;
  14.         """  
  15.         self.conn = rpc.create_connection(new=True)  
  16.         LOG.debug(_("Creating Consumer connection for Service %s") %  
  17.                   self.topic)  
  18.   
  19.         """
  20.         RpcDispatcher:RPC消息调度类;
  21.         """  
  22.         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],  
  23.                                                   self.serializer)  
  24.   
  25.         """
  26.         create_consumer:建立指定类型的消息消费者(fanout or topic);
  27.         1.创建以服务的topic为路由键的消费者;
  28.         2.创建以服务的topic和本机名为路由键的消费者
  29.           (基于topic&host,可用来接收定向消息);
  30.         3.fanout直接投递消息,不进行匹配,速度最快
  31.           (fanout类型,可用于接收广播消息);
  32.         """  
  33.         self.conn.create_consumer(self.topic, dispatcher, fanout=False)  
  34.         node_topic = '%s.%s' % (self.topic, self.host)  
  35.         self.conn.create_consumer(node_topic, dispatcher, fanout=False)  
  36.         self.conn.create_consumer(self.topic, dispatcher, fanout=True)  
  37.   
  38.         """
  39.         在消息消费进程启动前,必须先声明消费者;
  40.         建立一个'topic'类型的消息消费者;
  41.         根据消费者类(TopicConsumer)和消息队列名称
  42.         (pool_name:  ceilometer.collector.metering)
  43.         以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  44.         """  
  45.         if callable(getattr(self.manager, 'initialize_service_hook', None)):  
  46.             self.manager.initialize_service_hook(self)  
  47.   
  48.         """
  49.         启动消费者线程;
  50.         consume_in_thread用evelent.spawn创建一个协程一直运行;
  51.         等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
  52.         启动协程实现等待并消费处理队列中的消息;
  53.         """  
  54.         self.conn.consume_in_thread()  
复制代码

class AlarmNotifierService----def initialize_service_hook

  1. class AlarmNotifierService(rpc_service.Service):  
  2.     def initialize_service_hook(self, service):  
  3.         """
  4.         建立一个'topic'类型的消息消费者;
  5.         指定主题topic(alarm_notifier)和队列名称pool_name(ceilometer.alarm.alarm_notifier);
  6.         """  
  7.         LOG.debug(_('initialize_service_hooks'))  
  8.         self.conn.create_worker(  
  9.             # alarm_notifier  
  10.             cfg.CONF.alarm.notifier_rpc_topic,  
  11.             rpc_dispatcher.RpcDispatcher([self]),  
  12.             # ceilometer.alarm.alarm_notifier  
  13.             'ceilometer.alarm.' + cfg.CONF.alarm.notifier_rpc_topic,  
  14.         )  
复制代码


3 类AlarmNotifierService中的方法notify_alarm

    方法notify_alarm是具体实现报警器触发后的发送通知操作的具体方法,当报警器被触发之后,都会调用这个方法,主要实现了以下内容:

    遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:

    1 获取系统所采用的消息通信方式;

    2 通过HTTP/HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;


  1. def notify_alarm(self, context, data):  
  2.     """
  3.     遍历所有要通知报警器被触发的URL,针对每个要通知的URL地址,实现:
  4.     1.获取系统所采用的消息通信方式;
  5.     2.通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;               
  6.          
  7.     通知报警器被触发,通知内容中包括:
  8.     alarm_id:被触发的报警器的ID;
  9.     previous:报警器(触发)之前的状态;
  10.     current:报警器被触发后转换到的新状态;
  11.     reason:改变报警器状态的原因;
  12.     """  
  13.     actions = data.get('actions')  
  14.     if not actions:  
  15.         LOG.error(_("Unable to notify for an alarm with no action"))  
  16.         return  
  17.   
  18.     # actions:要运行的action的URL(多个);  
  19.     # alarm_id:被触发的报警器的ID;  
  20.     # previous:报警器(触发)之前的状态;  
  21.     # current:报警器被触发后转换到的新状态;  
  22.     # reason:改变报警器状态的原因;  
  23.     for action in actions:  
  24.         """
  25.         获取系统所采用的消息通信方式;
  26.         通过HTTPS协议POST方法实现发送相关报警器被触发的通知(到action指定的URL),(or)通过日志记录相关报警器被触发的通知;
  27.         """  
  28.         self._handle_action(action,  
  29.                             data.get('alarm_id'),  
  30.                             data.get('previous'),  
  31.                             data.get('current'),  
  32.                             data.get('reason'),  
  33.                             data.get('reason_data'))  
复制代码






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动






欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条