分享

Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动

坎蒂丝_Swan 发表于 2014-12-13 19:36:02 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 11951
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:44 编辑

问题导读
问题1:服务ceilometer-agent-central的初始化操作主要实现了哪些内容的操作?
问题2:服务ceilometer-agent-central的启动操作周期性地实现什么任务?









ceilometer-agent-central服务的初始化和启动



    本篇博客将解析服务组件ceilometer-agent-central的初始化和启动操作,ceilometer-agent-central服务组件运行在控制节点上,它主要通过调用相关模块的REST API,通过访问相关模块的客户端,从而实现主动收集相关模块(Image,Volume,Objects,Network)的监控数据,需要定期Poll轮询收集信息。


    来看方法/ceilometer/cli.py----def agent_central,这个方法即实现了ceilometer-agent-central服务的初始化和启动操作。

  1. def agent_central():  
  2.     """
  3.     加载并启动AgentManager服务;   
  4.     Central Agent运行在控制节点上,它主要收集其它服务(Image,Volume,Objects,Network)的信息,实现逻辑和Compute Agent类似,但是是通过调用这些服务的REST API去获取这些数据的。
  5.     """  
  6.     service.prepare_service()  
  7.     os_service.launch(central_manager.AgentManager()).wait()  
复制代码


1 服务ceilometer-agent-central的初始化操作



服务ceilometer-agent-central的初始化操作主要实现了以下内容的操作:

(1)根据指定参数获取命名空间ceilometer.poll.central,获取与ceilometer.poll.central相匹配的所有插件,并加载;ceilometer.poll.central所指定的插件描述了如何获取收集相关模块(Image,Volume,Objects,Network)的监控数据。

(2)获取管理员操作的上下文环境类的初始化对象;

(3)建立线程池,用于后续服务中若干操作的运行;

class AgentManager(agent.AgentManager)----def __init__

  1. class AgentManager(agent.AgentManager):  
  2.     def __init__(self):  
  3.         super(AgentManager, self).__init__('central')
复制代码

