本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:44 编辑
问题导读
问题1:服务ceilometer-agent-central的初始化操作主要实现了哪些内容的操作?
问题2:服务ceilometer-agent-central的启动操作周期性地实现什么任务?
ceilometer-agent-central服务的初始化和启动
本篇博客将解析服务组件ceilometer-agent-central的初始化和启动操作。ceilometer-agent-central服务组件运行在控制节点上,它通过相关模块的客户端调用其REST API,主动收集相关模块(Image,Volume,Objects,Network)的监控数据;这些数据需要通过定期轮询(Poll)来收集。
来看方法/ceilometer/cli.py----def agent_central,这个方法即实现了ceilometer-agent-central服务的初始化和启动操作。
def agent_central():
    """Initialise and start the ceilometer-agent-central service.

    Prepares the service, builds a central ``AgentManager``, launches it
    with ``os_service.launch`` and then waits on the returned launcher.

    The central agent runs on the controller node and collects data from
    other services (Image, Volume, Objects, Network). Its logic is similar
    to the compute agent's, but it obtains the data by calling those
    services' REST APIs.
    """
    service.prepare_service()
    manager = central_manager.AgentManager()
    launcher = os_service.launch(manager)
    launcher.wait()
复制代码
1 服务ceilometer-agent-central的初始化操作
服务ceilometer-agent-central的初始化操作主要实现了以下内容的操作:
(1)根据指定参数确定命名空间ceilometer.poll.central,获取并加载与之匹配的所有插件;这些插件描述了如何收集相关模块(Image,Volume,Objects,Network)的监控数据。此外,还会加载命名空间ceilometer.discover下的所有资源发现插件;
(2)创建具有管理员权限(is_admin=True)的请求上下文对象RequestContext;
(3)创建线程组(ThreadGroup,默认大小为1000),用于运行服务后续的各项操作(例如定时轮询任务);
class AgentManager(agent.AgentManager)----def __init__
class AgentManager(agent.AgentManager):
    """Central polling agent manager.

    Binds the generic agent manager to the ``'central'`` namespace, so its
    pollsters are loaded from ``ceilometer.poll.central``.
    """

    def __init__(self):
        super(AgentManager, self).__init__(namespace='central')
