Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:42 编辑问题导读
问题1:如何实现对报警器的分配操作?
问题2:拥有主控权的partition,会根据不同的情况实现哪些不同形式的报警器分配操作?
static/image/hrline/4.gif
在前面的帖子中,我们分析过在/ceilometer/alarm/service.py中实现了类PartitionedAlarmService,它的主要功能是实现了分布式报警系统服务的启动和其他操作,当时我们只是从整体上分析了分布式报警系统的启动和实现,而分布式报警系统各个操作的具体实现则是在/ceilometer/alarm/partition/coordination.py中的类PartitionCoordinator中,所以这篇帖子将会详细地分析分布式报警系统的具体实现,即分析类PartitionCoordinator中各个方法的源码实现。
1 def _distribute(self, alarms, rebalance)
def _distribute(self, alarms, rebalance):
"""
实现对报警器的分配操作;
如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
"""
verb = 'assign' if rebalance else 'allocate'
"""
根据rebalance的值,来确定所要执行的分配报警器的方法;
如果需要对全部报警器重新分配操作,则调用self.coordination_rpc.assign;
如果不需要对全部报警器重新分配操作,则调用self.coordination_rpc.allocate;
self.coordination_rpc.assign:
通过广播发送执行assign操作的通知给所有的Partition,
在所有的Partition执行接收操作;
在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
则该Partition实现赋值本alarms的ID值到其本地;
self.coordination_rpc.allocate:
通过广播发送执行allocate操作的通知给所有的Partition,
在所有的Partition执行接收操作;
在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
则该Partition实现添加本alarms的ID值到其本地;
"""
method = (self.coordination_rpc.assign if rebalance
else self.coordination_rpc.allocate)
LOG.debug(_('triggering %s') % verb)
LOG.debug(_('known evaluators %s') % self.reports)
"""
计算每个evaluator上所要分配的报警器数目;
"""
per_evaluator = int(math.ceil(len(alarms) /
float(len(self.reports) + 1)))
LOG.debug(_('per evaluator allocation %s') % per_evaluator)
"""
获取所有的evaluator;
对所有的evaluator进行洗牌操作;
"""
evaluators = self.reports.keys()
random.shuffle(evaluators)
offset = 0
"""
遍历所有的evaluator;
"""
for evaluator in evaluators:
if self.oldest < self.this:
LOG.warn(_('%(this)s bailing on distribution cycle '
'as older partition detected: %(older)s') %
dict(this=self.this, older=self.oldest))
return False
"""
从所有报警器集合中获取一个报警器;
"""
allocation = alarms
"""
调用之前确定的分配方法,实现对报警器的分配操作;
self.coordination_rpc.assign:
通过广播发送执行assign操作的通知给所有的Partition,
在所有的Partition执行接收操作;
在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
则该Partition实现赋值本alarms的ID值到其本地;
self.coordination_rpc.allocate:
通过广播发送执行allocate操作的通知给所有的Partition,
在所有的Partition执行接收操作;
在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
则该Partition实现添加本alarms的ID值到其本地;
"""
if allocation:
LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %
dict(verb=verb, alloc=allocation, eval=evaluator))
method(evaluator.uuid, allocation)
"""
为下一个报警器的获取做准备;
"""
offset += per_evaluator
LOG.debug(_('master taking %s for self') % alarms)
"""
对于本Partition所分配的报警器的实现;
"""
if rebalance:
self.assignment = alarms
else:
self.assignment.extend(alarms)
return True
方法小结:
实现对报警器的分配操作;
1.如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
2.如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
2 def _deletion_requires_rebalance(self, alarms)
def _deletion_requires_rebalance(self, alarms):
"""
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
"""
# 获取最新已经删除的报警器集合;
deleted_alarms = self.last_alarms - set(alarms)
LOG.debug(_('newly deleted alarms %s') % deleted_alarms)
# 存储最新的已删除的报警器数据;
self.deleted_alarms.update(deleted_alarms)
# 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
if len(self.deleted_alarms) > len(alarms) / 5:
LOG.debug(_('alarm deletion activity requires rebalance'))
self.deleted_alarms = set()
return True
return False
方法小结:
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
3 def _is_master(self, interval)
def _is_master(self, interval):
"""
确定当前的partition是否是主控角色;
"""
now = timeutils.utcnow()
if timeutils.delta_seconds(self.start, now) < interval * 2:
LOG.debug(_('%s still warming up') % self.this)
return False
is_master = True
for partition, last_heard in self.reports.items():
delta = timeutils.delta_seconds(last_heard, now)
LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') %
dict(report=partition, delta=delta))
if delta > interval * 2:
del self.reports
self._record_oldest(partition, stale=True)
LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') %
dict(this=self.this, stale=partition))
self.presence_changed = True
elif partition < self.this:
is_master = False
LOG.info(_('%(this)s sees older potential master: %(older)s')
% dict(this=self.this, older=partition))
LOG.info(_('%(this)s is master?: %(is_master)s') %
dict(this=self.this, is_master=is_master))
return is_master
4 def _master_role(self, assuming, api_client)
def _master_role(self, assuming, api_client):
"""
作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
1.需要整个分布式报警系统的重平衡操作
如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需
要整个分布式报警系统的重平衡操作);
如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的
五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
2.对部分报警器(新建立报警器)实现重平衡操作;
3.对于其他情况,不需要进行报警器的分配操作;
"""
"""
通过客户端获取当前所有partition的报警器;
"""
alarms =
"""
获取新建立的报警器;
当前的报警器集合减去之前的报警器集合,得到新建立的报警器;
"""
created_alarms = list(set(alarms) - self.last_alarms)
LOG.debug(_('newly created alarms %s') % created_alarms)
"""
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执
行rebalance操作;
"""
sufficient_deletion = self._deletion_requires_rebalance(alarms)
"""
如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要
整个分布式报警系统的重平衡操作);
如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五
分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
"""
# _distribute:实现对报警器的分配操作;
if (assuming or sufficient_deletion or self.presence_changed):
still_ahead = self._distribute(alarms, rebalance=True)
"""
对于新建立报警器,实现对报警器分配操作,不需要进行所有报警器的重新分配操作;
"""
elif created_alarms:
still_ahead = self._distribute(list(created_alarms),
rebalance=False)
"""
对于其他情况,不需要进行报警器的分配操作;
"""
else:
still_ahead = self.this < self.oldest
"""
实现更新报警器的集合;
"""
self.last_alarms = set(alarms)
LOG.info(_('%(this)s not overtaken as master? %(still_ahead)s') %
({'this': self.this, 'still_ahead': still_ahead}))
return still_ahead
方法小结:
作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
1.需要整个分布式报警系统的重平衡操作
如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要整个分布式报警系统的重平衡操作);
如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
2.对部分报警器(新建立报警器)实现重平衡操作;
3.对于其他情况,不需要进行报警器的分配操作;
5 def check_mastership(self, eval_interval, api_client)
def check_mastership(self, eval_interval, api_client):
"""
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
"""
LOG.debug(_('%s checking mastership status') % self.this)
try:
assuming = not self.is_master
"""
_is_master:确定当前的partition是否是主控角色;
_master_role:作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
"""
self.is_master = (self._is_master(eval_interval) and
self._master_role(assuming, api_client))
self.presence_changed = False
except Exception:
LOG.exception(_('mastership check failed'))
方法小结:
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
6 def presence(self, uuid, priority)
def presence(self, uuid, priority):
"""
实现接收某一个Partition广播的其存在性信息;
"""
report = PartitionIdentity(uuid, priority)
if report != self.this:
if report not in self.reports:
self.presence_changed = True
self._record_oldest(report)
self.reports = timeutils.utcnow()
LOG.debug(_('%(this)s knows about %(reports)s') %
dict(this=self.this, reports=self.reports))
7 def report_presence(self)
def report_presence(self):
"""
通过方法fanout_cast实现广播当前partition的存在性
(包括uuid和优先级信息)通知给所有的Partition;
"""
LOG.debug(_('%s reporting presence') % self.this)
"""
广播当前partition的存在性(包括uuid和优先级信息)通知给所有的Partition;
"""
try:
self.coordination_rpc.presence(self.this.uuid, self.this.priority)
except Exception:
LOG.exception(_('presence reporting failed'))
8 def assigned_alarms(self, api_client)
def assigned_alarms(self, api_client):
"""
通过指定客户端获取分配给当前partition的报警器;
"""
if not self.assignment:
LOG.debug(_('%s has no assigned alarms to evaluate') % self.this)
return []
try:
LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') %
dict(this=self.this, alarms=self.assignment))
return [a for a in api_client.alarms.list(q=[{'field': 'enabled',
'value': True}])
if a.alarm_id in self.assignment]
except Exception:
LOG.exception(_('assignment retrieval failed'))
return []
好了,分布式报警系统的源码具体实现解析完成,下面对分布式报警系统做总体的小结:
对于PartitionedAlarmService,它通过rpc实现了一套多个evaluator进程之间的协作协议(PartitionCoordinator),使得可以通过水平扩展来不断增大alarm service的处理能力,这样不仅实现了一个简单的负载均衡,还实现了高可用。下面我们就重点来说一下PartitionCoordinator这个协议。
PartitionCoordinator允许启动多个ceilometer-alarm-evaluator进程,这多个进程之间的关系是互相协作的关系,他们中最早启动的进程会被选为master进程,master进程主要做的事情就是给其他进程分配alarm,每个进程都在周期性的执行三个任务:
1.通过rpc,向其它进程广播自己的状态,来告知其他进程,我是活着的,每个进程中都保存有其他进程的最后活跃时间;
2.争抢master,每个进程都会不断的更新自己所维护的其它进程的状态列表,根据这个状态列表,来判断是否应该由自己来当master,判断一个进程是否是master的条件只有一个,那就是看谁启动的早;
3.检查本进程负责的alarm,会去调用ceilometer的api,来获取该alarm的监控指标对应的监控数据,然后进行判断,发送报警等;
当一个进程被确定为master之后,如果它不挂掉,那么它的master是不会被抢走的,该进程就会一直在履行master的职责:
1.当有新的alarm被创建时,master会将这些新创建的alarm平均的分配给其它worker进程,如果不能平均分配的,剩下的零头就由master自己来负责;
2.当有新的evaluator进程添加进来,或者是现有的evaluator进程被kill掉,那么master就会重新洗牌一次,把所有的alarm再平均的分配给现有的evaluator进程;
3.当master挂掉咋办呢?那么就会由第二个最早启动的进程接替master的位置,然后重新洗牌
通过这个协议,就实现了一个简单的分布式alarm服务;
好了,到此为止,分布式报警系统的具体源码实现分析和总结工作到此为止~~~~谢谢~~~~
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
页:
[1]