坎蒂丝_Swan 发表于 2014-12-14 19:06:41

Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动

本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:45 编辑


问题导读问题1:服务ceilometer-agent-notification的初始化操作实现了哪些操作?问题2:服务ceilometer-agent-notification的启动操作实现了哪些任务?


static/image/hrline/4.gif




ceilometer-agent-notification服务的初始化和启动
    本篇帖子将解析服务组件ceilometer-agent-compute的初始化和启动操作。ceilometer-agent-notification服务组件实现访问oslo-messaging,openstack中各个模块都会推送通知(notification)信息到oslo-messaging消息框架,ceilometer-agent-notification通过访问这个消息队列服务框架,获取相关通知信息,并进一步转化为采样数据的格式。从消息队列服务框架获取通知信息,并进一步获取采样数据信息,可以理解为被动获取监控数据操作,需要一直监听oslo-messaging消息队列。

    来看方法/ceilometer/cli.py----def agent_notification,这个方法即实现了ceilometer-agent-notification服务的初始化和启动操作。
def agent_notification():
    service.prepare_service()
    launcher = os_service.ProcessLauncher()
    launcher.launch_service(
      notification.NotificationService(cfg.CONF.host,'ceilometer.agent.notification'),
      # workers默认值为1;
      workers=service.get_workers('notification'))
    launcher.wait()
1 服务ceilometer-agent-notification的初始化操作
服务ceilometer-agent-notification的初始化操作主要实现了以下内容的操作:
(1)若干参数的初始化,定义了所要监听序列的host和topic;
(2)建立线程池,用于后续服务中若干操作的运行;
class Service(service.Service)----def __init__
class Service(service.Service):
    def __init__(self, host, topic, manager=None, serializer=None):
      """
      NotificationService(cfg.CONF.host,'ceilometer.agent.notification')
      host:cfg.CONF.host
      topic:'ceilometer.agent.notification'
      """
      super(Service, self).__init__()
      self.host = host
      self.topic = topic
      self.serializer = serializer
      if manager is None:
            self.manager = self
      else:
            self.manager = manager
class Service(object)----def __init__
class Service(object):
    def __init__(self, threads=1000):
      self.tg = threadgroup.ThreadGroup(threads)
      self._done = event.Event()

2 服务ceilometer-agent-notification的启动操作
服务ceilometer-agent-notification的启动操作实现了以下任务:
(1)加载命名空间'ceilometer.dispatcher'中的插件;


(2)为RPC通信建立到信息总线的连接,建立指定类型的消息消费者;
(3)启动协程实现启动启动消费者线程,等待并消费处理队列'ceilometer.agent.notification'中的消息;

(4)连接到消息总线来获取通知信息;实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;
(5)从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;从通知获取采样数据信息,可以理解为被动获取数据操作;
class NotificationService----def start
class NotificationService(service.DispatchedService, rpc_service.Service):
    NOTIFICATION_NAMESPACE = 'ceilometer.notification'
    def start(self):
      super(NotificationService, self).start()
      # Add a dummy thread to have wait() working
      self.tg.add_timer(604800, lambda: None)

class DispatchedService----def start
加载命名空间'ceilometer.dispatcher'中的插件:
ceilometer.dispatcher =
    database = ceilometer.dispatcher.database:DatabaseDispatcher
    file = ceilometer.dispatcher.file:FileDispatcher

class DispatchedService(object):
    DISPATCHER_NAMESPACE = 'ceilometer.dispatcher'
    def start(self):
      """
      加载命名空间'ceilometer.dispatcher'中的插件:
      ceilometer.dispatcher =
      database = ceilometer.dispatcher.database:DatabaseDispatcher
      file = ceilometer.dispatcher.file:FileDispatcher
      """
      super(DispatchedService, self).start()
      LOG.debug(_('loading dispatchers from %s'),
                  self.DISPATCHER_NAMESPACE)
         
      self.dispatcher_manager = named.NamedExtensionManager(
            # self.DISPATCHER_NAMESPACE = ceilometer.dispatcher
            namespace=self.DISPATCHER_NAMESPACE,
            # cfg.CONF.dispatcher = ['database']
            names=cfg.CONF.dispatcher,
            invoke_on_load=True,
            invoke_args=)
      if not list(self.dispatcher_manager):
            LOG.warning(_('Failed to load any dispatchers for %s'),
                        self.DISPATCHER_NAMESPACE)