复制代码
class AgentManager(os_service.Service)----def __init__
class AgentManager(os_service.Service):
    """Base polling agent manager (subclassed by the central agent).

    On construction it loads the pollster plugins of the given namespace
    and the plugins of the ``ceilometer.discover`` namespace, and creates
    an admin request context.
    """

    def __init__(self, namespace, default_discovery=None):
        """Load pollster and discovery plugins.

        :param namespace: agent namespace suffix (e.g. ``'central'``);
            pollsters are loaded from ``ceilometer.poll.<namespace>``.
        :param default_discovery: stored as ``self.default_discovery``;
            a fresh empty list is used when omitted.
        """
        super(AgentManager, self).__init__()
        # A None sentinel avoids sharing one mutable default list between
        # every instance constructed without an explicit argument.
        self.default_discovery = (
            [] if default_discovery is None else default_discovery)

        # Load all plugins of ceilometer.poll.<namespace>. For the central
        # agent (ceilometer.poll.central) these are:
        #   ip.floating = ceilometer.network.floatingip:FloatingIPPollster
        #   image = ceilometer.image.glance:ImagePollster
        #   image.size = ceilometer.image.glance:ImageSizePollster
        #   storage.containers.objects = ceilometer.objectstore.swift:ContainersObjectsPollster
        #   storage.containers.objects.size = ceilometer.objectstore.swift:ContainersSizePollster
        #   storage.objects = ceilometer.objectstore.swift:ObjectsPollster
        #   storage.objects.size = ceilometer.objectstore.swift:ObjectsSizePollster
        #   storage.objects.containers = ceilometer.objectstore.swift:ObjectsContainersPollster
        #   energy = ceilometer.energy.kwapi:EnergyPollster
        #   power = ceilometer.energy.kwapi:PowerPollster
        #   switch.port = ceilometer.network.statistics.port:PortPollster
        #   switch.port.receive.packets = ceilometer.network.statistics.port:PortPollsterReceivePackets
        #   switch.port.transmit.packets = ceilometer.network.statistics.port:PortPollsterTransmitPackets
        #   switch.port.receive.bytes = ceilometer.network.statistics.port:PortPollsterReceiveBytes
        #   switch.port.transmit.bytes = ceilometer.network.statistics.port:PortPollsterTransmitBytes
        #   switch.port.receive.drops = ceilometer.network.statistics.port:PortPollsterReceiveDrops
        #   switch.port.transmit.drops = ceilometer.network.statistics.port:PortPollsterTransmitDrops
        #   switch.port.receive.errors = ceilometer.network.statistics.port:PortPollsterReceiveErrors
        #   switch.port.transmit.errors = ceilometer.network.statistics.port:PortPollsterTransmitErrors
        #   switch.port.receive.frame_error = ceilometer.network.statistics.port:PortPollsterReceiveFrameErrors
        #   switch.port.receive.overrun_error = ceilometer.network.statistics.port:PortPollsterReceiveOverrunErrors
        #   switch.port.receive.crc_error = ceilometer.network.statistics.port:PortPollsterReceiveCRCErrors
        #   switch.port.collision.count = ceilometer.network.statistics.port:PortPollsterCollisionCount
        #   switch.table = ceilometer.network.statistics.table:TablePollster
        #   switch.table.active.entries = ceilometer.network.statistics.table:TablePollsterActiveEntries
        #   switch.table.lookup.packets = ceilometer.network.statistics.table:TablePollsterLookupPackets
        #   switch.table.matched.packets = ceilometer.network.statistics.table:TablePollsterMatchedPackets
        #   switch = ceilometer.network.statistics.switch:SWPollster
        #   switch.flow = ceilometer.network.statistics.flow:FlowPollster
        #   switch.flow.bytes = ceilometer.network.statistics.flow:FlowPollsterBytes
        #   switch.flow.duration.nanoseconds = ceilometer.network.statistics.flow:FlowPollsterDurationNanoseconds
        #   switch.flow.duration.seconds = ceilometer.network.statistics.flow:FlowPollsterDurationSeconds
        #   switch.flow.packets = ceilometer.network.statistics.flow:FlowPollsterPackets
        #   hardware.cpu.load.1min = ceilometer.hardware.pollsters.cpu:CPULoad1MinPollster
        #   hardware.cpu.load.5min = ceilometer.hardware.pollsters.cpu:CPULoad5MinPollster
        #   hardware.cpu.load.15min = ceilometer.hardware.pollsters.cpu:CPULoad15MinPollster
        #   hardware.disk.size.total = ceilometer.hardware.pollsters.disk:DiskTotalPollster
        #   hardware.disk.size.used = ceilometer.hardware.pollsters.disk:DiskUsedPollster
        #   hardware.network.bandwidth.bytes = ceilometer.hardware.pollsters.net:BandwidthBytesPollster
        #   hardware.network.incoming.bytes = ceilometer.hardware.pollsters.net:IncomingBytesPollster
        #   hardware.network.outgoing.bytes = ceilometer.hardware.pollsters.net:OutgoingBytesPollster
        #   hardware.network.outgoing.errors = ceilometer.hardware.pollsters.net:OutgoingErrorsPollster
        #   hardware.memory.total = ceilometer.hardware.pollsters.memory:MemoryTotalPollster
        #   hardware.memory.used = ceilometer.hardware.pollsters.memory:MemoryUsedPollster
        self.pollster_manager = self._extensions('poll', namespace)

        # Load all plugins of the ceilometer.discover namespace.
        self.discovery_manager = self._extensions('discover')
        # Request context with admin privileges.
        self.context = context.RequestContext('admin', 'admin', is_admin=True)
