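Cloud computing billing: analysis of the code of Ceilometer's alarm module
Last edited by pig2 on 2014-11-25 00:33
Guiding questions:
1. What does the _list function in StatisticsManager do?
2. What does the _sufficient function do?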
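(Code version: havana 2013.2.2; at this point the alarm module of ceilometer was not yet fully implemented.)
Directory structure of the alarm module:
Directory structure of ceilometerclient:
Attributes of the alarm:
Analysis of the alarm evaluate implementation:
For alarms with a threshold, evaluation is done by the evaluate function of ThresholdEvaluator (ceilometer/alarm/evaluator/threshold.py):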
def evaluate(self, alarm):
    query = self._bound_duration(
        alarm,
        alarm.rule['query']
    )
    statistics = self._sanitize(
        alarm,
        self._statistics(alarm, query)
    )
    if self._sufficient(alarm, statistics):
        def _compare(stat):
            op = COMPARATORS[alarm.rule['comparison_operator']]
            value = getattr(stat, alarm.rule['statistic'])
            limit = alarm.rule['threshold']
            LOG.debug(_('comparing value %(value)s against threshold'
                        ' %(limit)s') %
                      {'value': value, 'limit': limit})
            return op(value, limit)

        # list(): _transition indexes compared[-1], and map() is lazy on Python 3
        self._transition(alarm,
                         statistics,
                         list(map(_compare, statistics)))
Analysis of the key code sections:
1.
query = self._bound_duration(
    alarm,
    alarm.rule['query']
)
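According to the alarm's attributes, alarm.rule['query'] holds the condition metadata.user_metadata.server_group == WebServerGroup, stored as a list of {field, op, value} dicts.
1-1. Look at the implementation of _bound_duration: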
@classmethod
def _bound_duration(cls, alarm, constraints):
    """Bound the duration of the statistics query."""
    now = timeutils.utcnow()
    window = (alarm.rule['period'] *
              (alarm.rule['evaluation_periods'] + cls.look_back))
    start = now - datetime.timedelta(seconds=window)
    LOG.debug(_('query stats from %(start)s to '
                '%(now)s') % {'start': start, 'now': now})
    after = dict(field='timestamp', op='ge', value=start.isoformat())
    before = dict(field='timestamp', op='le', value=now.isoformat())
    # append both timestamp bounds to the caller's query list (in place)
    constraints.extend([before, after])
    return constraints
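This function computes the time window for the evaluation. The end of the window is the current time now; the start is start = now − period × (evaluation_periods + look_back), where period and evaluation_periods are alarm attributes and look_back is a small number of extra periods that allows for reporting/ingestion lag (a detail that can be ignored here). It then appends the conditions timestamp >= start and timestamp <= now to the query, restricting it to [start, now], and returns the modified query.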
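To make the window arithmetic concrete, here is a minimal standalone sketch of the same computation. It drops the oslo timeutils and logging dependencies, assumes look_back = 1, lets the caller pass in now so the result is reproducible, and uses a made-up rule (period 60 s, evaluation_periods 3), giving a window of 60 × (3 + 1) = 240 s.

```python
import datetime


def bound_duration(rule, look_back=1, now=None):
    """Return [before, after] timestamp constraints for the query window.

    Sketch only: look_back=1 is an assumed value, not read from ceilometer.
    """
    if now is None:
        # naive UTC datetime, like timeutils.utcnow()
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # window length in seconds: period * (evaluation_periods + look_back)
    window = rule['period'] * (rule['evaluation_periods'] + look_back)
    start = now - datetime.timedelta(seconds=window)
    after = dict(field='timestamp', op='ge', value=start.isoformat())
    before = dict(field='timestamp', op='le', value=now.isoformat())
    return [before, after]


# hypothetical rule: 60 s period, 3 evaluation periods
rule = {'period': 60, 'evaluation_periods': 3}
now = datetime.datetime(2014, 11, 25, 0, 10, 0)
constraints = bound_duration(rule, now=now)
print(constraints[1]['value'])  # 2014-11-25T00:06:00
```

Unlike the original, the sketch returns a new list instead of extending the caller's query in place.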
2.
statistics = self._sanitize(
    alarm,
    self._statistics(alarm, query)
)
2-1. Look at the implementation of _statistics:
def _statistics(self, alarm, query):
    """Retrieve statistics over the current window."""
    LOG.debug(_('stats query %s') % query)
    try:
        return self._client.statistics.list(
            meter_name=alarm.rule['meter_name'], q=query,
            period=alarm.rule['period'])
    except Exception:
        LOG.exception(_('alarm stats retrieval failed'))
        return []
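Following the analysis in 2-1-1 through 2-1-1-1-1-1, client_class is Client (ceilometerclient/v2/client.py), and Client's attributes show that statistics is a StatisticsManager (ceilometerclient/v2/statistics.py). Its list function is called with the alarm's meter_name (visible in the alarm's attributes), the query constraints, and the alarm's period. Following 2-1-2 through 2-1-2-2, the final result is the statistics of the meter named by meter_name, filtered by the query and aggregated per period. If anything goes wrong, the exception is logged and an empty list is returned.
2-1-1. Look at _client, defined in Evaluator (ceilometer/alarm/evaluator/__init__.py):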
@property
def _client(self):
    """Construct or reuse an authenticated API client."""
    if not self.api_client:
        auth_config = cfg.CONF.service_credentials
        creds = dict(
            os_auth_url=auth_config.os_auth_url,
            os_region_name=auth_config.os_region_name,
            os_tenant_name=auth_config.os_tenant_name,
            os_password=auth_config.os_password,
            os_username=auth_config.os_username,
            cacert=auth_config.os_cacert,
            endpoint_type=auth_config.os_endpoint_type,
        )
        self.api_client = ceiloclient.get_client(2, **creds)
    return self.api_client
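The _client property uses a construct-or-reuse pattern: the client is built on first access and cached in api_client afterwards. A minimal sketch of just that pattern, where fake_factory is a hypothetical stand-in for ceiloclient.get_client(2, **creds):

```python
class LazyClientHolder(object):
    """Sketch of the construct-or-reuse pattern used by Evaluator._client."""

    def __init__(self, factory):
        self._factory = factory  # hypothetical stand-in for get_client
        self.api_client = None

    @property
    def _client(self):
        # Build the client only on first access, then keep returning it.
        if not self.api_client:
            self.api_client = self._factory()
        return self.api_client


calls = []


def fake_factory():
    calls.append(1)
    return object()


holder = LazyClientHolder(fake_factory)
first = holder._client
second = holder._client
print(first is second, len(calls))  # True 1
```

Caching means the Keystone authentication done inside get_client happens once per evaluator rather than once per alarm evaluation.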
2-1-1-1. Look at get_client (ceilometerclient/client.py):
def get_client(api_version, **kwargs):
    """Get an authenticated client, based on the credentials
       in the keyword args.

    :param api_version: the API version to use ('1' or '2')
    :param kwargs: keyword args containing credentials, either:
            * os_auth_token: pre-existing token to re-use
            * ceilometer_url: ceilometer API endpoint
            or:
            * os_username: name of user
            * os_password: user's password
            * os_auth_url: endpoint to authenticate against
            * os_cacert: path of CA TLS certificate
            * insecure: allow insecure SSL (no cert verification)
            * os_tenant_{name|id}: name or ID of tenant
    """
    if kwargs.get('os_auth_token') and kwargs.get('ceilometer_url'):
        token = kwargs.get('os_auth_token')
        endpoint = kwargs.get('ceilometer_url')
    elif (kwargs.get('os_username') and
          kwargs.get('os_password') and
          kwargs.get('os_auth_url') and
          (kwargs.get('os_tenant_id') or kwargs.get('os_tenant_name'))):
        ks_kwargs = {
            'username': kwargs.get('os_username'),
            'password': kwargs.get('os_password'),
            'tenant_id': kwargs.get('os_tenant_id'),
            'tenant_name': kwargs.get('os_tenant_name'),
            'auth_url': kwargs.get('os_auth_url'),
            'region_name': kwargs.get('os_region_name'),
            'service_type': kwargs.get('os_service_type'),
            'endpoint_type': kwargs.get('os_endpoint_type'),
            'cacert': kwargs.get('os_cacert'),
            'insecure': kwargs.get('insecure'),
        }
        _ksclient = _get_ksclient(**ks_kwargs)
        token = ((lambda: kwargs.get('os_auth_token'))
                 if kwargs.get('os_auth_token')
                 else (lambda: _ksclient.auth_token))
        endpoint = kwargs.get('ceilometer_url') or \
            _get_endpoint(_ksclient, **ks_kwargs)

    cli_kwargs = {
        'token': token,
        'insecure': kwargs.get('insecure'),
        'timeout': kwargs.get('timeout'),
        'cacert': kwargs.get('cacert'),
        'cert_file': kwargs.get('cert_file'),
        'key_file': kwargs.get('key_file'),
    }
    return Client(api_version, endpoint, **cli_kwargs)
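get_client chooses between two ways of authenticating depending on which keyword arguments are present. The hypothetical helper below mirrors only the two branch conditions (it contacts nothing and builds no client), which makes it easy to see which path the evaluator's credentials take:

```python
def auth_mode(**kwargs):
    """Return which branch of get_client would handle these kwargs.

    Hypothetical helper for illustration; not part of ceilometerclient.
    """
    if kwargs.get('os_auth_token') and kwargs.get('ceilometer_url'):
        return 'token+endpoint'
    if (kwargs.get('os_username') and
            kwargs.get('os_password') and
            kwargs.get('os_auth_url') and
            (kwargs.get('os_tenant_id') or kwargs.get('os_tenant_name'))):
        return 'keystone'
    # get_client has no else branch: token and endpoint would stay unbound
    # and building cli_kwargs would raise UnboundLocalError.
    return None


# The evaluator passes username/password/auth_url/tenant_name:
print(auth_mode(os_username='u', os_password='p',
                os_auth_url='http://keystone:5000/v2.0',
                os_tenant_name='admin'))  # keystone
```

The credential values shown are placeholders. Since the evaluator never passes ceilometer_url, it always goes through Keystone, and the endpoint is looked up from the service catalog by _get_endpoint.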
2-1-1-1-1. The get_client above finally returns Client(...); continue with Client (ceilometerclient/client.py):
def Client(version, *args, **kwargs):
    module = utils.import_versioned_module(version, 'client')
    client_class = getattr(module, 'Client')
    return client_class(*args, **kwargs)
From the analysis in 2-1-1-1-1-1, client_class is Client (ceilometerclient/v2/client.py).
2-1-1-1-1-1. Look at utils.import_versioned_module (ceilometerclient/common/utils.py):
def import_versioned_module(version, submodule=None):
    module = 'ceilometerclient.v%s' % version
    if submodule:
        module = '.'.join((module, submodule))
    return importutils.import_module(module)
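Together, Client() and import_versioned_module amount to "build a dotted module name from the version, import it, then fetch the Client class by name". A minimal sketch of those two steps using the standard importlib.import_module in place of importutils; because ceilometerclient may not be installed, the import step is demonstrated on a standard-library module instead (the package parameter and the helper names are hypothetical):

```python
import importlib


def versioned_module_name(version, submodule=None,
                          package='ceilometerclient'):
    """Build the dotted module name the way import_versioned_module does."""
    module = '%s.v%s' % (package, version)
    if submodule:
        module = '.'.join((module, submodule))
    return module


def load_class(module_name, class_name):
    # Equivalent of import_module(...) followed by getattr(module, 'Client')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


print(versioned_module_name(2, 'client'))  # ceilometerclient.v2.client
decoder_cls = load_class('json.decoder', 'JSONDecoder')
print(decoder_cls.__name__)  # JSONDecoder
```

This indirection is what lets get_client(2, ...) return a v2 client without the top-level module importing every API version up front.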
It imports ceilometerclient.v2.client:
class Client(http.HTTPClient):
    """Client for the Ceilometer v2 API.

    :param string endpoint: A user-supplied endpoint URL for the ceilometer
                            service.
    :param function token: Provides token for authentication.
    :param integer timeout: Allows customization of the timeout for client
                            http requests. (optional)
    """

    def __init__(self, *args, **kwargs):
        """Initialize a new client for the Ceilometer v2 API."""
        super(Client, self).__init__(*args, **kwargs)
        self.meters = meters.MeterManager(self)
        self.samples = samples.SampleManager(self)
        self.statistics = statistics.StatisticsManager(self)
        self.resources = resources.ResourceManager(self)
        self.alarms = alarms.AlarmManager(self)
2-1-2. StatisticsManager (ceilometerclient/v2/statistics.py):
class StatisticsManager(base.Manager):
    resource_class = Statistics

    def list(self, meter_name, q=None, period=None):
        p = ['period=%s' % period] if period else None
        return self._list(options.build_url(
            '/v2/meters/' + meter_name + '/statistics',
            q, p))
2-1-2-1. build_url (ceilometerclient/v2/options.py) builds the query URL from the meter_name path, the query and the period, as follows:
def build_url(path, q, params=None):
    '''This converts from a list of dicts and a list of params to
       what the rest api needs, so from:
    "[{field=this,op=le,value=34},{field=that,op=eq,value=foo}],
    ['foo=bar','sna=fu']"
    to:
    "?q.field=this&q.op=le&q.value=34&
      q.field=that&q.op=eq&q.value=foo&
      foo=bar&sna=fu"
    '''
    if q:
        query_params = {'q.field': [],
                        'q.value': [],
                        'q.op': []}
        for query in q:
            for name in ['field', 'op', 'value']:
                query_params['q.%s' % name].append(query.get(name, ''))
        # Transform the dict to a sequence of two-element tuples in fixed
        # order, then the encoded string will be consistent in Python 2&3.
        new_qparams = sorted(query_params.items(), key=lambda x: x[0])
        path += "?" + urlutils.urlencode(new_qparams, doseq=True)
        if params:
            for p in params:
                path += '&%s' % p
    elif params:
        # no query: the first param opens the query string, the rest follow
        path += '?%s' % params[0]
        for p in params[1:]:
            path += '&%s' % p
    return path
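To see what URL the alarm actually requests, here is a standalone re-implementation of build_url that substitutes Python 3's urllib.parse.urlencode for the urlutils wrapper. The meter name cpu_util and the timestamp are hypothetical sample values; the first condition is the example alarm's query.

```python
from urllib.parse import urlencode


def build_url(path, q, params=None):
    """Standalone sketch of options.build_url for experimentation."""
    if q:
        query_params = {'q.field': [], 'q.value': [], 'q.op': []}
        for query in q:
            for name in ['field', 'op', 'value']:
                query_params['q.%s' % name].append(query.get(name, ''))
        # sort by key so the encoded order is deterministic
        new_qparams = sorted(query_params.items(), key=lambda x: x[0])
        path += '?' + urlencode(new_qparams, doseq=True)
        for p in params or []:
            path += '&%s' % p
    elif params:
        path += '?%s' % params[0]
        for p in params[1:]:
            path += '&%s' % p
    return path


q = [
    {'field': 'metadata.user_metadata.server_group', 'op': 'eq',
     'value': 'WebServerGroup'},
    {'field': 'timestamp', 'op': 'le', 'value': '2014-11-25T00:10:00'},
]
url = build_url('/v2/meters/cpu_util/statistics', q, ['period=60'])
print(url)
```

This prints `/v2/meters/cpu_util/statistics?q.field=metadata.user_metadata.server_group&q.field=timestamp&q.op=eq&q.op=le&q.value=WebServerGroup&q.value=2014-11-25T00%3A10%3A00&period=60`. Note that urlencode with doseq=True groups the values by key (all q.field entries first, then all q.op, then all q.value) rather than interleaving them per condition as the docstring suggests, so the API side has to pair them back up by position.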
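2-1-2-2. The _list function used by StatisticsManager is inherited from base.Manager (ceilometerclient/common/base.py). It sends a GET request to the query URL, takes the JSON body (or body[response_key] when a response_key is given), and wraps each non-empty item in resource_class, which for StatisticsManager is Statistics: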
class Manager(object):
    """Managers interact with a particular type of API
    (samples, meters, alarms, etc.) and provide CRUD operations for them.
    """
    resource_class = None

    def __init__(self, api):
        self.api = api

    def _create(self, url, body):
        resp, body = self.api.json_request('POST', url, body=body)
        if body:
            return self.resource_class(self, body)

    def _list(self, url, response_key=None, obj_class=None, body=None,
              expect_single=False):
        resp, body = self.api.json_request('GET', url)
        if obj_class is None:
            obj_class = self.resource_class
        if response_key:
            try:
                data = body[response_key]
            except KeyError:
                return []
        else:
            data = body
        if expect_single:
            data = [data]
        return [obj_class(self, res, loaded=True) for res in data if res]
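The behaviour of _list can be checked without a Ceilometer server by feeding it canned JSON. In this minimal sketch FakeAPI and Resource are hypothetical stand-ins for the HTTP client and for Statistics, and the avg/max numbers are made up:

```python
class FakeAPI(object):
    """Hypothetical stand-in for the HTTP client: returns canned JSON."""

    def __init__(self, body):
        self.body = body
        self.requests = []

    def json_request(self, method, url):
        self.requests.append((method, url))
        return None, self.body


class Resource(object):
    """Minimal stand-in for a resource class such as Statistics."""

    def __init__(self, manager, info, loaded=False):
        # loaded is accepted only to match the call made by _list
        self.manager = manager
        for key, value in info.items():
            setattr(self, key, value)


class Manager(object):
    resource_class = Resource

    def __init__(self, api):
        self.api = api

    def _list(self, url, response_key=None, obj_class=None,
              expect_single=False):
        resp, body = self.api.json_request('GET', url)
        obj_class = obj_class or self.resource_class
        if response_key:
            try:
                data = body[response_key]
            except KeyError:
                return []
        else:
            data = body
        if expect_single:
            data = [data]
        # Wrap every non-empty item of the JSON list in a resource object.
        return [obj_class(self, res, loaded=True) for res in data if res]


api = FakeAPI([{'avg': 0.5, 'max': 0.9}, {}, {'avg': 0.7, 'max': 1.2}])
stats = Manager(api)._list('/v2/meters/cpu_util/statistics?period=60')
print(len(stats), stats[0].avg)  # 2 0.5
```

The empty dict is dropped by the `if res` filter, and each remaining item exposes its fields as attributes, which is why _compare in the evaluator can use getattr(stat, alarm.rule['statistic']).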
3. The last part of the evaluate function:
if self._sufficient(alarm, statistics):
    def _compare(stat):
        op = COMPARATORS[alarm.rule['comparison_operator']]
        value = getattr(stat, alarm.rule['statistic'])
        limit = alarm.rule['threshold']
        LOG.debug(_('comparing value %(value)s against threshold'
                    ' %(limit)s') %
                  {'value': value, 'limit': limit})
        return op(value, limit)

    # list(): _transition indexes compared[-1], and map() is lazy on Python 3
    self._transition(alarm,
                     statistics,
                     list(map(_compare, statistics)))
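The comparison step looks up an operator function by the rule's comparison_operator and applies it to one aggregate of each statistic. The COMPARATORS dict itself is not shown in this post, so the mapping below is an assumption modelled on the operator module; Stat and the sample values are hypothetical.

```python
import operator

# Assumed mapping from comparison_operator names to operator functions.
COMPARATORS = {
    'gt': operator.gt,
    'lt': operator.lt,
    'ge': operator.ge,
    'le': operator.le,
    'eq': operator.eq,
    'ne': operator.ne,
}


class Stat(object):
    """Hypothetical statistic object exposing aggregates as attributes."""

    def __init__(self, **aggregates):
        self.__dict__.update(aggregates)


def compare_all(rule, statistics):
    op = COMPARATORS[rule['comparison_operator']]
    # Same per-statistic test as _compare: op(getattr(stat, statistic), threshold)
    return [op(getattr(stat, rule['statistic']), rule['threshold'])
            for stat in statistics]


rule = {'comparison_operator': 'gt', 'statistic': 'avg', 'threshold': 70.0}
stats = [Stat(avg=65.0), Stat(avg=72.5), Stat(avg=80.1)]
print(compare_all(rule, stats))  # [False, True, True]
```

The resulting list of booleans is exactly what _transition receives as compared.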
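The _sufficient function checks whether the returned statistics are enough to evaluate the alarm. quorum is the minimum number of statistics required and defaults to 1; if there are fewer and the alarm is not already unknown, the alarm is moved to the unknown state: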
def _sufficient(self, alarm, statistics):
    """Ensure there is sufficient data for evaluation,
       transitioning to unknown otherwise.
    """
    sufficient = len(statistics) >= self.quorum
    if not sufficient and alarm.state != UNKNOWN:
        reason = _('%d datapoints are unknown') % alarm.rule[
            'evaluation_periods']
        self._refresh(alarm, UNKNOWN, reason)
    return sufficient
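The decision inside _sufficient can be written as a pure function. This sketch returns the state the alarm would move to instead of calling _refresh; the state strings are placeholders, not ceilometer's constants.

```python
def check_sufficient(num_statistics, current_state, quorum=1):
    """Return (sufficient, new_state) for a batch of statistics.

    Side-effect-free sketch of _sufficient; 'unknown' is a placeholder name.
    """
    sufficient = num_statistics >= quorum
    if not sufficient and current_state != 'unknown':
        return False, 'unknown'
    return sufficient, current_state


print(check_sufficient(0, 'ok'))     # (False, 'unknown')
print(check_sufficient(3, 'alarm'))  # (True, 'alarm')
```

With the default quorum of 1, a single datapoint in the window is enough. An empty result, including the [] that _statistics returns when the API call raises, sends the alarm to the unknown state.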
If the completeness check in _sufficient passes, the most important part, _transition, is called next:
def _transition(self, alarm, statistics, compared):
    """Transition alarm state if necessary.

    The transition rules are currently hardcoded as:

    - transitioning from a known state requires an unequivocal
      set of datapoints

    - transitioning from unknown is on the basis of the most
      recent datapoint if equivocal

    Ultimately this will be policy-driven.
    """
    # distilled is True if every statistic meets the alarm's threshold condition
    distilled = all(compared)
    # unequivocal is True if the statistics all meet, or all fail, the condition
    unequivocal = distilled or not any(compared)
    # whether the alarm's current state is unknown
    unknown = alarm.state == evaluator.UNKNOWN
    # whether the alarm's repeat_actions attribute is set
    continuous = alarm.repeat_actions
    # case: unequivocal is True
    if unequivocal:
        # all statistics meet the condition: the alarm fires and moves to ALARM;
        # none meet it: the alarm is healthy and moves to OK
        state = evaluator.ALARM if distilled else evaluator.OK
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        # transition if the target state differs from the current one; if
        # repeat_actions is set, refresh even when the state is unchanged
        if alarm.state != state or continuous:
            # perform the state transition
            self._refresh(alarm, state, reason, reason_data)
    # case: unequivocal is False, and the current state is UNKNOWN
    # or repeat_actions is set
    elif unknown or continuous:
        # from UNKNOWN, follow the trend of the most recent statistic: True
        # means trending toward ALARM, False toward OK; otherwise
        # (repeat_actions only) keep the current state
        trending_state = evaluator.ALARM if compared[-1] else evaluator.OK
        state = trending_state if unknown else alarm.state
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        # perform the state transition
        self._refresh(alarm, state, reason, reason_data)
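The transition rules form a small decision table. Here they are as a pure function that returns the state _transition would pass to _refresh, or None when it would leave the alarm untouched. It is a sketch that omits _reason and _refresh, and the state strings are placeholders.

```python
ALARM, OK, UNKNOWN = 'alarm', 'ok', 'unknown'  # placeholder state names


def next_state(current, compared, repeat_actions=False):
    """Return the state _transition would refresh to, or None."""
    compared = list(compared)
    distilled = all(compared)
    unequivocal = distilled or not any(compared)
    unknown = current == UNKNOWN
    if unequivocal:
        state = ALARM if distilled else OK
        if current != state or repeat_actions:
            return state
        return None
    if unknown or repeat_actions:
        # equivocal data: follow the most recent datapoint only from UNKNOWN
        trending_state = ALARM if compared[-1] else OK
        return trending_state if unknown else current
    return None


print(next_state(OK, [True, True, True]))  # alarm
print(next_state(OK, [True, False]))       # None
print(next_state(UNKNOWN, [False, True]))  # alarm
```

A known state therefore only changes on unanimous evidence, so a window that flaps around the threshold leaves an OK or ALARM alarm alone, while an unknown alarm is resolved from its latest datapoint.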
The alarm state is finally updated in _refresh:
def _refresh(self, alarm, state, reason, reason_data):
    """Refresh alarm state."""
    try:
        previous = alarm.state
        if previous != state:
            # log the state transition
            LOG.info(_('alarm %(id)s transitioning to %(state)s because '
                       '%(reason)s') % {'id': alarm.alarm_id,
                                        'state': state,
                                        'reason': reason})
            self._client.alarms.set_state(alarm.alarm_id, state=state)
        alarm.state = state
        if self.notifier:
            # call the Notifier's notify to send the notification
            self.notifier.notify(alarm, previous, reason, reason_data)
    except Exception:
        # retry will occur naturally on the next evaluation
        # cycle (unless alarm state reverts in the meantime)
        LOG.exception(_('alarm state update failed'))
Note the most important line in it:
self.notifier.notify(alarm, previous, reason, reason_data)
The Notifier then sends the appropriate notification for the alarm's state.
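The control flow of _refresh explains how repeat_actions works: persisting the new state is tied to an actual change, but notify runs on every refresh. A minimal sketch with hypothetical fakes, assuming set_state belongs to the `if previous != state:` branch and leaving out logging and error handling:

```python
class FakeAlarms(object):
    """Hypothetical stand-in for client.alarms; records set_state calls."""

    def __init__(self):
        self.calls = []

    def set_state(self, alarm_id, state):
        self.calls.append((alarm_id, state))


class FakeNotifier(object):
    """Hypothetical notifier that records what it was asked to send."""

    def __init__(self):
        self.notified = []

    def notify(self, alarm, previous, reason, reason_data):
        self.notified.append((previous, alarm.state, reason))


class Alarm(object):
    def __init__(self, alarm_id, state):
        self.alarm_id = alarm_id
        self.state = state


def refresh(alarms_api, notifier, alarm, state, reason, reason_data=None):
    previous = alarm.state
    if previous != state:
        # only a real transition is written back through the API
        alarms_api.set_state(alarm.alarm_id, state=state)
    alarm.state = state
    if notifier:
        # notification happens on every refresh, including repeats
        notifier.notify(alarm, previous, reason, reason_data)


alarm, api, notifier = Alarm('a1', 'ok'), FakeAlarms(), FakeNotifier()
refresh(api, notifier, alarm, 'alarm', 'first breach')
refresh(api, notifier, alarm, 'alarm', 'still breaching')  # e.g. repeat_actions
print(len(api.calls), len(notifier.notified))  # 1 2
```

Because _transition only calls _refresh with an unchanged state when repeat_actions is set, this is the path that re-sends notifications for an alarm that stays in ALARM.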