class Service(service.Service)----def start

这个方法主要完成了以下步骤的内容操作:

(1)为RPC通信建立到信息总线的连接,建立指定类型的消息消费者;
(2)启动协程实现启动启动消费者线程,等待并消费处理队列'ceilometer.agent.notification'中的消息;

(3)连接到消息总线来获取通知信息;实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;
(4)从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;从通知获取采样数据信息,可以理解为被动获取数据操作;
注:第(3)(4)步骤是通过执行方法initialize_service_hook实现的;

class Service(service.Service):
    def start(self):
      """
      为RPC通信建立到信息总线的连接;
      1.建立指定类型的消息消费者;      
      2.执行方法initialize_service_hook;
      3.启动协程实现等待并消费处理队列中的消息;
      """
      super(Service, self).start()

      """
      为RPC通信建立到信息总线的连接;
      建立一个新的连接,或者从连接池中获取一个;
      """
      self.conn = rpc.create_connection(new=True)
      LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)

      """
      RpcDispatcher:RPC消息调度类;
      """
      dispatcher = rpc_dispatcher.RpcDispatcher(,
                                                self.serializer)

      # Share this same connection for these Consumers
      """
      create_consumer:建立指定类型的消息消费者(fanout or topic);
      1.创建以服务的topic为路由键的消费者;
      2.创建以服务的topic和本机名为路由键的消费者
          (基于topic&host,可用来接收定向消息);
      3.fanout直接投递消息,不进行匹配,速度最快
          (fanout类型,可用于接收广播消息);
      """
      self.conn.create_consumer(self.topic, dispatcher, fanout=False)
      node_topic = '%s.%s' % (self.topic, self.host)
      self.conn.create_consumer(node_topic, dispatcher, fanout=False)
      self.conn.create_consumer(self.topic, dispatcher, fanout=True)

      # Hook to allow the manager to do other initializations after
      # the rpc connection is created.
      """
      在消息消费进程启动前,必须先声明消费者;
      建立一个'topic'类型的消息消费者;
      根据消费者类(TopicConsumer)和消息队列名称
      (pool_name:ceilometer.collector.metering)
      以及指定主题topic(metering)建立消息消费者,并加入消费者列表;
      """
      if callable(getattr(self.manager, 'initialize_service_hook', None)):
            self.manager.initialize_service_hook(self)

      """
      启动消费者线程;
      consume_in_thread用evelent.spawn创建一个协程一直运行;
      等待消息,在有消费到来时会创建新的协程运行远程调用的函数;
      启动协程实现等待并消费处理队列中的消息;
      """
      self.conn.consume_in_thread()

下面来重点分析方法class NotificationService----def initialize_service_hook,这个方法主要实现以下步骤的内容操作:
1.获取与命名空间ceilometer.event.trait_plugin相匹配的所有插件,并加载;
ceilometer.event.trait_plugin =
      split = ceilometer.event.trait_plugins:SplitterTraitPlugin
      bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin
2.获取与命名空间ceilometer.notification相匹配的所有插件,并加载;
ceilometer.notification =
      instance = ceilometer.compute.notifications.instance:Instance
      ......
      stack_crud = ceilometer.orchestration.notifications:StackCRUD
这些插件描述了针对各个监控项,如何从对应的通知中获取相关监控信息并形成采样格式;
3.连接到消息总线来获取通知信息;
实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;
从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;