复制代码
class AgentManager(os_service.Service)----def _extensions
# Callers use self._extensions(...); without @staticmethod the instance
# would be bound to ``category`` and the call would raise TypeError.
@staticmethod
def _extensions(category, agent_ns=None):
    """Load all plugins registered under a ceilometer namespace.

    :param category: plugin category, e.g. ``'poll'`` or ``'discover'``.
    :param agent_ns: optional agent namespace suffix (e.g. ``'central'``).
    :returns: an ``extension.ExtensionManager`` for
        ``ceilometer.<category>.<agent_ns>`` when *agent_ns* is truthy,
        otherwise for ``ceilometer.<category>``; created with
        ``invoke_on_load=True`` so each plugin is invoked when loaded.
    """
    if agent_ns:
        namespace = 'ceilometer.%s.%s' % (category, agent_ns)
    else:
        namespace = 'ceilometer.%s' % category
    return extension.ExtensionManager(
        namespace=namespace,
        invoke_on_load=True,
    )
复制代码
class Service(object)----def __init__
class Service(object):
    """Service object for binaries running on hosts."""

    def __init__(self, threads=1000):
        """Create the thread group and the shutdown-complete event.

        :param threads: size passed positionally to
            ``threadgroup.ThreadGroup`` (default 1000).
        """
        group = threadgroup.ThreadGroup(threads)
        self.tg = group

        # Signalled once the service has finished shutting itself down.
        done_event = event.Event()
        self._done = done_event