class AgentManager(os_service.Service)----def __init__
  1. class AgentManager(os_service.Service):  
  2.     def __init__(self, namespace, default_discovery=[]):  
  3.         super(AgentManager, self).__init__()  
  4.         self.default_discovery = default_discovery  
  5.          
  6.         """
  7.         加载命名空间ceilometer.poll.central的所有插件:
  8.         ceilometer.poll.central =
  9.         ip.floating = ceilometer.network.floatingip:FloatingIPPollster
  10.         image = ceilometer.image.glance:ImagePollster
  11.         image.size = ceilometer.image.glance:ImageSizePollster
  12.         storage.containers.objects = ceilometer.objectstore.swift:ContainersObjectsPollster
  13.         storage.containers.objects.size = ceilometer.objectstore.swift:ContainersSizePollster
  14.         storage.objects = ceilometer.objectstore.swift:ObjectsPollster
  15.         storage.objects.size = ceilometer.objectstore.swift:ObjectsSizePollster
  16.         storage.objects.containers = ceilometer.objectstore.swift:ObjectsContainersPollster
  17.         energy = ceilometer.energy.kwapi:EnergyPollster
  18.         power = ceilometer.energy.kwapi:PowerPollster
  19.         switch.port = ceilometer.network.statistics.port:PortPollster
  20.         switch.port.receive.packets = ceilometer.network.statistics.port:PortPollsterReceivePackets
  21.         switch.port.transmit.packets = ceilometer.network.statistics.port:PortPollsterTransmitPackets
  22.         switch.port.receive.bytes = ceilometer.network.statistics.port:PortPollsterReceiveBytes
  23.         switch.port.transmit.bytes = ceilometer.network.statistics.port:PortPollsterTransmitBytes
  24.         switch.port.receive.drops = ceilometer.network.statistics.port:PortPollsterReceiveDrops
  25.         switch.port.transmit.drops = ceilometer.network.statistics.port:PortPollsterTransmitDrops
  26.         switch.port.receive.errors = ceilometer.network.statistics.port:PortPollsterReceiveErrors
  27.         switch.port.transmit.errors = ceilometer.network.statistics.port:PortPollsterTransmitErrors
  28.         switch.port.receive.frame_error = ceilometer.network.statistics.port:PortPollsterReceiveFrameErrors
  29.         switch.port.receive.overrun_error = ceilometer.network.statistics.port:PortPollsterReceiveOverrunErrors
  30.         switch.port.receive.crc_error = ceilometer.network.statistics.port:PortPollsterReceiveCRCErrors
  31.         switch.port.collision.count = ceilometer.network.statistics.port:PortPollsterCollisionCount
  32.         switch.table = ceilometer.network.statistics.table:TablePollster
  33.         switch.table.active.entries = ceilometer.network.statistics.table:TablePollsterActiveEntries
  34.         switch.table.lookup.packets = ceilometer.network.statistics.table:TablePollsterLookupPackets
  35.         switch.table.matched.packets = ceilometer.network.statistics.table:TablePollsterMatchedPackets
  36.         switch = ceilometer.network.statistics.switch:SWPollster
  37.         switch.flow = ceilometer.network.statistics.flow:FlowPollster
  38.         switch.flow.bytes = ceilometer.network.statistics.flow:FlowPollsterBytes
  39.         switch.flow.duration.nanoseconds = ceilometer.network.statistics.flow:FlowPollsterDurationNanoseconds
  40.         switch.flow.duration.seconds = ceilometer.network.statistics.flow:FlowPollsterDurationSeconds
  41.         switch.flow.packets = ceilometer.network.statistics.flow:FlowPollsterPackets
  42.         hardware.cpu.load.1min = ceilometer.hardware.pollsters.cpu:CPULoad1MinPollster
  43.         hardware.cpu.load.5min = ceilometer.hardware.pollsters.cpu:CPULoad5MinPollster
  44.         hardware.cpu.load.15min = ceilometer.hardware.pollsters.cpu:CPULoad15MinPollster
  45.         hardware.disk.size.total = ceilometer.hardware.pollsters.disk:DiskTotalPollster
  46.         hardware.disk.size.used = ceilometer.hardware.pollsters.disk:DiskUsedPollster
  47.         hardware.network.bandwidth.bytes = ceilometer.hardware.pollsters.net:BandwidthBytesPollster
  48.         hardware.network.incoming.bytes = ceilometer.hardware.pollsters.net:IncomingBytesPollster
  49.         hardware.network.outgoing.bytes = ceilometer.hardware.pollsters.net:OutgoingBytesPollster
  50.         hardware.network.outgoing.errors = ceilometer.hardware.pollsters.net:OutgoingErrorsPollster
  51.         hardware.memory.total = ceilometer.hardware.pollsters.memory:MemoryTotalPollster
  52.         hardware.memory.used = ceilometer.hardware.pollsters.memory:MemoryUsedPollster
  53.         """  
  54.         self.pollster_manager = self._extensions('poll', namespace)  
  55.          
  56.         """
  57.         加载ceilometer.discover所有插件:
  58.         """  
  59.         self.discovery_manager = self._extensions('discover')  
  60.         self.context = context.RequestContext('admin', 'admin', is_admin=True)  
复制代码

class AgentManager(os_service.Service)----def _extensions
  1. def _extensions(category, agent_ns=None):  
  2.     """
  3.     根据指定参数获取命名空间namespace,
  4.     获取与namespace相匹配的所有插件,并加载;
  5.     """  
  6.     namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns else 'ceilometer.%s' % category)  
  7.          
  8.     """
  9.     获取与namespace相匹配的所有插件,并加载;
  10.     """  
  11.     return extension.ExtensionManager(  
  12.         namespace=namespace,  
  13.         invoke_on_load=True,  
  14.     )  