class NotificationService----def initialize_service_hook
class NotificationService(service.DispatchedService, rpc_service.Service):
    NOTIFICATION_NAMESPACE = 'ceilometer.notification'
    def initialize_service_hook(self, service):
      '''''
      主要实现的功能:
      1.加载命名空间'ceilometer.notification'的所有插件:
      2.遍历上述加载的所有插件,均执行方法:_setup_subscription
      注:_setup_subscription:连接到消息总线来获取通知信息;
      实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;
      从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;
      从通知获取采样数据信息,可以理解为被动获取数据操作;
      '''
      self.pipeline_manager = pipeline.setup_pipeline()

      LOG.debug(_('Loading event definitions'))
         
      """
      extension.ExtensionManager:
      获取与namespace(ceilometer.event.trait_plugin)相匹配的所有插件,并加载;
      ceilometer.event.trait_plugin =
      split = ceilometer.event.trait_plugins:SplitterTraitPlugin
      bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin
      """
      self.event_converter = event_converter.setup_events(
            extension.ExtensionManager(
                namespace='ceilometer.event.trait_plugin'))

      """
      NOTIFICATION_NAMESPACE = 'ceilometer.notification'
      加载命名空间'ceilometer.notification'的插件:
      """
      self.notification_manager = \
            extension.ExtensionManager(
                namespace=self.NOTIFICATION_NAMESPACE,
                invoke_on_load=True,
            )

      if not list(self.notification_manager):
            LOG.warning(_('Failed to load any notification handlers for %s'),
                        self.NOTIFICATION_NAMESPACE)
         
      """
      连接到消息总线来获取通知信息;
      实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;
      从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;
      从通知获取采样数据信息,可以理解为被动获取数据操作;
         
      遍历上述加载的所有插件,均执行方法:
      def _setup_subscription(ext, *args, **kwds)
      其中ext即为遍历的上述加载的插件;
      """
      self.notification_manager.map(self._setup_subscription) 上述代码主要实现了三部分的内容,下面来进行细致的分析;


步骤1:
self.event_converter = event_converter.setup_events(
    extension.ExtensionManager(
      namespace='ceilometer.event.trait_plugin'))
这里主要实现获取与命名空间ceilometer.event.trait_plugin相匹配的所有插件,并加载;
ceilometer.event.trait_plugin =
      split = ceilometer.event.trait_plugins:SplitterTraitPlugin
      bitfield = ceilometer.event.trait_plugins:BitfieldTraitPlugin
步骤2:
self.notification_manager = \
    extension.ExtensionManager(
      namespace=self.NOTIFICATION_NAMESPACE,
      invoke_on_load=True,
    )
这里主要实现获取与命名空间ceilometer.notification相匹配的所有插件,并加载;
ceilometer.notification =
      instance = ceilometer.compute.notifications.instance:Instance
      instance_flavor = ceilometer.compute.notifications.instance:InstanceFlavor
      instance_delete = ceilometer.compute.notifications.instance:InstanceDelete
      instance_scheduled = ceilometer.compute.notifications.instance:InstanceScheduled
      memory = ceilometer.compute.notifications.instance:Memory
      vcpus = ceilometer.compute.notifications.instance:VCpus
      disk_root_size = ceilometer.compute.notifications.instance:RootDiskSize
      disk_ephemeral_size = ceilometer.compute.notifications.instance:EphemeralDiskSize
      cpu_frequency = ceilometer.compute.notifications.cpu:CpuFrequency
      cpu_user_time = ceilometer.compute.notifications.cpu:CpuUserTime
      cpu_kernel_time = ceilometer.compute.notifications.cpu:CpuKernelTime
      cpu_idle_time = ceilometer.compute.notifications.cpu:CpuIdleTime
      cpu_iowait_time = ceilometer.compute.notifications.cpu:CpuIowaitTime
      cpu_kernel_percent = ceilometer.compute.notifications.cpu:CpuKernelPercent
      cpu_idle_percent = ceilometer.compute.notifications.cpu:CpuIdlePercent
      cpu_user_percent = ceilometer.compute.notifications.cpu:CpuUserPercent
      cpu_iowait_percent = ceilometer.compute.notifications.cpu:CpuIowaitPercent
      cpu_percent = ceilometer.compute.notifications.cpu:CpuPercent
      volume = ceilometer.volume.notifications:Volume
      volume_size = ceilometer.volume.notifications:VolumeSize
      image_crud = ceilometer.image.notifications:ImageCRUD
      image = ceilometer.image.notifications:Image
      image_size = ceilometer.image.notifications:ImageSize
      image_download = ceilometer.image.notifications:ImageDownload
      image_serve = ceilometer.image.notifications:ImageServe
      network = ceilometer.network.notifications:Network
      subnet = ceilometer.network.notifications:Subnet
      port = ceilometer.network.notifications:Port
      router = ceilometer.network.notifications:Router
      floatingip = ceilometer.network.notifications:FloatingIP
      bandwidth = ceilometer.network.notifications:Bandwidth
      http.request = ceilometer.middleware:HTTPRequest
      http.response = ceilometer.middleware:HTTPResponse
      stack_crud = ceilometer.orchestration.notifications:StackCRUD这些插件描述了针对各个监控项,如何从对应的通知中获取相关监控信息并形成采样格式;
