分享

Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动

坎蒂丝_Swan 发表于 2014-12-14 19:22:18 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 22587
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:45 编辑

问题导读
问题1:服务ceilometer-collector的初始化操作实现了哪些操作?
问题2:服务ceilometer-agent-notification的启动操作实现了什么任务?








ceilometer-collector服务的初始化和启动

    本篇帖子将解析服务组件ceilometer-collector的初始化和启动操作;当信息发布操作完成之后,ceilometer-collector组件服务将会分别获取相关的消息数据,并实现保存获取的消息数据到数据存储系统中;而数据存储系统方案目前也支持几种实现,即log/mongodb/mysql/postgresql/sqlite/hbase/db2等;
   
来看方法/ceilometer/cli.py----def collector_service,这个方法即实现了ceilometer-collector服务的初始化和启动操作。

  1. def collector_service():  
  2.     """
  3.     从消息队列获取相关消息信息,并实现保存数据到数据系统;
  4.     通过监听对应的队列来获取发布到消息队列的采样数据信息,
  5.     并实现保存到存储系统中;
  6.     """  
  7.     service.prepare_service()  
  8.     launcher = os_service.ProcessLauncher()  
  9.     launcher.launch_service(  
  10.         collector.CollectorService(cfg.CONF.host,  
  11.                                    'ceilometer.collector'),  
  12.         workers=service.get_workers('collector'))  
  13.     launcher.wait()  
复制代码


1 服务ceilometer-collector的初始化操作

服务ceilometer-collector的初始化操作主要实现了以下内容的操作:


(1)若干参数的初始化,定义了所要监听序列的host和topic;

(2)建立线程池,用于后续服务中若干操作的运行;

class Service(service.Service)----def __init__

  1. class Service(service.Service):  
  2.     def __init__(self, host, topic, manager=None, serializer=None):  
  3.         """         
  4.         CollectorService(cfg.CONF.host,'ceilometer.collector')
  5.         host:cfg.CONF.host
  6.         topic:'ceilometer.collector'
  7.         """  
  8.         super(Service, self).__init__()  
  9.         self.host = host  
  10.         self.topic = topic  
  11.         self.serializer = serializer  
  12.         if manager is None:  
  13.             self.manager = self  
  14.         else:  
  15.             self.manager = manager
复制代码

class Service(object)----def __init__
  1. class Service(object):  
  2.     def __init__(self, threads=1000):  
  3.         self.tg = threadgroup.ThreadGroup(threads)  
  4.         # signal that the service is done shutting itself down:  
  5.         self._done = event.Event()
复制代码


2 服务ceilometer-collector的启动操作

    服务ceilometer-agent-notification通过监听对应的队列来获取发布到消息队列的采样数据信息,并实现保存到存储系统中;服务ceilometer-agent-notification的启动操作实现了以下任务:

    提供两种方式(UDP,RPC)获取收集发布的信息,并保存到数据存储系统中;

    1.针对UDP的消息发布方式,调用方法实现:
    1.1.获取socket对象;
    1.2.一直循环任务通过UDP协议实现接收消息数据data;
    1.3.保存数据data到数据存储系统(不同的实现后端);

    2.针对RPC的消息发布方式:
    2.1.建立指定类型的消息消费者;      
    2.2.执行方法initialize_service_hook;
        建立一个'topic'类型的消息消费者;
        根据消费者类(TopicConsumer)和消息队列名称(ceilometer.collector.metering,即监听消息队列  ceilometer.collector.metering)以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
    2.3.启动协程实现等待并消费处理队列中的消息;
    2.4.加载命名空间'ceilometer.dispatcher'中的插件:
        ceilometer.dispatcher =
        database = ceilometer.dispatcher.database:DatabaseDispatcher
        file = ceilometer.dispatcher.file:FileDispatcher
        描述了收集发布的监控信息保存到数据系统的实现方式;

    注:针对RPC消息发布方式,在消息msg中封装了方法record_metering_data,

       所以在消息消费者处理消息的时候,也会调用这个方法,实现保存监控数据到数据存储系统中;

