Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:45 编辑问题导读问题1:服务ceilometer-collector的初始化操作实现了哪些操作?问题2:服务ceilometer-agent-notification的启动操作实现了什么任务?
static/image/hrline/4.gif
ceilometer-collector服务的初始化和启动
本篇帖子将解析服务组件ceilometer-collector的初始化和启动操作;当信息发布操作完成之后,ceilometer-collector组件服务将会分别获取相关的消息数据,并实现保存获取的消息数据到数据存储系统中;而数据存储系统方案目前也支持几种实现,即log/mongodb/mysql/postgresql/sqlite/hbase/db2等;
来看方法/ceilometer/cli.py----def collector_service,这个方法即实现了ceilometer-collector服务的初始化和启动操作。
def collector_service():
"""
从消息队列获取相关消息信息,并实现保存数据到数据系统;
通过监听对应的队列来获取发布到消息队列的采样数据信息,
并实现保存到存储系统中;
"""
service.prepare_service()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
collector.CollectorService(cfg.CONF.host,
'ceilometer.collector'),
workers=service.get_workers('collector'))
launcher.wait()
1 服务ceilometer-collector的初始化操作
服务ceilometer-collector的初始化操作主要实现了以下内容的操作:
(1)若干参数的初始化,定义了所要监听序列的host和topic;
(2)建立线程池,用于后续服务中若干操作的运行;
class Service(service.Service)----def __init__
class Service(service.Service):
def __init__(self, host, topic, manager=None, serializer=None):
"""
CollectorService(cfg.CONF.host,'ceilometer.collector')
host:cfg.CONF.host
topic:'ceilometer.collector'
"""
super(Service, self).__init__()
self.host = host
self.topic = topic
self.serializer = serializer
if manager is None:
self.manager = self
else:
self.manager = manager
class Service(object)----def __init__
class Service(object):
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup(threads)
# signal that the service is done shutting itself down:
self._done = event.Event()
2 服务ceilometer-collector的启动操作
服务ceilometer-agent-notification通过监听对应的队列来获取发布到消息队列的采样数据信息,并实现保存到存储系统中;服务ceilometer-agent-notification的启动操作实现了以下任务:
提供两种方式(UDP,RPC)获取收集发布的信息,并保存到数据存储系统中;
1.针对UDP的消息发布方式,调用方法实现:
1.1.获取socket对象;
1.2.一直循环任务通过UDP协议实现接收消息数据data;
1.3.保存数据data到数据存储系统(不同的实现后端);
2.针对RPC的消息发布方式:
2.1.建立指定类型的消息消费者;
2.2.执行方法initialize_service_hook;
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称(ceilometer.collector.metering,即监听消息队列ceilometer.collector.metering)以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
2.3.启动协程实现等待并消费处理队列中的消息;
2.4.加载命名空间'ceilometer.dispatcher'中的插件:
ceilometer.dispatcher =
database = ceilometer.dispatcher.database:DatabaseDispatcher
file = ceilometer.dispatcher.file:FileDispatcher
描述了收集发布的监控信息保存到数据系统的实现方式;
注:针对RPC消息发布方式,在消息msg中封装了方法record_metering_data,
所以在消息消费者处理消息的时候,也会调用这个方法,实现保存监控数据到数据存储系统中;
class CollectorService----def start
class CollectorService(service.DispatchedService, rpc_service.Service):
"""
CollectorService通过监听对应的队列来获取发布到消息队列的采样数据信息,并实现保存到存储系统中;
"""
def start(self):
"""
在监控采样信息发布之后,分两种方式(UDP,RPC)获取收集发布的信息,
并保存到数据存储系统中;
1.针对UDP的消息发布方式,调用方法实现:
1.1.获取socket对象;
1.2.一直循环任务通过UDP协议实现接收消息数据data;
1.3.保存数据data到数据存储系统(不同的实现后端);
2.针对RPC的消息发布方式:
2.1.建立指定类型的消息消费者;
2.2.执行方法initialize_service_hook;
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称
(pool_name:ceilometer.collector.metering)
以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
2.3.启动协程实现等待并消费处理队列中的消息;
2.4.加载命名空间'ceilometer.dispatcher'中的插件:
ceilometer.dispatcher =
database = ceilometer.dispatcher.database:DatabaseDispatcher
file = ceilometer.dispatcher.file:FileDispatcher
注:针对RPC消息发布方式,在消息msg中封装了方法record_metering_data,
所以在消息消费者处理消息的时候,也会调用这个方法,实现保存监控数据到数据存储系统中;
"""
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
if cfg.CONF.rpc_backend:
super(CollectorService, self).start()
if not cfg.CONF.collector.udp_address:
self.tg.add_timer(604800, lambda: None)
class CollectorService----def start_udp
def start_udp(self):
"""
1.获取socket对象;
2.一直循环任务实现接收数据data;
3.保存数据data到数据存储系统(不同的实现后端);
"""
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((cfg.CONF.collector.udp_address,<span style="font-family:KaiTi_GB2312;"> </span>cfg.CONF.collector.udp_port))
self.udp_run = True
while self.udp_run:
"""
64 * units.Ki = 64k,这里为接收数据的缓冲区;
接收数据data;
"""
data, source = udp.recvfrom(64 * units.Ki)
try:
sample = msgpack.loads(data)
except Exception:
LOG.warn(_("UDP: Cannot decode data sent by %s"), str(source))
else:
try:
LOG.debug(_("UDP: Storing %s"), str(sample))
"""
保存数据data到数据存储系统(不同的实现后端);
保存数据data到数据存储系统;
class DatabaseDispatcher(dispatcher.Base)----def record_metering_data
记录数据到日志文件;
class FileDispatcher(dispatcher.Base)----def record_metering_data
"""
self.dispatcher_manager.map_method('record_metering_data',
sample)
except Exception:
LOG.exception(_("UDP: Unable to store meter"))
class Service(service.Service)----def start
class Service(service.Service):
def start(self):
"""
为RPC通信建立到信息总线的连接;
1.建立指定类型的消息消费者;
2.执行方法initialize_service_hook;
3.启动协程实现等待并消费处理队列中的消息;
"""
super(Service, self).start()
"""
为RPC通信建立到信息总线的连接;
建立一个新的连接,或者从连接池中获取一个;
"""
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
"""
RpcDispatcher:RPC消息调度类;
"""
dispatcher = rpc_dispatcher.RpcDispatcher(,
self.serializer)
"""
create_consumer:建立指定类型的消息消费者(fanout or topic);
1.创建以服务的topic为路由键的消费者;
2.创建以服务的topic和本机名为路由键的消费者
(基于topic&host,可用来接收定向消息);
3.fanout直接投递消息,不进行匹配,速度最快
(fanout类型,可用于接收广播消息);
"""
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
"""
在消息消费进程启动前,必须先声明消费者;
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称
(pool_name:ceilometer.collector.metering)
以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
"""
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
"""
启动消费者线程;
consume_in_thread用evelent.spawn创建一个协程一直运行;
等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
启动协程实现等待并消费处理队列中的消息;
"""
self.conn.consume_in_thread()
class CollectorService----def initialize_service_hook
def initialize_service_hook(self, service):
'''''
在消息消费进程启动前,必须先声明消费者;
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称
(pool_name:ceilometer.collector.metering)
以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
'''
"""
建立一个'topic'类型的消息消费者;
根据消费者类(TopicConsumer)和消息队列名称
(pool_name:ceilometer.collector.metering)
以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
"""
self.conn.create_worker(
# metering
cfg.CONF.publisher_rpc.metering_topic,
# 获取类RpcDispatcher的初始化对象;
rpc_dispatcher.RpcDispatcher(),
# ceilometer.collector.metering
'ceilometer.collector.' + cfg.CONF.publisher_rpc.metering_topic,
)
针对收集的发布的监控信息数据,系统提供两类方式用以实现监控信息数据保存到数据存储系统的操作。
方式一:用于实现保存数据到日志文件
class FileDispatcher(dispatcher.Base):
'''''
用于实现保存数据到日志文件的实现类;
'''
def record_metering_data(self, data):
"""
记录数据到日志文件;
"""
if self.log:
self.log.info(data)
方式二:用于实现保存数据到数据存储系统(不同的实现后端)
class DatabaseDispatcher(dispatcher.Base):
'''''
用于实现保存数据到数据存储系统(不同的实现后端)的实现类;
'''
def record_metering_data(self, data):
"""
保存数据data到数据存储系统(不同的实现后端);
"""
if not isinstance(data, list):
data =
for meter in data:
LOG.debug(_(
'metering data %(counter_name)s '
'for %(resource_id)s @ %(timestamp)s: %(counter_volume)s')
% ({'counter_name': meter['counter_name'],
'resource_id': meter['resource_id'],
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
'counter_volume': meter['counter_volume']}))
if publisher_utils.verify_signature(
meter,
self.conf.publisher.metering_secret):
try:
if meter.get('timestamp'):
ts = timeutils.parse_isotime(meter['timestamp'])
meter['timestamp'] = timeutils.normalize_time(ts)
"""
保存数据到数据存储系统;
"""
self.storage_conn.record_metering_data(meter)
except Exception as err:
LOG.exception(_('Failed to record metering data: %s'),
err)
else:
LOG.warning(_(
'message signature invalid, discarding message: %r'),
meter)
来看配置文件setup.cfg中与监控数据存储实现相关的配置信息:
数据存储的实现(存储到数据库和写入到指定日志文件);ceilometer.dispatcher=
database= ceilometer.dispatcher.database:DatabaseDispatcher
file= ceilometer.dispatcher.file:FileDispatcher
数据存储现在支持MongoDB,MySQL,Postgresql和HBase,现在H3又新增加了对DB2的支持,其中MongoDB是支持最好的;ceilometer.storage=
#日志记录数据;
log= ceilometer.storage.impl_log:LogStorage
#基于配置设置获取到MongoDB数据库的连接实例;
mongodb= ceilometer.storage.impl_mongodb:MongoDBStorage
#实现到SqlAlchemy的连接,以实现存储数据到数据库;
mysql= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
#实现到SqlAlchemy的连接,以实现存储数据到数据库;
postgresql= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
#实现到SqlAlchemy的连接,以实现存储数据到数据库;
sqlite= ceilometer.storage.impl_sqlalchemy:SQLAlchemyStorage
#实现到HBase数据库的连接,以实现存储数据到数据库;
hbase= ceilometer.storage.impl_hbase:HBaseStorage
#实现到DB2数据库的连接,以实现存储数据到数据库;
db2= ceilometer.storage.impl_db2:DB2Storage
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
页:
[1]