步骤3:
self.notification_manager.map(self._setup_subscription)


这里所实现的功能是:连接到消息总线来获取通知信息;实际上就是实现监听oslo-messaging消息框架中compute/image/network/heat/cinder等服务的队列;从队列中获取通知信息,将通知转换程采样数据的格式,然后进行采样数据的发布操作;
这条语句的执行操作是遍历命名空间ceilometer.notification的所有插件,均执行方法:
def _setup_subscription(ext, *args, **kwds)
方法_setup_subscription解析:
方法_setup_subscription所实现的功能:

针对上述加载的命名空间ceilometer.notification中的一个插件,执行以下操作:
1.调用方法get_exchange_topics获取插件的ExchangeTopics序列;
class ComputeNotificationBase----def get_exchange_topics;
class ImageBase----def get_exchange_topics;
class NetworkNotificationBase----def get_exchange_topics;
class StackCRUD----def get_exchange_topics;
class _Base(卷)----def get_exchange_topics;
ExchangeTopics序列描述了用于连接到所监听队列的交换器exchange和topics;
经过分析所获取的exchange和topics的值为:
topics = 'notifications.info'指定所要监听的消息队列;
exchange = nova/glance/neutron/heat/cinder来区分获取不同服务的通知信息;
2.遍历所监听的消息队列(暂时只有一个队列notifications.info),实现:
2.1.建立一个'topic'类型的消息消费者;
2.2.监听topic指定的消息队列(notifications.info),当进行消息消费操作的时候,将层层调用,最终实现调用方法self.process_notification,实现将接收到的通知转换成采样数据的格式,并进行发布;
(1)根据不同监控项调用不同插件中的process_notification方法,实现从通知中获取监控项的采样数据信息;
(2)实现发布监控项采样数据样本(File/RPC/UDP);

