desehawk posted on 2014-11-24 21:52:32

Cloud computing billing: code analysis of Ceilometer's alarm module

Last edited by pig2 on 2014-11-25 00:33


Guiding questions:

1. What does the _list function in StatisticsManager do?
2. What does the _sufficient function do?







(Code version: Havana 2013.2.2. At this point the Ceilometer alarm module was not yet fully implemented.)

Directory structure of the alarm module: [figure not preserved]




Directory structure of ceilometerclient: [figure not preserved]




Alarm attributes: [figure not preserved]





Analysis of the alarm's evaluate implementation:

For an alarm with a threshold rule, evaluation is implemented by the evaluate method of ThresholdEvaluator (ceilometer/alarm/evaluator/threshold.py):

def evaluate(self, alarm):
    query = self._bound_duration(
        alarm,
        alarm.rule['query']
    )

    statistics = self._sanitize(
        alarm,
        self._statistics(alarm, query)
    )

    if self._sufficient(alarm, statistics):
        def _compare(stat):
            # look up the comparison function for the rule's operator
            op = COMPARATORS[alarm.rule['comparison_operator']]
            value = getattr(stat, alarm.rule['statistic'])
            limit = alarm.rule['threshold']
            LOG.debug(_('comparing value %(value)s against threshold'
                        ' %(limit)s') %
                      {'value': value, 'limit': limit})
            return op(value, limit)

        self._transition(alarm,
                         statistics,
                         map(_compare, statistics))


Analysis of the key code sections:

1.
query = self._bound_duration(
    alarm,
    alarm.rule['query']
)




Referring to the alarm attributes, alarm.rule['query'] is metadata.user_metadata.server_group == WebServerGroup.

1-1. The implementation of _bound_duration:
@classmethod
def _bound_duration(cls, alarm, constraints):
    """Bound the duration of the statistics query."""
    now = timeutils.utcnow()
    window = (alarm.rule['period'] *
              (alarm.rule['evaluation_periods'] + cls.look_back))
    start = now - datetime.timedelta(seconds=window)
    LOG.debug(_('query stats from %(start)s to '
                '%(now)s') % {'start': start, 'now': now})
    after = dict(field='timestamp', op='ge', value=start.isoformat())
    before = dict(field='timestamp', op='le', value=now.isoformat())
    # append both timestamp bounds to the caller's query list
    constraints.extend([before, after])
    return constraints




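This function computes the time window for the evaluation. The end of the window is the current time now; the start is start = now - period * (evaluation_periods + look_back), where period and evaluation_periods are alarm attributes and look_back is an extra number of periods that allows for reporting/ingestion lag (a detail that can be ignored here). It then appends the conditions timestamp >= start and timestamp <= now to the query and returns the modified query.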
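As a concrete check of the window arithmetic, the sketch below reproduces the computation outside Ceilometer. It is a minimal illustration under stated assumptions: bound_duration is a hypothetical standalone function, a plain dict stands in for alarm.rule, and look_back defaults to 1, which I believe is ThresholdEvaluator's class attribute but should be treated as an assumption.

```python
import datetime


def bound_duration(rule, constraints, now, look_back=1):
    """Append timestamp bounds [start, now] to a list of query constraints.

    Hypothetical helper mirroring ThresholdEvaluator._bound_duration;
    `rule` is a plain dict standing in for alarm.rule, and the default
    look_back=1 is an assumption.
    """
    # evaluation_periods periods, plus look_back extra periods to
    # tolerate reporting/ingestion lag
    window = rule['period'] * (rule['evaluation_periods'] + look_back)
    start = now - datetime.timedelta(seconds=window)
    after = dict(field='timestamp', op='ge', value=start.isoformat())
    before = dict(field='timestamp', op='le', value=now.isoformat())
    # extend() mutates the list passed in, like the original code
    constraints.extend([before, after])
    return constraints


rule = {'period': 60, 'evaluation_periods': 3}
now = datetime.datetime(2014, 11, 24, 12, 0, 0)
query = [{'field': 'metadata.user_metadata.server_group',
          'op': 'eq', 'value': 'WebServerGroup'}]
print(bound_duration(rule, query, now))
```

With period=60 and evaluation_periods=3 the window is 60 * (3 + 1) = 240 seconds, so the two appended constraints are timestamp <= 2014-11-24T12:00:00 and timestamp >= 2014-11-24T11:56:00.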

2.
statistics = self._sanitize(
    alarm,
    self._statistics(alarm, query)
)



2-1. The implementation of _statistics:
def _statistics(self, alarm, query):
    """Retrieve statistics over the current window."""
    LOG.debug(_('stats query %s') % query)
    try:
        return self._client.statistics.list(
            meter_name=alarm.rule['meter_name'], q=query,
            period=alarm.rule['period'])
    except Exception:
        # on any failure, log it and return no data points
        LOG.exception(_('alarm stats retrieval failed'))
        return []




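From the analysis in 2-1-1 through 2-1-1-1-1-1 below, client_class is Client (ceilometerclient/v2/client.py), and its attributes show that statistics is a StatisticsManager (ceilometerclient/v2/statistics.py). Its list method is called with the alarm's meter_name (visible in the alarm attributes), the query constraints query, and the alarm's period. From the analysis in 2-1-2 through 2-1-2-2, the final result is the statistics of the meter named meter_name, queried with the constraints in query and aggregated per period.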



2-1-1. The _client property, defined in Evaluator (ceilometer/alarm/evaluator/__init__.py):
@property
def _client(self):
    """Construct or reuse an authenticated API client."""
    # build the client only once, then reuse the cached instance
    if not self.api_client:
        auth_config = cfg.CONF.service_credentials
        creds = dict(
            os_auth_url=auth_config.os_auth_url,
            os_region_name=auth_config.os_region_name,
            os_tenant_name=auth_config.os_tenant_name,
            os_password=auth_config.os_password,
            os_username=auth_config.os_username,
            cacert=auth_config.os_cacert,
            endpoint_type=auth_config.os_endpoint_type,
        )
        self.api_client = ceiloclient.get_client(2, **creds)
    return self.api_client







2-1-1-1. get_client (ceilometerclient/client.py):
def get_client(api_version, **kwargs):
    """Get an authenticated client, based on the credentials
       in the keyword args.

    :param api_version: the API version to use ('1' or '2')
    :param kwargs: keyword args containing credentials, either:
            * os_auth_token: pre-existing token to re-use
            * ceilometer_url: ceilometer API endpoint
            or:
            * os_username: name of user
            * os_password: user's password
            * os_auth_url: endpoint to authenticate against
            * os_cacert: path of CA TLS certificate
            * insecure: allow insecure SSL (no cert verification)
            * os_tenant_{name|id}: name or ID of tenant
    """
    if kwargs.get('os_auth_token') and kwargs.get('ceilometer_url'):
        # mode 1: reuse an existing token against a known endpoint
        token = kwargs.get('os_auth_token')
        endpoint = kwargs.get('ceilometer_url')
    elif (kwargs.get('os_username') and
          kwargs.get('os_password') and
          kwargs.get('os_auth_url') and
          (kwargs.get('os_tenant_id') or kwargs.get('os_tenant_name'))):
        # mode 2: authenticate against Keystone, then look up the endpoint
        ks_kwargs = {
            'username': kwargs.get('os_username'),
            'password': kwargs.get('os_password'),
            'tenant_id': kwargs.get('os_tenant_id'),
            'tenant_name': kwargs.get('os_tenant_name'),
            'auth_url': kwargs.get('os_auth_url'),
            'region_name': kwargs.get('os_region_name'),
            'service_type': kwargs.get('os_service_type'),
            'endpoint_type': kwargs.get('os_endpoint_type'),
            'cacert': kwargs.get('os_cacert'),
            'insecure': kwargs.get('insecure'),
        }
        _ksclient = _get_ksclient(**ks_kwargs)
        token = ((lambda: kwargs.get('os_auth_token'))
                 if kwargs.get('os_auth_token')
                 else (lambda: _ksclient.auth_token))

        endpoint = kwargs.get('ceilometer_url') or \
            _get_endpoint(_ksclient, **ks_kwargs)

    cli_kwargs = {
        'token': token,
        'insecure': kwargs.get('insecure'),
        'timeout': kwargs.get('timeout'),
        'cacert': kwargs.get('cacert'),
        'cert_file': kwargs.get('cert_file'),
        'key_file': kwargs.get('key_file'),
    }

    return Client(api_version, endpoint, **cli_kwargs)







2-1-1-1-1. get_client above ultimately returns Client(...), so next look at Client (ceilometerclient/client.py):
def Client(version, *args, **kwargs):
    module = utils.import_versioned_module(version, 'client')
    client_class = getattr(module, 'Client')
    return client_class(*args, **kwargs)




From the analysis in 2-1-1-1-1-1, client_class is Client (ceilometerclient/v2/client.py).

2-1-1-1-1-1. utils.import_versioned_module (ceilometerclient/common/utils.py):
def import_versioned_module(version, submodule=None):
    # e.g. version=2, submodule='client' -> 'ceilometerclient.v2.client'
    module = 'ceilometerclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)




With version 2 and submodule 'client', it imports ceilometerclient.v2.client:
class Client(http.HTTPClient):
    """Client for the Ceilometer v2 API.

    :param string endpoint: A user-supplied endpoint URL for the ceilometer
                            service.
    :param function token: Provides token for authentication.
    :param integer timeout: Allows customization of the timeout for client
                            http requests. (optional)
    """

    def __init__(self, *args, **kwargs):
        """Initialize a new client for the Ceilometer v2 API."""
        super(Client, self).__init__(*args, **kwargs)
        # one manager per API resource type
        self.meters = meters.MeterManager(self)
        self.samples = samples.SampleManager(self)
        self.statistics = statistics.StatisticsManager(self)
        self.resources = resources.ResourceManager(self)
        self.alarms = alarms.AlarmManager(self)
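The version dispatch performed by import_versioned_module and Client can be reproduced with importlib alone. The sketch below is hypothetical: to stay runnable without ceilometerclient installed, it registers a fake package named fakeclient (with fakeclient.v2.client) in sys.modules, then resolves and instantiates the Client class from a version number the same way the two functions above do. The package parameter is an addition of mine; the real function hardcodes 'ceilometerclient'.

```python
import importlib
import sys
import types


def import_versioned_module(package, version, submodule=None):
    """Import '<package>.v<version>[.<submodule>]'."""
    module = '%s.v%s' % (package, version)
    if submodule:
        module = '.'.join((module, submodule))
    return importlib.import_module(module)


def make_client(package, version, *args, **kwargs):
    """Pick the versioned Client class by name and instantiate it."""
    module = import_versioned_module(package, version, 'client')
    client_class = getattr(module, 'Client')
    return client_class(*args, **kwargs)


# --- hypothetical fake package so the example runs stand-alone ---
class FakeV2Client(object):
    def __init__(self, endpoint, token=None):
        self.endpoint = endpoint
        self.token = token


for name in ('fakeclient', 'fakeclient.v2', 'fakeclient.v2.client'):
    sys.modules[name] = types.ModuleType(name)
sys.modules['fakeclient.v2.client'].Client = FakeV2Client

client = make_client('fakeclient', 2, 'http://localhost:8777', token='abc')
print(type(client).__name__, client.endpoint)  # FakeV2Client http://localhost:8777
```

The design choice is that the caller only ever names a version; adding a v3 would mean adding a new subpackage with its own Client class, without touching get_client.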









2-1-2. StatisticsManager (ceilometerclient/v2/statistics.py):
class StatisticsManager(base.Manager):
    resource_class = Statistics

    def list(self, meter_name, q=None, period=None):
        p = ['period=%s' % period] if period else None
        return self._list(options.build_url(
            '/v2/meters/' + meter_name + '/statistics',
            q, p))






2-1-2-1. build_url (ceilometerclient/v2/options.py) builds the query URL from the meter path (which contains meter_name), the query constraints, and the period, as follows:
def build_url(path, q, params=None):
    '''This converts from a list of dicts and a list of params to
       what the rest api needs, so from:
    "[{field=this,op=le,value=34},{field=that,op=eq,value=foo}],
     ['foo=bar','sna=fu']"
    to:
    "?q.field=this&q.op=le&q.value=34&
      q.field=that&q.op=eq&q.value=foo&
      foo=bar&sna=fu"
    '''
    if q:
        query_params = {'q.field': [],
                        'q.value': [],
                        'q.op': []}

        for query in q:
            for name in ['field', 'op', 'value']:
                query_params['q.%s' % name].append(query.get(name, ''))

        # Transform the dict to a sequence of two-element tuples in fixed
        # order, then the encoded string will be consistent in Python 2&3.
        new_qparams = sorted(query_params.items(), key=lambda x: x[0])
        path += "?" + urlutils.urlencode(new_qparams, doseq=True)

        if params:
            for p in params:
                path += '&%s' % p
    elif params:
        # no query: the first param opens the query string
        path += '?%s' % params[0]
        for p in params[1:]:
            path += '&%s' % p
    return path
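To see the URL that StatisticsManager.list actually produces, here is a Python 3 sketch of the same logic using the standard library's urllib.parse.urlencode in place of ceilometerclient's urlutils wrapper (an assumption about equivalence, not the library's code). The meter name cpu_util and the timestamp are example values only.

```python
from urllib.parse import urlencode


def build_url(path, q, params=None):
    """Python 3 sketch of ceilometerclient's build_url.

    q: list of dicts with 'field', 'op' and 'value' keys.
    params: list of pre-formatted 'key=value' strings.
    """
    if q:
        query_params = {'q.field': [], 'q.value': [], 'q.op': []}
        for query in q:
            for name in ('field', 'op', 'value'):
                query_params['q.%s' % name].append(query.get(name, ''))
        # sort by key so the encoded string is deterministic
        new_qparams = sorted(query_params.items(), key=lambda x: x[0])
        # doseq=True emits one key=value pair per list element
        path += '?' + urlencode(new_qparams, doseq=True)
        for p in params or []:
            path += '&%s' % p
    elif params:
        path += '?%s' % params[0]
        for p in params[1:]:
            path += '&%s' % p
    return path


q = [{'field': 'timestamp', 'op': 'ge', 'value': '2014-11-24T11:56:00'}]
print(build_url('/v2/meters/cpu_util/statistics', q, ['period=60']))
# /v2/meters/cpu_util/statistics?q.field=timestamp&q.op=ge&q.value=2014-11-24T11%3A56%3A00&period=60
```

Note that with several constraints the encoder emits all q.field values first, then all q.op values, then all q.value values, so the server has to pair them up by position rather than in the interleaved order the docstring suggests.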






2-1-2-2. StatisticsManager's _list is actually the _list method inherited from base.Manager (ceilometerclient/common/base.py). It sends a GET request to the query URL it is given, wraps each item of the JSON response in the resource class (Statistics for StatisticsManager), and returns the resulting list:
class Manager(object):
    """Managers interact with a particular type of API
    (samples, meters, alarms, etc.) and provide CRUD operations for them.
    """
    resource_class = None

    def __init__(self, api):
        self.api = api

    def _create(self, url, body):
        resp, body = self.api.json_request('POST', url, body=body)
        if body:
            return self.resource_class(self, body)

    def _list(self, url, response_key=None, obj_class=None, body=None,
              expect_single=False):
        resp, body = self.api.json_request('GET', url)

        if obj_class is None:
            obj_class = self.resource_class

        if response_key:
            try:
                data = body[response_key]
            except KeyError:
                return []
        else:
            data = body
        if expect_single:
            data = [data]
        # wrap each returned JSON object in the resource class
        return [obj_class(self, res, loaded=True) for res in data if res]
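The answer to the first guiding question can be checked with a self-contained sketch of _list. Everything here except the _list logic is hypothetical: Resource is a simplified stand-in for the client's resource base class (it only copies JSON keys onto attributes), FakeAPI replaces the HTTP client and returns a canned body, and the statistic values are made up for the example.

```python
class Resource(object):
    """Simplified stand-in: exposes the keys of a JSON dict as attributes."""

    def __init__(self, manager, info, loaded=False):
        self.manager = manager
        self._loaded = loaded
        for key, value in info.items():
            setattr(self, key, value)


class Statistics(Resource):
    pass


class FakeAPI(object):
    """Pretends to be the HTTP client; records requests, returns a canned body."""

    def __init__(self, body):
        self.body = body
        self.requests = []

    def json_request(self, method, url):
        self.requests.append((method, url))
        return None, self.body


class Manager(object):
    resource_class = None

    def __init__(self, api):
        self.api = api

    def _list(self, url, response_key=None, obj_class=None,
              expect_single=False):
        resp, body = self.api.json_request('GET', url)
        if obj_class is None:
            obj_class = self.resource_class
        if response_key:
            try:
                data = body[response_key]
            except KeyError:
                return []
        else:
            data = body
        if expect_single:
            data = [data]
        # one resource object per non-empty JSON object
        return [obj_class(self, res, loaded=True) for res in data if res]


class StatisticsManager(Manager):
    resource_class = Statistics


api = FakeAPI([{'avg': 72.5, 'max': 91.0, 'period': 60},
               {'avg': 85.0, 'max': 97.3, 'period': 60}])
stats = StatisticsManager(api)._list('/v2/meters/cpu_util/statistics?period=60')
print([s.avg for s in stats])  # [72.5, 85.0]
```

Because each JSON key becomes an attribute, the evaluator can later read a statistic with getattr(stat, alarm.rule['statistic']).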






3. The last part of evaluate:
if self._sufficient(alarm, statistics):
    def _compare(stat):
        op = COMPARATORS[alarm.rule['comparison_operator']]
        value = getattr(stat, alarm.rule['statistic'])
        limit = alarm.rule['threshold']
        LOG.debug(_('comparing value %(value)s against threshold'
                    ' %(limit)s') %
                  {'value': value, 'limit': limit})
        return op(value, limit)

    # under Python 2, map() returns a list, which _transition
    # later indexes with compared[-1]
    self._transition(alarm,
                     statistics,
                     map(_compare, statistics))
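A runnable sketch of what _compare computes for each period. The COMPARATORS table below is my assumption of the operator mapping (comparison_operator strings such as 'gt' mapped to functions from Python's operator module); Stat is a hypothetical stand-in for the Statistics objects, and compare_all is a hypothetical helper that applies the comparison to every period.

```python
import operator
from collections import namedtuple

# Assumed mapping from the rule's comparison_operator to a comparison
# function (same idea as COMPARATORS in threshold.py).
COMPARATORS = {
    'gt': operator.gt,
    'lt': operator.lt,
    'ge': operator.ge,
    'le': operator.le,
    'eq': operator.eq,
    'ne': operator.ne,
}

# Hypothetical stand-in for a Statistics resource: one per period.
Stat = namedtuple('Stat', ['avg', 'max', 'min'])


def compare_all(rule, statistics):
    """Return one boolean per period: True if it breaches the threshold."""
    op = COMPARATORS[rule['comparison_operator']]
    limit = rule['threshold']
    # A list comprehension (rather than map) keeps the result indexable
    # under Python 3, where map() returns a lazy iterator.
    return [op(getattr(stat, rule['statistic']), limit) for stat in statistics]


rule = {'comparison_operator': 'gt', 'statistic': 'avg', 'threshold': 70.0}
stats = [Stat(avg=65.0, max=80.0, min=50.0),
         Stat(avg=75.0, max=90.0, min=60.0),
         Stat(avg=82.0, max=95.0, min=70.0)]
print(compare_all(rule, stats))  # [False, True, True]
```

This boolean list is what _transition receives as compared.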




The _sufficient function checks whether the returned statistics contain enough data points to support the evaluation. quorum is the minimum number of statistics required and defaults to 1; if there are fewer and the alarm is not already in the UNKNOWN state, the alarm is moved to UNKNOWN:
def _sufficient(self, alarm, statistics):
    """Ensure there is sufficient data for evaluation,
       transitioning to unknown otherwise.
    """
    sufficient = len(statistics) >= self.quorum
    if not sufficient and alarm.state != UNKNOWN:
        reason = _('%d datapoints are unknown') % alarm.rule[
            'evaluation_periods']
        self._refresh(alarm, UNKNOWN, reason)
    return sufficient
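The decision inside _sufficient can be isolated as a pure function. This is a sketch, not Ceilometer code: check_sufficient is a hypothetical name, it returns the state change instead of calling _refresh, and UNKNOWN = 'insufficient data' is my assumption about the value of the UNKNOWN constant.

```python
UNKNOWN = 'insufficient data'  # assumed value of the UNKNOWN state constant


def check_sufficient(state, statistics, evaluation_periods, quorum=1):
    """Return (sufficient, new_state, reason).

    new_state is UNKNOWN when the alarm must be moved to the unknown
    state; otherwise it is None, meaning no state change is requested.
    """
    sufficient = len(statistics) >= quorum
    if not sufficient and state != UNKNOWN:
        # the message reports evaluation_periods, not the actual
        # number of missing data points
        reason = '%d datapoints are unknown' % evaluation_periods
        return sufficient, UNKNOWN, reason
    return sufficient, None, None


print(check_sufficient('ok', [], 3))       # (False, 'insufficient data', '3 datapoints are unknown')
print(check_sufficient(UNKNOWN, [], 3))    # (False, None, None)
print(check_sufficient('ok', [object()], 3))  # (True, None, None)
```

The second call shows why the state check matters: an alarm that is already UNKNOWN is not refreshed again on every cycle without data.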



If _sufficient confirms that there is enough data, evaluate goes on to the most important part, _transition:
def _transition(self, alarm, statistics, compared):
    """Transition alarm state if necessary.

       The transition rules are currently hardcoded as:

       - transitioning from a known state requires an unequivocal
         set of datapoints

       - transitioning from unknown is on the basis of the most
         recent datapoint if equivocal

       Ultimately this will be policy-driven.
    """
    # distilled is True if every statistic meets the alarm condition
    distilled = all(compared)
    # unequivocal is True if the statistics either all meet or all fail
    # the alarm condition
    unequivocal = distilled or not any(compared)
    # whether the alarm's current state is UNKNOWN
    unknown = alarm.state == evaluator.UNKNOWN
    # whether the alarm's repeat_actions attribute is set
    continuous = alarm.repeat_actions
    # case 1: unequivocal is True
    if unequivocal:
        # If every statistic meets the condition, the alarm fires and its
        # state becomes ALARM; if none does, the situation is normal and
        # the state becomes OK.
        state = evaluator.ALARM if distilled else evaluator.OK
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        # Refresh if the current state differs from the target state;
        # if repeat_actions is set, refresh even when they are the same.
        if alarm.state != state or continuous:
            # perform the state transition
            self._refresh(alarm, state, reason, reason_data)
    # case 2: unequivocal is False, and the current state is UNKNOWN or
    # repeat_actions is set
    elif unknown or continuous:
        # If the current state is UNKNOWN, follow the trend of the most
        # recent statistic: True means trending towards ALARM, False means
        # trending towards OK. Otherwise (repeat_actions only), keep the
        # current state.
        trending_state = evaluator.ALARM if compared[-1] else evaluator.OK
        state = trending_state if unknown else alarm.state
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        # perform the state transition
        self._refresh(alarm, state, reason, reason_data)
    # otherwise (equivocal data, known state, no repeat_actions): no change
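The transition rules above boil down to a small state machine. The sketch below restates them as a pure function so each case can be tried directly; next_state is a hypothetical name, it returns the state _transition would pass to _refresh (or None when _refresh is not called), and the three state strings are assumed values.

```python
OK, ALARM, UNKNOWN = 'ok', 'alarm', 'insufficient data'  # assumed state strings


def next_state(current, compared, repeat_actions=False):
    """Return the state to refresh to, or None if no refresh happens.

    compared: list of booleans, one per period (True = condition met).
    """
    distilled = all(compared)                     # every period met it
    unequivocal = distilled or not any(compared)  # all met or none met
    unknown = current == UNKNOWN
    if unequivocal:
        state = ALARM if distilled else OK
        # refresh on a real change, or always when repeat_actions is set
        if current != state or repeat_actions:
            return state
        return None
    if unknown or repeat_actions:
        # mixed results: leave UNKNOWN by following the latest period;
        # a known state is kept (and re-notified if repeat_actions)
        trending = ALARM if compared[-1] else OK
        return trending if unknown else current
    # mixed results, known state, no repeat_actions: nothing happens
    return None


print(next_state(UNKNOWN, [True, False, True]))  # alarm
print(next_state(ALARM, [True, False, True]))    # None
```

One consequence worth noting: all([]) is True, so an empty compared list would count as "all met"; this is one reason the _sufficient guard has to run first.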




The alarm state is finally updated in _refresh:
def _refresh(self, alarm, state, reason, reason_data):
    """Refresh alarm state."""
    try:
        previous = alarm.state
        if previous != state:
            # log the state transition
            LOG.info(_('alarm %(id)s transitioning to %(state)s because '
                       '%(reason)s') % {'id': alarm.alarm_id,
                                        'state': state,
                                        'reason': reason})

            self._client.alarms.set_state(alarm.alarm_id, state=state)
        alarm.state = state
        if self.notifier:
            # call the Notifier's notify method to send the notification
            self.notifier.notify(alarm, previous, reason, reason_data)
    except Exception:
        # retry will occur naturally on the next evaluation
        # cycle (unless alarm state reverts in the meantime)
        LOG.exception(_('alarm state update failed'))
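The split between "persist only on change" and "notify on every refresh" is easy to miss in the code above, so here is a small sketch that makes it observable. All classes and the refresh function are hypothetical stand-ins (a fake alarms API, a fake notifier, a minimal Alarm object), and the try/except and logging of the original are left out for brevity.

```python
class FakeAlarmsAPI(object):
    """Records set_state calls instead of sending HTTP requests."""

    def __init__(self):
        self.calls = []

    def set_state(self, alarm_id, state):
        self.calls.append((alarm_id, state))


class FakeNotifier(object):
    """Records notifications instead of sending them."""

    def __init__(self):
        self.sent = []

    def notify(self, alarm, previous, reason, reason_data):
        self.sent.append((alarm.alarm_id, previous, alarm.state, reason))


class Alarm(object):
    def __init__(self, alarm_id, state):
        self.alarm_id = alarm_id
        self.state = state


def refresh(alarms_api, notifier, alarm, state, reason, reason_data=None):
    previous = alarm.state
    if previous != state:
        # only persist through the API when the state really changes
        alarms_api.set_state(alarm.alarm_id, state=state)
    alarm.state = state
    if notifier:
        # notify on every refresh, including repeat_actions re-fires
        notifier.notify(alarm, previous, reason, reason_data)


api = FakeAlarmsAPI()
notifier = FakeNotifier()
alarm = Alarm('a1', 'ok')
refresh(api, notifier, alarm, 'alarm', 'threshold crossed')
refresh(api, notifier, alarm, 'alarm', 'still crossed')  # e.g. repeat_actions
print(api.calls)           # [('a1', 'alarm')]
print(len(notifier.sent))  # 2
```

In the original, if set_state raises, the except clause swallows the error before notify is reached, so no notification is sent for that cycle and the transition is retried on the next evaluation.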




Note the most important call:
self.notifier.notify(alarm, previous, reason, reason_data)
The notifier sends the notification corresponding to the alarm's state.

