分享

OpenStack Ceilometer Collector代码解读

pig2 发表于 2014-11-24 17:34:04 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 17998

问题导读

1.Collector作用是什么?
2.PubSubHubbub包含哪些流程?










Collector功能
Collector顾名思义是负责数据收集的,它负责搜集来自OpenStack其他组件(如Nova,Glance,Cinder等)的Notification信息,以及从Compute Agent和Central Agent发送来的数据,然后将这些数据存储在数据库中。
PubSubHubbub
PubSubHubbub是Google推出的一个基于Web-hook方式的解决方案,它其实是RSS的改进。它具体要解决的是RSS效率低和压力大的问题,有一个Go real time with pubsubhubbub and feeds讲的挺清楚
Tim的这篇博客也讲了它的机制,其中有这个图:
pubsubhubbub.png
一个PubSubHubbub的大致流程如下:
  • Sub找Pub订阅内容,Pub将Hub的地址发给Sub,告诉Sub:你以后找它要内容去
  • Sub将自己要订阅的地址发给Hub,并在Hub那里注册了一个Callback函数,以后有新内容麻烦给Callback就好啦
  • Hub可以主动,也可以被动的从Pub那里获得内容,然后再分发给在自己这里注册的Sub
图中可以看到,有这么几个关键部分,在Ceilometer中,它们对应如下:
  • Publisher 内容提供方,OpenStack的各组件和Agent模块的角色
  • Subscriber 内容订阅方,Collector的角色
  • Hub 中转,Collector也充当了这个角色
Collector代码原理
有些相思代码在之前的OpenStack Ceilometer Compute Agent源码解读讲过
这里只写和collector有关的
入口函数
Collector的核心功能在ceilometer.collector.service:CollectorService中,它是OpenStack的Service服务,启动以后从initialize_service_hook()开始运行
  1. def initialize_service_hook(self, service):
  2.     self.pipeline_manager = pipeline.setup_pipeline(
  3.         transformer.TransformerExtensionManager(
  4.             'ceilometer.transformer',
  5.         ),
  6.         publisher.PublisherExtensionManager(
  7.             'ceilometer.publisher',
  8.         ),
  9.     )
  10.     self.notification_manager = \
  11.         extension_manager.ActivatedExtensionManager(
  12.             namespace=self.COLLECTOR_NAMESPACE,
  13.             disabled_names=
  14.             cfg.CONF.collector.disabled_notification_listeners,
  15.         )
  16.     self.notification_manager.map(self._setup_subscription)
  17.     self.conn.create_worker(
  18.         cfg.CONF.publisher_meter.metering_topic,
  19.         rpc_dispatcher.RpcDispatcher([self]),
  20.         'ceilometer.collector.' + cfg.CONF.publisher_meter.metering_topic,
  21.     )
复制代码


这里只说重点的,self.notification_manager是导入所有可用的内容的处理对象,从setup.cfg中可以找到
  1. ceilometer.collector =
  2.     instance = ceilometer.compute.notifications:Instance
  3.     instance_flavor = ceilometer.compute.notifications:InstanceFlavor
  4.     instance_delete = ceilometer.compute.notifications:InstanceDelete
  5.     ...
复制代码


订阅内容
接着self.notification_manager.map(self._setup_subscription)要对这些对象进行配置,其实就相当于PubSubHubbub中的订阅了
  1. def _setup_subscription(self, ext, *args, **kwds):
  2.     handler = ext.obj
  3.     for exchange_topic in handler.get_exchange_topics(cfg.CONF):
  4.         for topic in exchange_topic.topics:
  5.             self.conn.join_consumer_pool(
  6.                 callback=self.process_notification,
  7.                 pool_name='ceilometer.notifications',
  8.                 topic=topic,
  9.                 exchange_name=exchange_topic.exchange,
  10.             )
复制代码


回调函数
这里_setup_subscription()讲每一个订阅对象都join_consumer_pool,即在AMQP中接收这些订阅相关topic的内容,然后指定了callback函数为self.process_notification
  1. def process_notification(self, notification):
  2.     self.notification_manager.map(self._process_notification_for_ext,
  3.                                   notification=notification,
  4.                                   )
  5. def _process_notification_for_ext(self, ext, notification):
  6.     handler = ext.obj
  7.     if notification['event_type'] in handler.get_event_types():
  8.         ctxt = context.get_admin_context()
  9.         with self.pipeline_manager.publisher(ctxt,
  10.                                              cfg.CONF.counter_source) as p:
  11.             p(list(handler.process_notification(notification)))
复制代码


callback在执行后会调用这些notification中的process_notification(),它的作用是对不同的消息进行不同处理,因为从Nova,Glance等组件发来的消息Collector不一定都读的懂
处理内容
处理好的消息还是会通过Pipeline发送到AMQP中,然后和Agent直接发来的消息类似,Collector接收并交给
  1. def record_metering_data(self, context, data):
  2.     for meter in data:
  3.         if meter.get('timestamp'):
  4.             ts = timeutils.parse_isotime(meter['timestamp'])
  5.             meter['timestamp'] = timeutils.normalize_time(ts)
  6.         self.storage_conn.record_metering_data(meter)
复制代码


来处理,其实相当于自己给自己通过AMQP发了一条信息,这也就能看出,其实Collector充当了Hub和Sub双重身份
总结
Collector相对来说不是很复杂,了解了PubSubHubbub后再看就相对简单了。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条