复制代码

class Service(object)----def __init__

  1. class Service(object):  
  2.     """Service object for binaries running on hosts."""  
  3.     def __init__(self, threads=1000):  
  4.         self.tg = threadgroup.ThreadGroup(threads)  
  5.   
  6.         # signal that the service is done shutting itself down:  
  7.         self._done = event.Event()  
复制代码

2 服务ceilometer-agent-central的启动操作


服务ceilometer-agent-central的启动操作周期性地实现以下任务:


(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;

(2)针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列,其中实现采样数据发布的方式有三种,即RPC/UDP/FILE;

     其中,RPC将会发布相关消息到消息队列,后续的collector组件服务将会监听相应的消息队列来获取这些数据信息;UDP将会建立socket建立一个信息通道,实现发送相关消息数据,而后续的collector组件服务将会通过这个信息通道接收相关的消息数据;FILE将会直接保存相关消息数据到指定的日志文件中。

class AgentManager(os_service.Service)----def start

  1. class AgentManager(os_service.Service):  
  2.     def start(self):  
  3.         self.pipeline_manager = pipeline.setup_pipeline()  
  4.         for interval,task in self.setup_polling_tasks().iteritems():  
  5.             self.tg.add_timer(interval,  
  6.                               self.interval_task,  
  7.                               task=task)
复制代码

class AgentManager(os_service.Service)----def interval_task
  1. def interval_task(task):  
  2.         task.poll_and_publish()  
复制代码

class PollingTask(object)----def poll_and_publish
  1. def poll_and_publish(self):  
  2.         """
  3.         创建轮徇的任务;
  4.         任务以一定时间间隔周期性地进行;
  5.          
  6.         遍历任务(通道),获取每个任务指定获取的监控项的采样数据;
  7.         针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列;
  8.         """  
  9.         agent_resources = self.manager.discover()  
  10.         with self.publish_context as publisher:  
  11.             cache = {}      
  12.             """
  13.             遍历任务(通道);
  14.             获取每个任务指定获取的监控项的采样数据;
  15.             """  
  16.             for pollster in self.pollsters:  
  17.                 key = pollster.name  
  18.                 LOG.info(_("Polling pollster %s"), key)  
  19.                 source_resources = list(self.resources[key].resources)  
  20.                 try:  
  21.                     """
  22.                     get_samples:获取某一监控项的采样数据;
  23.                     注:各个get_samples方法的具体实现,都是通过相关客户端访问相关服务实现获取相关的采样数据;
  24.                     class FloatingIPPollster(plugin.CentralPollster)----def get_samples(self, manager, cache)
  25.                     class ImagePollster(_Base)----def get_samples(self, manager, cache)
  26.                     class ImageSizePollster(_Base)----def get_samples(self, manager, cache)
  27.                     class PowerPollster(_Base)----def get_samples(self, manager, cache)
  28.                     class EnergyPollster(_Base)----def get_samples(self, manager, cache)
  29.                     class ObjectsSizePollster(_Base)----def get_samples(self, manager, cache)
  30.                     class ObjectsContainersPollster(_Base)----def get_samples(self, manager, cache)
  31.                     class ObjectsPollster(_Base)----def get_samples(self, manager, cache)
  32.                     class ObjectsPollster(_Base)----def get_samples(self, manager, cache)
  33.                     class ObjectsSizePollster(_Base)----def get_samples(self, manager, cache)
  34.                     class ObjectsContainersPollster(_Base)----def get_samples(self, manager, cache)
  35.                     class ContainersObjectsPollster(_Base)----def get_samples(self, manager, cache)
  36.                     class ContainersSizePollster(_Base)----def get_samples(self, manager, cache)
  37.                     class HardwarePollster(plugin.CentralPollster)def get_samples(self, manager, cache, resources=[])
  38.                     """  
  39.                     samples = list(pollster.obj.get_samples(  
  40.                         self.manager,  
  41.                         cache,  
  42.                         resources=source_resources or agent_resources,  
  43.                     ))  
  44.                      
  45.                     """
  46.                     实现发布监控项采样数据样本;
  47.                     """  
  48.                     publisher(samples)  
  49.                 except Exception as err:  
  50.                     LOG.warning(_(  
  51.                         'Continue after error from %(name)s: %(error)s')  
  52.                         % ({'name': pollster.name, 'error': err}),  
  53.                         exc_info=True)  
复制代码


方法小结:
本方法周期性地执行以下操作:


(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;


(2)针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列;


class PublishContext----def __enter__


在上述代码中,发布监控项采样样本数据是由这个方法实现的,这个方法主要实现了以下内容:

实现发布监控项采样数据样本,其中实现采样数据发布的方式有三种,即RPC/UDP/FILE;


(1).class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
实现发布采样数据到一个日志文件;


(2).class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);


实现通过RPC发布采样数据;


* 从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;


* 将之前从采样数据中提取的信息meters包装成msg;


* 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;


* 实现发布本地队列local_queue中的所有数据信息到队列metering中;


* 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会执行方法record_metering_data,实现存储到数据存储系统中(数据库);


(3).class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
通过UDP发布采样数据;

  1. class PublishContext(object):  
  2.     def __enter__(self):  
  3.         """
  4.         实现发布监控项采样数据样本;
  5.         publish_samples:
  6.         1.class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
  7.         实现发布采样数据到一个日志文件;
  8.         2.class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
  9.         通过RPC发布采样数据;
  10.         * 从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;
  11.         * 将之前从采样数据中提取的信息meters包装成msg;
  12.         * 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;
  13.         * 实现发布本地队列local_queue中的所有数据信息到队列metering中;
  14.         * 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会
  15.           执行方法record_metering_data,实现存储到数据存储系统中(数据库);
  16.         3.class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
  17.         通过UDP发布采样数据;
  18.         """  
  19.         def p(samples):  
  20.             for p in self.pipelines:  
  21.                 p.publish_samples(self.context,  
  22.                                   samples)  
  23.         return p  
复制代码


2.1 实现发布采样数据到一个日志文件
  1. class FilePublisher(publisher.PublisherBase):  
  2.     def publish_samples(self, context, samples):  
  3.         if self.publisher_logger:  
  4.             for sample in samples:  
  5.                 self.publisher_logger.info(sample.as_dict())  
复制代码


2.2 通过RPC发布采样数据(具体见代码注释)

  1. class RPCPublisher(publisher.PublisherBase):  
  2.     def publish_samples(self, context, samples):  
  3.         """
  4.         通过RPC发布信息;
  5.         1.从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;
  6.         2.将之前从采样数据中提取的信息meters包装成msg;
  7.         3.将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;
  8.         4.实现发布本地队列local_queue中的所有数据信息到队列metering中;
  9.         5.其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会
  10.           执行方法record_metering_data,实现存储到数据存储系统中(数据库);
  11.         """  
  12.   
  13.         # 从若干采样数据信息中获取提取数据形成信息格式,为信息的发布或存储做准备;  
  14.         meters = [  
  15.             # meter_message_from_counter:  
  16.             # 为一个监控采样数据做好准备被发布或存储;  
  17.             # 从一个采样数据信息中获取提取信息形成msg;  
  18.             utils.meter_message_from_counter(  
  19.                 sample,  
  20.                 cfg.CONF.publisher.metering_secret)  
  21.             for sample in samples  
  22.         ]  
  23.   
  24.         # cfg.CONF.publisher_rpc.metering_topic:metering messages所使用的主题,默认为metering;  
  25.         topic = cfg.CONF.publisher_rpc.metering_topic  
  26.          
  27.         # 将之前从采样数据中提取的信息meters包装成msg;  
  28.         msg = {  
  29.             'method': self.target,  
  30.             'version': '1.0',  
  31.             'args': {'data': meters},  
  32.         }  
  33.          
  34.         # 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;  
  35.         self.local_queue.append((context, topic, msg))  
  36.   
  37.         if self.per_meter_topic:  
  38.             for meter_name, meter_list in itertools.groupby(  
  39.                     sorted(meters, key=operator.itemgetter('counter_name')),  
  40.                     operator.itemgetter('counter_name')):  
  41.                 msg = {  
  42.                     'method': self.target,  
  43.                     'version': '1.0',  
  44.                     'args': {'data': list(meter_list)},  
  45.                 }  
  46.                 topic_name = topic + '.' + meter_name  
  47.                 LOG.audit(_('Publishing %(m)d samples on %(n)s') % (  
  48.                           {'m': len(msg['args']['data']), 'n': topic_name}))  
  49.                 self.local_queue.append((context, topic_name, msg))  
  50.   
  51.         # 实现发布本地队列local_queue中的所有数据信息;  
  52.         self.flush()  
复制代码
  1. def flush(self):  
  2.         """
  3.         实现发布本地队列中的所有数据信息;
  4.         """         
  5.         # 获取本地队列的数据信息;  
  6.         queue = self.local_queue  
  7.         self.local_queue = []  
  8.          
  9.         # 实现循环发布队列queue中的信息数据;  
  10.         self.local_queue = self._process_queue(queue, self.policy) + \self.local_queue  
  11.          
  12.         if self.policy == 'queue':  
  13.             self._check_queue_length()  
复制代码
  1. @staticmethod  
  2.     def _process_queue(queue, policy):  
  3.         """
  4.         实现循环发布队列queue中的信息数据;
  5.         """  
  6.         while queue:  
  7.             # 取出第一位的topic、msg等数据;  
  8.             context, topic, msg = queue[0]  
  9.             try:  
  10.                 # 实现远程发布信息,不返回任何值;  
  11.                 rpc.cast(context, topic, msg)  
  12.             except (SystemExit, rpc.common.RPCException):  
  13.                 samples = sum([len(m['args']['data']) for n, n, m in queue])  
  14.                 if policy == 'queue':  
  15.                     LOG.warn(_("Failed to publish %d samples, queue them"),samples)  
  16.                     return queue  
  17.                 elif policy == 'drop':  
  18.                     LOG.warn(_("Failed to publish %d samples, dropping them"),samples)  
  19.                     return []  
  20.                 # default, occur only if rabbit_max_retries > 0  
  21.                 raise  
  22.             else:  
  23.                 # 从队列中删除发布后的信息;  
  24.                 queue.pop(0)  
  25.         return []  
复制代码


2.3 通过UDP发布采样数据(具体见代码注释)

  1. class UDPPublisher(publisher.PublisherBase):  
  2.     def publish_samples(self, context, samples):  
  3.         """
  4.         通过UDP协议发送meter信息到服务器端,实现监控信息的发布;
  5.         """  
  6.         for sample in samples:  
  7.             """
  8.             为一个监控采样数据做好准备被发布或存储;
  9.             从一个采样数据信息中获取提取信息形成msg;
  10.             """  
  11.             msg = utils.meter_message_from_counter(  
  12.                 sample,  
  13.                 cfg.CONF.publisher.metering_secret)  
  14.             host = self.host  
  15.             port = self.port  
  16.             LOG.debug(_("Publishing sample %(msg)s over UDP to "  
  17.                         "%(host)s:%(port)d") % {'msg': msg, 'host': host,'port': port})  
  18.             """
  19.             通过UDP协议发送meter信息到服务器端,实现监控信息的发布;
  20.             """  
  21.             try:  
  22.                 self.socket.sendto(msgpack.dumps(msg),(self.host, self.port))  
  23.             except Exception as e:  
  24.                 LOG.warn(_("Unable to send sample over UDP"))  
  25.                 LOG.exception(e)  
复制代码






Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动








欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条