来看方法_setup_subscription的源码:
def _setup_subscription(self, ext, *args, **kwds):
    """      
    针对上述加载的命名空间ceilometer.notification中的一个插件,执行以下操作:
    1.调用方法get_exchange_topics获取插件的ExchangeTopics序列;
    class ComputeNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
    class ImageBase(plugin.NotificationBase)----def get_exchange_topics;
    class NetworkNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
    class StackCRUD(plugin.NotificationBase)----def get_exchange_topics;
    class _Base(plugin.NotificationBase)(卷)----def get_exchange_topics;
    ExchangeTopics序列描述了用于连接到所监听队列的交换器exchange和topics;
    经过分析所获取的exchange和topics的值为:
    topics = 'notifications.info'指定所要监听的消息队列;
    exchange = nova/glance/neutron/heat/cinder来区分获取不同服务的通知信息;
    2.遍历所监听的消息队列(暂时只有一个队列notifications.info),实现:
    2<span style="font-family:KaiTi_GB2312;">.</span>1.建立一个'topic'类型的消息消费者;
    2.2.监听topic指定的消息队列(notifications.info),当进行消息消费操作的时候,将层层调用,
    最终实现调用方法self.process_notification,实现将接收到的通知转换成采样数据的格式,并进行发布;
    (1).根据不同监控项和具体插件调用不同的process_notification方法,实现从通知中获取监控项的采样数据信息;
    (2).实现发布监控项采样数据样本(File/RPC/UDP);
    """
    handler = ext.obj
      
    """
    default = True
    """
    ack_on_error = cfg.CONF.notification.ack_on_event_error
    LOG.debug(_('Event types from %(name)s: %(type)s'
                ' (ack_on_error=%(error)s)') %
            {'name': ext.name,
               'type': ', '.join(handler.event_types),
               'error': ack_on_error})

    """
    调用方法get_exchange_topics获取插件的ExchangeTopics序列;
    class ComputeNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
    class ImageBase(plugin.NotificationBase)----def get_exchange_topics;
    class NetworkNotificationBase(plugin.NotificationBase)----def get_exchange_topics;
    class StackCRUD(plugin.NotificationBase)----def get_exchange_topics;
    class _Base(plugin.NotificationBase)(卷)----def get_exchange_topics;
    ExchangeTopics序列描述了用于连接到所监听队列的交换器exchange和topics;
    经过分析所获取的exchange和topics的值为:
    topics = 'notifications.info'指定所要监听的消息队列;
    exchange = nova/glance/neutron/heat/cinder来区分获取不同服务的通知信息;
    """
    for exchange_topic in handler.get_exchange_topics(cfg.CONF):
      """
      遍历所监听的消息队列(暂时只有一个队列notifications.info);
      """
      for topic in exchange_topic.topics:
            try:         
                """
                实现封装方法callback,即self.process_notification;
                1.建立一个'topic'类型的消息消费者;
                2.监听topic指定的消息队列(notifications.info),当进行消息消费操作的时候,将层层调用,
                  最终实现调用方法callback_wrapper,即self.process_notification;
               
                callback=self.process_notification
                将接收到的通知转换成采样数据的格式,并进行发布;
                1.根据不同监控项和具体插件调用不同的process_notification方法,
                  实现从通知中获取监控项的采样数据信息;
                2.实现发布监控项采样数据样本(File/RPC/UDP);
                """
                self.conn.join_consumer_pool(
                  # process_notification:将接收到的通知转换成采样数据的格式,并进行发布;
                  callback=self.process_notification,
                  pool_name=topic,
                  topic=topic,
                  exchange_name=exchange_topic.exchange,
                  ack_on_error=ack_on_error)
            except Exception:
                LOG.exception(_('Could not join consumer pool'
                              ' %(topic)s/%(exchange)s') %
                              {'topic': topic,
                               'exchange': exchange_topic.exchange})
接着来看这里所调用的方法process_notification:
def process_notification(self, notification):
    """
    RPC endpoint for notification messages
    将接收到的通知转换成采样数据的格式,并进行发布;
    1.根据不同监控项和具体插件调用不同的process_notification方法,
      实现从通知中获取监控项的采样数据信息;
    2.实现发布监控项采样数据样本(File/RPC/UDP);

    When another service sends a notification over the message
    bus, this method receives it. See _setup_subscription().
    """
    LOG.debug(_('notification %r'), notification.get('event_type'))
      
    """
    _process_notification_for_ext:
    将接收到的通知转换成采样数据的格式,并进行发布;
    1.根据不同监控项和具体插件调用不同的process_notification方法,
      实现从通知中获取监控项的采样数据信息;
    2.实现发布监控项采样数据样本(File/RPC/UDP);
    """
    self.notification_manager.map(self._process_notification_for_ext,
                                  notification=notification)

    # cfg.CONF.notification.store_events:默认值为False;
    if cfg.CONF.notification.store_events:
      # 转换通知消息到Ceilometer Event;
      self._message_to_event(notification)