复制代码
2 服务ceilometer-agent-central的启动操作
服务ceilometer-agent-central的启动操作周期性地实现以下任务:
(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;
(2)针对每个监控项的采样数据,通过管道(pipeline)发布采样数据样本。采样数据的发布方式有三种,即RPC/UDP/FILE;
其中,RPC方式将消息发布到消息队列,后续的collector组件服务会监听相应的消息队列来获取这些数据;UDP方式通过socket建立一个信息通道来发送消息数据,后续的collector组件服务会通过这个信息通道接收这些数据;FILE方式则将消息数据直接保存到指定的日志文件中。
class AgentManager(os_service.Service)----def start
class AgentManager(os_service.Service):
    """Polling agent manager (start-up logic)."""

    def start(self):
        """Set up the pipelines and schedule one timer per polling interval.

        ``setup_polling_tasks()`` returns a mapping of polling interval to
        polling task. For each entry a timer is added to the service thread
        group that calls ``self.interval_task(task=task)`` at that interval.
        """
        self.pipeline_manager = pipeline.setup_pipeline()
        # items() (instead of the Python-2-only iteritems()) works on both
        # Python 2 and Python 3 dicts.
        for interval, task in self.setup_polling_tasks().items():
            self.tg.add_timer(interval,
                              self.interval_task,
                              task=task)
复制代码
class AgentManager(os_service.Service)----def interval_task
# start() registers this as self.interval_task with task=task; as a plain
# method the instance would bind to ``task`` and the keyword would collide,
# so it must be a staticmethod.
@staticmethod
def interval_task(task):
    """Timer callback: run one poll-and-publish cycle for *task*."""
    task.poll_and_publish()
复制代码
class PollingTask(object)----def poll_and_publish
def poll_and_publish(self):
    """Poll every pollster of this task once and publish the samples.

    Resources are first obtained from ``self.manager.discover()``. Inside
    ``self.publish_context`` each pollster's samples are collected with
    ``pollster.obj.get_samples`` and passed to the publisher. An exception
    raised while polling or publishing one pollster is logged with its
    traceback, and the loop continues with the next pollster.
    """
    discovered_resources = self.manager.discover()
    with self.publish_context as publish:
        # One cache dict shared by all pollsters in this cycle.
        cache = {}
        for pollster in self.pollsters:
            pollster_name = pollster.name
            LOG.info(_("Polling pollster %s"), pollster_name)
            own_resources = list(self.resources[pollster_name].resources)
            try:
                # Each get_samples implementation (FloatingIPPollster,
                # ImagePollster, ImageSizePollster, PowerPollster,
                # EnergyPollster, the swift Objects*/Containers* pollsters,
                # HardwarePollster, ...) queries its service through the
                # corresponding client. A non-empty per-pollster resource
                # list takes precedence over the discovered resources.
                collected = pollster.obj.get_samples(
                    self.manager,
                    cache,
                    resources=own_resources or discovered_resources,
                )
                publish(list(collected))
            except Exception as err:
                LOG.warning(
                    _('Continue after error from %(name)s: %(error)s')
                    % {'name': pollster.name, 'error': err},
                    exc_info=True)
复制代码
方法小结:
本方法周期性地执行以下操作:
(1)遍历任务(通道),获取每个任务指定获取的监控项的采样数据;
(2)针对每个监控项的采样数据,实现发布监控项采样数据样本到消息队列;
class PublishContext----def __enter__
在上述代码中,发布监控项采样数据是由这个方法实现的:它返回一个函数,该函数依次调用每个管道(pipeline)的publish_samples方法。这个方法主要实现了以下内容:
实现发布监控项采样数据样本,采样数据的发布方式有三种,即RPC/UDP/FILE;
(1).class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
实现发布采样数据到一个日志文件;
(2).class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
实现通过RPC发布采样数据;
* 从若干采样数据samples中提取数据,形成消息格式的meters,为信息的发布或存储做准备;
* 将之前从采样数据中提取的信息meters包装成msg;
* 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;
* 实现发布本地队列local_queue中的所有数据信息到队列metering中;
* 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会执行方法record_metering_data,实现存储到数据存储系统中(数据库);
(3).class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
通过UDP发布采样数据;
class PublishContext(object):
    """Context manager whose ``__enter__`` returns a sample publisher."""

    def __enter__(self):
        """Return a function that publishes samples through every pipeline.

        The returned callable takes a list of samples and calls
        ``publish_samples(self.context, samples)`` on each pipeline in
        ``self.pipelines``, which is read at call time. Depending on the
        pipelines' publishers, the samples are written to a log file
        (FilePublisher), sent via RPC (RPCPublisher) or sent over UDP
        (UDPPublisher).
        """
        def publish(samples):
            # Distinct loop name so it does not shadow the closure itself.
            for pipe in self.pipelines:
                pipe.publish_samples(self.context, samples)
        return publish
复制代码
2.1 实现发布采样数据到一个日志文件
class FilePublisher(publisher.PublisherBase):
    """Publisher that records samples through ``self.publisher_logger``."""

    def publish_samples(self, context, samples):
        """Log each sample's ``as_dict()`` at INFO level.

        Nothing is logged when ``self.publisher_logger`` is falsy (not
        configured). ``context`` is not used.
        """
        logger = self.publisher_logger
        if not logger:
            return
        for sample in samples:
            logger.info(sample.as_dict())
复制代码
2.2 通过RPC发布采样数据(具体见代码注释)
class RPCPublisher(publisher.PublisherBase):
    """Publisher that casts samples over RPC to the metering topic(s)."""

    def _build_msg(self, meters):
        """Wrap a list of meter messages in an RPC message dict.

        ``'method'`` is ``self.target``, presumably the consumer-side
        handler name (described elsewhere as ``record_metering_data``;
        confirm against this class's ``__init__``).
        """
        return {
            'method': self.target,
            'version': '1.0',
            'args': {'data': meters},
        }

    def publish_samples(self, context, samples):
        """Publish *samples* via RPC.

        1. Convert each sample with ``utils.meter_message_from_counter``
           (using ``cfg.CONF.publisher.metering_secret``).
        2. Queue one message carrying all meters on
           ``cfg.CONF.publisher_rpc.metering_topic`` (``metering`` by
           default).
        3. If ``self.per_meter_topic`` is set, also queue one message per
           counter name on ``<topic>.<counter_name>``.
        4. Call ``self.flush()`` to send everything in the local queue.
        """
        meters = [
            utils.meter_message_from_counter(
                sample,
                cfg.CONF.publisher.metering_secret)
            for sample in samples
        ]

        topic = cfg.CONF.publisher_rpc.metering_topic
        self.local_queue.append((context, topic, self._build_msg(meters)))

        if self.per_meter_topic:
            by_name = operator.itemgetter('counter_name')
            # groupby only merges adjacent items, hence the sort on the
            # same key first.
            for meter_name, meter_list in itertools.groupby(
                    sorted(meters, key=by_name), by_name):
                msg = self._build_msg(list(meter_list))
                topic_name = topic + '.' + meter_name
                # Lazy logging args: formatted only if the record is emitted.
                LOG.audit(_('Publishing %(m)d samples on %(n)s'),
                          {'m': len(msg['args']['data']), 'n': topic_name})
                self.local_queue.append((context, topic_name, msg))

        self.flush()
复制代码
def flush(self):
    """Try to send every message currently in ``self.local_queue``.

    The queue is swapped for an empty list before processing, so messages
    appended while sending land in the new list. The unsent entries
    returned by ``_process_queue`` are placed in front of them. With the
    ``'queue'`` policy, ``_check_queue_length()`` is called afterwards.
    """
    queue = self.local_queue
    self.local_queue = []

    # The left operand is evaluated first, so self.local_queue is read only
    # after _process_queue returns and includes anything appended meanwhile.
    self.local_queue = (self._process_queue(queue, self.policy) +
                        self.local_queue)

    if self.policy == 'queue':
        self._check_queue_length()
复制代码
@staticmethod
def _process_queue(queue, policy):
    """Cast queued RPC messages in order, popping each one once sent.

    :param queue: list of ``(context, topic, msg)`` tuples; mutated in
        place (sent entries are removed from the front).
    :param policy: action on a publish failure: ``'queue'`` returns the
        remaining entries, ``'drop'`` discards them, and any other value
        re-raises the error.
    :returns: the unsent entries (``[]`` when everything was sent or the
        remainder was dropped).
    :raises: the caught ``SystemExit`` / ``rpc.common.RPCException`` for
        policies other than ``'queue'`` and ``'drop'``.
    """
    while queue:
        context, topic, msg = queue[0]
        try:
            # Remote cast; no return value is expected.
            rpc.cast(context, topic, msg)
        except (SystemExit, rpc.common.RPCException):
            # Count the samples of every entry still queued, including the
            # one that just failed.
            samples = sum(len(m['args']['data'])
                          for _ctx, _topic, m in queue)
            if policy == 'queue':
                LOG.warn(_("Failed to publish %d samples, queue them"),
                         samples)
                return queue
            elif policy == 'drop':
                LOG.warn(_("Failed to publish %d samples, dropping them"),
                         samples)
                return []
            # default, occur only if rabbit_max_retries > 0
            raise
        else:
            queue.pop(0)
    return []
复制代码
2.3 通过UDP发布采样数据(具体见代码注释)
class UDPPublisher(publisher.PublisherBase):
    """Publisher that sends each sample as a msgpack datagram over UDP."""

    def publish_samples(self, context, samples):
        """Send every sample to ``(self.host, self.port)`` over UDP.

        Each sample is converted with ``utils.meter_message_from_counter``
        (using ``cfg.CONF.publisher.metering_secret``), serialised with
        ``msgpack.dumps`` and sent with ``self.socket.sendto``. A failure
        to serialise or send one sample is logged, and the remaining
        samples are still processed. ``context`` is not used.
        """
        host, port = self.host, self.port
        for sample in samples:
            msg = utils.meter_message_from_counter(
                sample,
                cfg.CONF.publisher.metering_secret)
            # Lazy logging args: the (possibly large) msg is only formatted
            # when debug logging is actually enabled.
            LOG.debug(_("Publishing sample %(msg)s over UDP to "
                        "%(host)s:%(port)d"),
                      {'msg': msg, 'host': host, 'port': port})
            try:
                # Send to the same destination that was just logged.
                self.socket.sendto(msgpack.dumps(msg), (host, port))
            except Exception as e:
                LOG.warn(_("Unable to send sample over UDP"))
                LOG.exception(e)
复制代码
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
|