经过polling和[url=Ceilometer之notification agent代码分析 http://www.aboutyun.com/forum.php?mod=viewthread&tid=17609 (出处: about云开发)]notification[/url] 的洗礼,咱们接着说collector,找到setup.cfg,寻找入口函数,CollectorService,看start方法[mw_shl_code=applescript,true]if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
transport = messaging.get_transport(optional=True)
if transport:
self.rpc_server = messaging.get_rpc_server(
transport, cfg.CONF.publisher_rpc.metering_topic, self)
sample_target = oslo.messaging.Target(
topic=cfg.CONF.publisher_notifier.metering_topic)
self.sample_listener = messaging.get_notification_listener(
transport, [sample_target],
[SampleEndpoint(self.dispatcher_manager)],
allow_requeue=(cfg.CONF.collector.
requeue_sample_on_dispatcher_error))[/mw_shl_code]
开启udp,rpc server(endpoint是自己)和监听器listener,udp不说了,详细说下另外两个。
rpc server
着代码继续往下走,最终来到amqpdriver.py,最后发现开启了三个consumer
[mw_shl_code=applescript,true]conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic=target.topic,
callback=listener)
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic,
target.server),
callback=listener)
conn.declare_fanout_consumer(target.topic, listener)[/mw_shl_code]
分别是消息队列metering,metering.host(这两个都连接到ceilometer这个exchange上),还有一个fanout exchange,通过rabbitmq的命令,可以看到如下bindings
[mw_shl_code=applescript,true]ceilometer exchange metering queue metering []
ceilometer exchange metering.localhost.localdomain queue metering.localhost.localdomain []
metering_fanout exchange metering_fanout_5ee49cbc3a9849b09c6efb9a279f5d68 queue metering []
metering_fanout exchange metering_fanout_9dea9674aa824939b4b9feef7b3e0a8c queue metering [][/mw_shl_code]
ps. 在polling中的publish为rpc的时候,默认消息会通过metering.sample消息队列发送到该rpc server
sample_listener
查看代码,最终来到amqpdriver.py,代码和上边类似
[mw_shl_code=applescript,true]for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, priority),
callback=listener, queue_name=pool)[/mw_shl_code]
只声明了一个consumer,开启了一个metering.sample的队列,连接到ceilometer这个exchange,通过rabbitmq的命令,看到如下信息
[mw_shl_code=applescript,true]ceilometer exchange metering.sample queue metering.sample []
[/mw_shl_code]
ps. 在polling中的publish为notifier的时候,消息会发送到该消息队列上处理
|