再来看方法_process_notification_for_ext:
def _process_notification_for_ext(self, ext, notification):
    """
    将接收到的通知转换成采样数据的格式,并进行发布;
    1.根据不同监控项和具体插件调用不同的process_notification方法,
      实现从通知中获取监控项的采样数据信息;
    2.实现发布监控项采样数据样本(File/RPC/UDP);
   
    to_samples:
    根据不同监控项和具体插件调用以下process_notification方法,实现从通知中获取监控项的采样数据信息;
    class ComputeMetricsNotificationBase----def process_notification(self, message)
    class UserMetadataAwareInstanceNotificationBase----def process_notification(self, message)
    class ImageCRUD(ImageCRUDBase)----def process_notification(self, message)
    class Image(ImageCRUDBase)----def process_notification(self, message)
    class ImageSize(ImageCRUDBase)----def process_notification(self, message)
    class ImageDownload(ImageBase)----def process_notification(self, message)
    class ImageServe(ImageBase)----def process_notification(self, message)
    class NetworkNotificationBase(plugin.NotificationBase)----def process_notification(self, message)
    class Bandwidth(NetworkNotificationBase)----def process_notification(self, message)
    class StackCRUD(plugin.NotificationBase)----def process_notification(self, message)
    class Volume(_Base)----def process_notification(self, message)
    class VolumeSize(_Base)----def process_notification(self, message)
    class HTTPRequest(plugin.NotificationBase)----def process_notification(self, message)
    class NotificationService(service.DispatchedService, rpc_service.Service)----def process_notification(self, notification)
   
    publisher:
    实现发布监控项采样数据样本;
    1.class FilePublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
    实现发布采样数据到一个日志文件;
    2.class RPCPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples);
    通过RPC发布采样数据;
    * 从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;
    * 将之前从采样数据中提取的信息meters包装成msg;
    * 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;
    * 实现发布本地队列local_queue中的所有数据信息到队列metering中;
    * 其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会
      执行方法record_metering_data,实现存储到数据存储系统中(数据库);
    3.class UDPPublisher(publisher.PublisherBase)----def publish_samples(self, context, samples)
    通过UDP发布采样数据;
   
    to_samples:
    根据不同监控项和具体插件调用以下process_notification方法,
    实现从通知中获取监控项的采样数据信息;
    """
    with self.pipeline_manager.publisher(context.get_admin_context()) as p:
      # FIXME(dhellmann): Spawn green thread?
      p(list(ext.obj.to_samples(notification)))
再来看方法to_samples:
def to_samples(self, notification):
      """
      根据不同监控项和具体插件调用以下process_notification方法,
      实现从通知中获取监控项的采样数据信息;
      class ComputeMetricsNotificationBase----def process_notification(self, message)
      class UserMetadataAwareInstanceNotificationBase----def process_notification(self, message)
      class ImageCRUD(ImageCRUDBase)----def process_notification(self, message)
      class Image(ImageCRUDBase)----def process_notification(self, message)
      class ImageSize(ImageCRUDBase)----def process_notification(self, message)
      class ImageDownload(ImageBase)----def process_notification(self, message)
      class ImageServe(ImageBase)----def process_notification(self, message)
      class NetworkNotificationBase(plugin.NotificationBase)----def process_notification(self, message)
      class Bandwidth(NetworkNotificationBase)----def process_notification(self, message)
      class StackCRUD(plugin.NotificationBase)----def process_notification(self, message)
      class Volume(_Base)----def process_notification(self, message)
      class VolumeSize(_Base)----def process_notification(self, message)
      class HTTPRequest(plugin.NotificationBase)----def process_notification(self, message)
      class NotificationService(service.DispatchedService, rpc_service.Service)----def process_notification(self, notification)
      """
      if self._handle_event_type(notification['event_type'],
                                 self.event_types):
            return self.process_notification(notification)
      return []

同样,这里实现监控信息发布操作可选取三种模式RPC/UDP/FILE:

模式1:实现发布采样数据到一个日志文件
class FilePublisher(publisher.PublisherBase):   
    def publish_samples(self, context, samples):   
      if self.publisher_logger:   
            for sample in samples:   
                self.publisher_logger.info(sample.as_dict())   
