分享

OpenStack Ceilometer中的Pipeline机制

徐超 发表于 2015-1-14 20:15:36 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 26537
问题导读
1、如何定义Pipeline?
2、了解Ceilometer的数据流?
3、Pipeline机制在Ceilometer中的作用是什么?





Pipeline作用
Pipeline翻译过来是管道的意思,它在ceilometer中的作用类似一个过滤器一样,或者说是转换器。它是一般是一个方法链,这个方法链前面一部分是transformer,transformer实现数据转换等功能,它可以有多个。在链尾是publisher,它负责将数据发送到AMQP中去。


Pipeline定义
在Agent的构造函数中,第一个创建的属性就是pipeline_manager
  1. self.pipeline_manager = pipeline.setup_pipeline(
  2.     transformer.TransformerExtensionManager(
  3.         'ceilometer.transformer',
  4.     ),
  5.     publisher.PublisherExtensionManager(
  6.         'ceilometer.publisher',
  7.     ),
  8. )
复制代码


其中,transformer和publisher来自setup.cfg中
  1. ceilometer.transformer =
  2.     accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
  3. ceilometer.publisher =
  4.     meter_publisher = ceilometer.publisher.meter:MeterPublisher
  5.     meter = ceilometer.publisher.meter:MeterPublisher
  6.     udp = ceilometer.publisher.udp:UDPPublisher
复制代码



Pipeline设置
它调用了ceilometer.pipeline中的setup_pipline(),setup_pipeline()通过导入pipeline.yaml,获得pipeline的配置,默认配置如下
  1. name: meter_pipeline
  2. interval: 600
  3. counters:
  4.     - "*"
  5. transformers:
  6. publishers:
  7.     - meter
复制代码



最后它创建了一个PipelineManager给self.pipeline_manager
  1. PipelineManager(pipeline_cfg,transformer_manager,publisher_manager)
复制代码



PipelineManager做的事情如下:
  1. self.pipelines = [Pipeline(pipedef, publisher_manager,transformer_manager) for pipedef in cfg]
复制代码



它遍历cfg中对pipeline的定义(基本都是一个),然后生成一个Pipeline对象数组
  1. def __init__(self, cfg, publisher_manager, transformer_manager):
  2.     self.cfg = cfg
  3.     self.name = cfg['name']
  4.     self.interval = int(cfg['interval'])
  5.     self.counters = cfg['counters']
  6.     self.publishers = cfg['publishers']
  7.     self.transformer_cfg = cfg['transformers'] or []
  8.     self.publisher_manager = publisher_manager
  9.     self._check_counters()
  10.     self._check_publishers(cfg, publisher_manager)
  11.     self.transformers = self._setup_transformers(cfg, transformer_manager)
复制代码

Pipeline的构造函数如上,它的作用是处理transformer和publisher


Pipeline使用
pipeline的使用位置在agent.py中
  1. def setup_polling_tasks(self):
  2.     polling_tasks = {}
  3.     for pipeline, pollster in itertools.product(
  4.             self.pipeline_manager.pipelines,
  5.             self.pollster_manager.extensions):
  6.         for counter in pollster.obj.get_counter_names():
  7.             if pipeline.support_counter(counter):
  8.                 polling_task = polling_tasks.get(pipeline.interval, None)
  9.                 if not polling_task:
  10.                     polling_task = self.create_polling_task()
  11.                     polling_tasks[pipeline.interval] = polling_task
  12.                 polling_task.add(pollster, [pipeline])
  13.                 break
  14.     return polling_tasks
复制代码


首先通过product生成pipeline和pollster的笛卡尔积,即将每一个pollster都和pipeline配对(一般只有一个pipeline)。

pipeline.support_counter(counter)用来检查这个counter是否同意进入pipeline

另外,每一个polling_task都在构造函数中
  1. self.publish_context = pipeline.PublishContext(
  2.     agent_manager.context,
  3.     cfg.CONF.counter_source)
复制代码

声明了一个pipeline.PublishContext()

在执行task.poll_and_publish前,会先执行
  1. def add(self, pollster, pipelines):
  2.     self.publish_context.add_pipelines(pipelines)
  3.     self.pollsters.update([pollster])
复制代码

即增加一个pipeline管理

最后是publish_context的使用位置
  1. def poll_and_publish_instances(self, instances):
  2.     with self.publish_context as publisher:
  3.         for instance in instances:
  4.             if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error':
  5.                 for pollster in self.pollsters:
  6.                     publisher(list(pollster.obj.get_counters(
  7.                         self.manager,
  8.                         instance)))
复制代码


这里用了with as作为pipeline的管理

在__enter__()中,定义了一个函数
  1. def p(counters):
  2.     for p in self.pipelines:
  3.         p.publish_counters(self.context,
  4.                            counters,
  5.                            self.source)
复制代码


这个函数执行pipeline中的publish_counters,然后最终的执行代码来自
  1. ext.obj.publish_counters(ctxt, counters, source)
复制代码


即publisher的publish_counters,在这里是ceilometer.publisher.meter:publish_counters,它负责将数据发送到AMQP中去

总结
Pipeline机制一定程度上保证了数据的安全性,并且可以统一数据格式,了解它对于了解Ceilometer的数据流有一定帮助


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条