class CollectorService----def start

  1. class CollectorService(service.DispatchedService, rpc_service.Service):  
  2.     """
  3.     CollectorService通过监听对应的队列来获取发布到消息队列的采样数据信息,并实现保存到存储系统中;
  4.     """  
  5.     def start(self):  
  6.         """      
  7.         在监控采样信息发布之后,分两种方式(UDP,RPC)获取收集发布的信息,
  8.         并保存到数据存储系统中;
  9.         1.针对UDP的消息发布方式,调用方法实现:
  10.         1.1.获取socket对象;
  11.         1.2.一直循环任务通过UDP协议实现接收消息数据data;
  12.         1.3.保存数据data到数据存储系统(不同的实现后端);
  13.          
  14.         2.针对RPC的消息发布方式:
  15.         2.1.建立指定类型的消息消费者;        
  16.         2.2.执行方法initialize_service_hook;
  17.             建立一个'topic'类型的消息消费者;
  18.             根据消费者类(TopicConsumer)和消息队列名称
  19.             (pool_name:ceilometer.collector.metering)
  20.             以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  21.         2.3.启动协程实现等待并消费处理队列中的消息;
  22.         2.4.加载命名空间'ceilometer.dispatcher'中的插件:
  23.             ceilometer.dispatcher =
  24.             database = ceilometer.dispatcher.database:DatabaseDispatcher
  25.             file = ceilometer.dispatcher.file:FileDispatcher
  26.         注:针对RPC消息发布方式,在消息msg中封装了方法record_metering_data,
  27.         所以在消息消费者处理消息的时候,也会调用这个方法,实现保存监控数据到数据存储系统中;
  28.         """  
  29.         if cfg.CONF.collector.udp_address:  
  30.             self.tg.add_thread(self.start_udp)  
  31.          
  32.         if cfg.CONF.rpc_backend:  
  33.             super(CollectorService, self).start()  
  34.             if not cfg.CONF.collector.udp_address:  
  35.                 self.tg.add_timer(604800, lambda: None)  
复制代码

class CollectorService----def start_udp
  1. def start_udp(self):  
  2.     """
  3.     1.获取socket对象;
  4.     2.一直循环任务实现接收数据data;
  5.     3.保存数据data到数据存储系统(不同的实现后端);
  6.     """  
  7.     udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  
  8.     udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  
  9.     udp.bind((cfg.CONF.collector.udp_address,<span style="font-family:KaiTi_GB2312;"> </span>cfg.CONF.collector.udp_port))  
  10.   
  11.     self.udp_run = True  
  12.     while self.udp_run:  
  13.         """
  14.         64 * units.Ki = 64k,这里为接收数据的缓冲区;
  15.         接收数据data;
  16.         """  
  17.         data, source = udp.recvfrom(64 * units.Ki)  
  18.         try:  
  19.             sample = msgpack.loads(data)  
  20.         except Exception:  
  21.             LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))  
  22.         else:  
  23.             try:  
  24.                 LOG.debug(_("UDP: Storing %s"), str(sample))  
  25.                 """
  26.                 保存数据data到数据存储系统(不同的实现后端);
  27.                 保存数据data到数据存储系统;
  28.                 class DatabaseDispatcher(dispatcher.Base)----def record_metering_data
  29.                 记录数据到日志文件;
  30.                 class FileDispatcher(dispatcher.Base)----def record_metering_data
  31.                 """  
  32.                 self.dispatcher_manager.map_method('record_metering_data',  
  33.                                                    sample)  
  34.             except Exception:  
  35.                 LOG.exception(_("UDP: Unable to store meter"))  
复制代码

class Service(service.Service)----def start
  1. class Service(service.Service):  
  2.     def start(self):  
  3.         """
  4.         为RPC通信建立到信息总线的连接;
  5.         1.建立指定类型的消息消费者;        
  6.         2.执行方法initialize_service_hook;
  7.         3.启动协程实现等待并消费处理队列中的消息;
  8.         """  
  9.         super(Service, self).start()  
  10.   
  11.         """
  12.         为RPC通信建立到信息总线的连接;
  13.         建立一个新的连接,或者从连接池中获取一个;
  14.         """  
  15.         self.conn = rpc.create_connection(new=True)  
  16.         LOG.debug(_("Creating Consumer connection for Service %s") %  
  17.                   self.topic)  
  18.   
  19.         """
  20.         RpcDispatcher:RPC消息调度类;
  21.         """  
  22.         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],  
  23.                                                   self.serializer)  
  24.   
  25.         """
  26.         create_consumer:建立指定类型的消息消费者(fanout or topic);
  27.         1.创建以服务的topic为路由键的消费者;
  28.         2.创建以服务的topic和本机名为路由键的消费者
  29.           (基于topic&host,可用来接收定向消息);
  30.         3.fanout直接投递消息,不进行匹配,速度最快
  31.           (fanout类型,可用于接收广播消息);
  32.         """  
  33.         self.conn.create_consumer(self.topic, dispatcher, fanout=False)  
  34.         node_topic = '%s.%s' % (self.topic, self.host)  
  35.         self.conn.create_consumer(node_topic, dispatcher, fanout=False)  
  36.         self.conn.create_consumer(self.topic, dispatcher, fanout=True)  
  37.   
  38.         """
  39.         在消息消费进程启动前,必须先声明消费者;
  40.         建立一个'topic'类型的消息消费者;
  41.         根据消费者类(TopicConsumer)和消息队列名称
  42.         (pool_name:  ceilometer.collector.metering)
  43.         以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  44.         """  
  45.         if callable(getattr(self.manager, 'initialize_service_hook', None)):  
  46.             self.manager.initialize_service_hook(self)  
  47.   
  48.         """
  49.         启动消费者线程;
  50.         consume_in_thread用evelent.spawn创建一个协程一直运行;
  51.         等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
  52.         启动协程实现等待并消费处理队列中的消息;
  53.         """  
  54.         self.conn.consume_in_thread()  