模式2:通过RPC发布采样数据(具体见代码注释)
class RPCPublisher(publisher.PublisherBase):   
    def publish_samples(self, context, samples):   
      """
      通过RPC发布信息;
      1.从若干采样数据信息samples中获取提取数据形成信息格式meters,为信息的发布或存储做准备;
      2.将之前从采样数据中提取的信息meters包装成msg;
      3.将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;
      4.实现发布本地队列local_queue中的所有数据信息到队列metering中;
      5.其中,消息msg中封装的'method'方法为'record_metering_data',即当消息被消费时,将会
          执行方法record_metering_data,实现存储到数据存储系统中(数据库);
      """   
      
      # 从若干采样数据信息中获取提取数据形成信息格式,为信息的发布或存储做准备;   
      meters = [   
            # meter_message_from_counter:   
            # 为一个监控采样数据做好准备被发布或存储;   
            # 从一个采样数据信息中获取提取信息形成msg;   
            utils.meter_message_from_counter(   
                sample,   
                cfg.CONF.publisher.metering_secret)   
            for sample in samples   
      ]   
      
      # cfg.CONF.publisher_rpc.metering_topic:metering messages所使用的主题,默认为metering;   
      topic = cfg.CONF.publisher_rpc.metering_topic   
               
      # 将之前从采样数据中提取的信息meters包装成msg;   
      msg = {   
            'method': self.target,   
            'version': '1.0',   
            'args': {'data': meters},   
      }   
               
      # 将匹配的topic,msg添加到本地队列local_queue中,topic默认为metering;   
      self.local_queue.append((context, topic, msg))   
      
      if self.per_meter_topic:   
            for meter_name, meter_list in itertools.groupby(   
                  sorted(meters, key=operator.itemgetter('counter_name')),   
                  operator.itemgetter('counter_name')):   
                msg = {   
                  'method': self.target,   
                  'version': '1.0',   
                  'args': {'data': list(meter_list)},   
                }   
                topic_name = topic + '.' + meter_name   
                LOG.audit(_('Publishing %(m)d samples on %(n)s') % (   
                        {'m': len(msg['args']['data']), 'n': topic_name}))   
                self.local_queue.append((context, topic_name, msg))   
      
      # 实现发布本地队列local_queue中的所有数据信息;   
      self.flush()def flush(self):   
    """
    实现发布本地队列中的所有数据信息;
    """            
    # 获取本地队列的数据信息;   
    queue = self.local_queue   
    self.local_queue = []   
               
    # 实现循环发布队列queue中的信息数据;   
    self.local_queue = self._process_queue(queue, self.policy) + \self.local_queue   
               
    if self.policy == 'queue':   
      self._check_queue_length() @staticmethod   
def _process_queue(queue, policy):   
    """
    实现循环发布队列queue中的信息数据;
    """   
    while queue:   
      # 取出第一位的topic、msg等数据;   
      context, topic, msg = queue   
      try:   
            # 实现远程发布信息,不返回任何值;   
            rpc.cast(context, topic, msg)   
      except (SystemExit, rpc.common.RPCException):   
            samples = sum(['data']) for n, n, m in queue])   
            if policy == 'queue':   
                LOG.warn(_("Failed to publish %d samples, queue them"),samples)   
                return queue   
            elif policy == 'drop':   
                LOG.warn(_("Failed to publish %d samples, dropping them"),samples)   
                return []   
            # default, occur only if rabbit_max_retries > 0   
            raise   
      else:   
            # 从队列中删除发布后的信息;   
            queue.pop(0)   
    return []
模式3:通过UDP发布采样数据(具体见代码注释)
class UDPPublisher(publisher.PublisherBase):   
    def publish_samples(self, context, samples):   
      """
      通过UDP协议发送meter信息到服务器端,实现监控信息的发布;
      """   
      for sample in samples:   
            """
            为一个监控采样数据做好准备被发布或存储;
            从一个采样数据信息中获取提取信息形成msg;
            """   
            msg = utils.meter_message_from_counter(   
                sample,   
                cfg.CONF.publisher.metering_secret)   
            host = self.host   
            port = self.port   
            LOG.debug(_("Publishing sample %(msg)s over UDP to "   
                        "%(host)s:%(port)d") % {'msg': msg, 'host': host,'port': port})   
            """
            通过UDP协议发送meter信息到服务器端,实现监控信息的发布;
            """   
            try:   
                self.socket.sendto(msgpack.dumps(msg),(self.host, self.port))   
            except Exception as e:   
                LOG.warn(_("Unable to send sample over UDP"))   
                LOG.exception(e)





Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动






页: [1]
查看完整版本: Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动