eason 发表于 2016-3-10 20:55:05

Ceilometer之collector代码分析

经过polling和http://www.aboutyun.com/forum.php?mod=viewthread&tid=17609 (出处: about云开发)]notification 的洗礼,咱们接着说collector,找到setup.cfg,寻找入口函数,CollectorService,看start方法if cfg.CONF.collector.udp_address:
            self.tg.add_thread(self.start_udp)

      transport = messaging.get_transport(optional=True)
      if transport:
            self.rpc_server = messaging.get_rpc_server(
                transport, cfg.CONF.publisher_rpc.metering_topic, self)

            sample_target = oslo.messaging.Target(
                topic=cfg.CONF.publisher_notifier.metering_topic)
            self.sample_listener = messaging.get_notification_listener(
                transport, ,
                ,
                allow_requeue=(cfg.CONF.collector.
                               requeue_sample_on_dispatcher_error))
开启udp,rpc server(endpoint是自己)和监听器listener,udp不说了,详细说下另外两个。
rpc server
着代码继续往下走,最终来到amqpdriver.py,最后发现开启了三个consumer
conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
      conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                   target.server),
                                    callback=listener)
      conn.declare_fanout_consumer(target.topic, listener)
分别是消息队列metering,metering.host(这两个都连接到ceilometer这个exchange上),还有一个fanout exchange,通过rabbitmq的命令,可以看到如下bindings
ceilometer        exchange        metering        queue        metering        []
ceilometer        exchange        metering.localhost.localdomain        queue        metering.localhost.localdomain        []
metering_fanout        exchange        metering_fanout_5ee49cbc3a9849b09c6efb9a279f5d68        queue        metering        []
metering_fanout        exchange        metering_fanout_9dea9674aa824939b4b9feef7b3e0a8c        queue        metering        []

ps. 在polling中的publish为rpc的时候,默认消息会通过metering.sample消息队列发送到该rpc server

sample_listener
查看代码,最终来到amqpdriver.py,代码和上边类似
for target, priority in targets_and_priorities:
            conn.declare_topic_consumer(
                exchange_name=self._get_exchange(target),
                topic='%s.%s' % (target.topic, priority),
                callback=listener, queue_name=pool)
只声明了一个consumer,开启了一个metering.sample的队列,连接到ceilometer这个exchange,通过rabbitmq的命令,看到如下信息
ceilometer        exchange        metering.sample        queue        metering.sample        []


ps. 在polling中的publish为notifier的时候,消息会发送到该消息队列上处理
页: [1]
查看完整版本: Ceilometer之collector代码分析