复制代码

class CollectorService----def initialize_service_hook
  1. def initialize_service_hook(self, service):  
  2.     '''''
  3.     在消息消费进程启动前,必须先声明消费者;
  4.     建立一个'topic'类型的消息消费者;
  5.     根据消费者类(TopicConsumer)和消息队列名称
  6.     (pool_name:  ceilometer.collector.metering)
  7.     以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  8.     '''        
  9.     """
  10.     建立一个'topic'类型的消息消费者;
  11.     根据消费者类(TopicConsumer)和消息队列名称
  12.     (pool_name:  ceilometer.collector.metering)
  13.     以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
  14.     """  
  15.     self.conn.create_worker(  
  16.         # metering  
  17.         cfg.CONF.publisher_rpc.metering_topic,  
  18.         # 获取类RpcDispatcher的初始化对象;  
  19.         rpc_dispatcher.RpcDispatcher([self]),  
  20.         # ceilometer.collector.metering  
  21.         'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic,  
  22.     )  
复制代码


针对收集的发布的监控信息数据,系统提供两类方式用以实现监控信息数据保存到数据存储系统的操作。

方式一:用于实现保存数据到日志文件

  1. class FileDispatcher(dispatcher.Base):  
  2.     '''''
  3.     用于实现保存数据到日志文件的实现类;
  4.     '''  
  5.     def record_metering_data(self, data):  
  6.         """
  7.         记录数据到日志文件;
  8.         """  
  9.         if self.log:  
  10.             self.log.info(data)
复制代码

方式二:用于实现保存数据到数据存储系统(不同的实现后端)
  1. class DatabaseDispatcher(dispatcher.Base):  
  2.     '''''
  3.     用于实现保存数据到数据存储系统(不同的实现后端)的实现类;
  4.     '''  
  5.     def record_metering_data(self, data):  
  6.         """
  7.         保存数据data到数据存储系统(不同的实现后端);
  8.         """  
  9.         if not isinstance(data, list):  
  10.             data = [data]  
  11.   
  12.         for meter in data:  
  13.             LOG.debug(_(  
  14.                 'metering data %(counter_name)s '  
  15.                 'for %(resource_id)s @ %(timestamp)s: %(counter_volume)s')  
  16.                 % ({'counter_name': meter['counter_name'],  
  17.                     'resource_id': meter['resource_id'],  
  18.                     'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),  
  19.                     'counter_volume': meter['counter_volume']}))  
  20.             if publisher_utils.verify_signature(  
  21.                     meter,  
  22.                     self.conf.publisher.metering_secret):  
  23.                 try:  
  24.                     if meter.get('timestamp'):  
  25.                         ts = timeutils.parse_isotime(meter['timestamp'])  
  26.                         meter['timestamp'] = timeutils.normalize_time(ts)  
  27.                      
  28.                     """
  29.                     保存数据到数据存储系统;
  30.                     """  
  31.                     self.storage_conn.record_metering_data(meter)  
  32.                 except Exception as err:  
  33.                     LOG.exception(_('Failed to record metering data: %s'),  
  34.                                   err)  
  35.             else:  
  36.                 LOG.warning(_(  
  37.                     'message signature invalid, discarding message: %r'),  
  38.                     meter)  
复制代码


来看配置文件setup.cfg中与监控数据存储实现相关的配置信息:

数据存储的实现(存储到数据库和写入到指定日志文件);
ceilometer.dispatcher=
     database= ceilometer.dispatcher.database:DatabaseDispatcher
     file= ceilometer.dispatcher.file:FileDispatcher

数据存储现在支持MongoDB,MySQL,Postgresql和HBase,现在H3又新增加了对DB2的支持,其中MongoDB是支持最好的;
ceilometer.storage=
    #日志记录数据;
    log= ceilometer.storage.impl_log:LogStorage
    #基于配置设置获取到MongoDB数据库的连接实例;
    mongodb= ceilometer.storage.impl_mongodb:MongoDBStorage
    #实现到SqlAlchemy的连接,以实现存储数据到数据库;
    mysql= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
    #实现到SqlAlchemy的连接,以实现存储数据到数据库;
    postgresql= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
    #实现到SqlAlchemy的连接,以实现存储数据到数据库;
    sqlite= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
    #实现到HBase数据库的连接,以实现存储数据到数据库;
    hbase= ceilometer.storage.impl_hbase:HBaseStorage
    #实现到DB2数据库的连接,以实现存储数据到数据库;
    db2= ceilometer.storage.impl_db2:DB2Storage





Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动





欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条