本帖最后由 坎蒂丝_Swan 于 2014-12-14 19:42 编辑
问题导读
问题1:如何实现对报警器的分配操作?
问题2:拥有主控权的partition,会根据不同的情况实现哪些不同形式的报警器分配操作?
在前面的帖子中,我们分析过在/ceilometer/alarm/service.py中实现了类PartitionedAlarmService,它的主要功能是实现了分布式报警系统服务的启动和其他操作,当时我们只是从整体上分析了分布式报警系统的启动和实现,而分布式报警系统各个操作的具体实现则是在/ceilometer/alarm/partition/coordination.py中的类PartitionCoordinator中,所以这篇帖子将会详细地分析分布式报警系统的具体实现,即分析类PartitionCoordinator中各个方法的源码实现。
1 def _distribute(self, alarms, rebalance)
- def _distribute(self, alarms, rebalance):
- """
- 实现对报警器的分配操作;
- 如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
- 通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
- 如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
- 通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
- """
- verb = 'assign' if rebalance else 'allocate'
-
- """
- 根据rebalance的值,来确定所要执行的分配报警器的方法;
- 如果需要对全部报警器重新分配操作,则调用self.coordination_rpc.assign;
- 如果不需要对全部报警器重新分配操作,则调用self.coordination_rpc.allocate;
-
- self.coordination_rpc.assign:
- 通过广播发送执行assign操作的通知给所有的Partition,
- 在所有的Partition执行接收操作;
- 在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
- 则该Partition实现赋值本alarms的ID值到其本地;
- self.coordination_rpc.allocate:
- 通过广播发送执行allocate操作的通知给所有的Partition,
- 在所有的Partition执行接收操作;
- 在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
- 则该Partition实现添加本alarms的ID值到其本地;
- """
-
- method = (self.coordination_rpc.assign if rebalance
- else self.coordination_rpc.allocate)
- LOG.debug(_('triggering %s') % verb)
- LOG.debug(_('known evaluators %s') % self.reports)
-
- """
- 计算每个evaluator上所要分配的报警器数目;
- """
- per_evaluator = int(math.ceil(len(alarms) /
- float(len(self.reports) + 1)))
- LOG.debug(_('per evaluator allocation %s') % per_evaluator)
-
- """
- 获取所有的evaluator;
- 对所有的evaluator进行洗牌操作;
- """
- evaluators = self.reports.keys()
- random.shuffle(evaluators)
- offset = 0
-
- """
- 遍历所有的evaluator;
- """
- for evaluator in evaluators:
- if self.oldest < self.this:
- LOG.warn(_('%(this)s bailing on distribution cycle '
- 'as older partition detected: %(older)s') %
- dict(this=self.this, older=self.oldest))
- return False
-
- """
- 从所有报警器集合中获取一个报警器;
- """
- allocation = alarms[offset:offset + per_evaluator]
-
- """
- 调用之前确定的分配方法,实现对报警器的分配操作;
- self.coordination_rpc.assign:
- 通过广播发送执行assign操作的通知给所有的Partition,
- 在所有的Partition执行接收操作;
- 在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
- 则该Partition实现赋值本alarms的ID值到其本地;
- self.coordination_rpc.allocate:
- 通过广播发送执行allocate操作的通知给所有的Partition,
- 在所有的Partition执行接收操作;
- 在接收操作中,如果uuid和某Partition相应的uuid值相匹配,
- 则该Partition实现添加本alarms的ID值到其本地;
- """
- if allocation:
- LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %
- dict(verb=verb, alloc=allocation, eval=evaluator))
- method(evaluator.uuid, allocation)
-
- """
- 为下一个报警器的获取做准备;
- """
- offset += per_evaluator
- LOG.debug(_('master taking %s for self') % alarms[offset:])
-
- """
- 对于本Partition所分配的报警器的实现;
- """
- if rebalance:
- self.assignment = alarms[offset:]
- else:
- self.assignment.extend(alarms[offset:])
- return True
复制代码
方法小结:
实现对报警器的分配操作;
1.如果需要对全部报警器执行重新分配操作,self.coordination_rpc.assign(对所有的报警器进行分配操作):
通过广播发送执行assign操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现赋值本alarms的ID值到其本地;
2.如果不需要对全部报警器执行重新分配操作,self.coordination_rpc.allocate(只对部分(新增)的报警器进行分配操作,所以某个Partition上可有多余1个的报警器):
通过广播发送执行allocate操作的通知给所有的Partition,在所有的Partition执行接收操作;在接收操作中,如果uuid和某Partition相应的uuid值相匹配,则该Partition实现添加本alarms的ID值到其本地;
2 def _deletion_requires_rebalance(self, alarms)
- def _deletion_requires_rebalance(self, alarms):
- """
- 通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
- 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
- """
- # 获取最新已经删除的报警器集合;
- deleted_alarms = self.last_alarms - set(alarms)
- LOG.debug(_('newly deleted alarms %s') % deleted_alarms)
-
- # 存储最新的已删除的报警器数据;
- self.deleted_alarms.update(deleted_alarms)
-
- # 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
- if len(self.deleted_alarms) > len(alarms) / 5:
- LOG.debug(_('alarm deletion activity requires rebalance'))
- self.deleted_alarms = set()
- return True
- return False
复制代码
方法小结:
通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执行rebalance操作;
3 def _is_master(self, interval)
- def _is_master(self, interval):
- """
- 确定当前的partition是否是主控角色;
- """
- now = timeutils.utcnow()
- if timeutils.delta_seconds(self.start, now) < interval * 2:
- LOG.debug(_('%s still warming up') % self.this)
- return False
-
- is_master = True
- for partition, last_heard in self.reports.items():
- delta = timeutils.delta_seconds(last_heard, now)
- LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') %
- dict(report=partition, delta=delta))
- if delta > interval * 2:
- del self.reports[partition]
- self._record_oldest(partition, stale=True)
- LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') %
- dict(this=self.this, stale=partition))
- self.presence_changed = True
- elif partition < self.this:
- is_master = False
- LOG.info(_('%(this)s sees older potential master: %(older)s')
- % dict(this=self.this, older=partition))
- LOG.info(_('%(this)s is master?: %(is_master)s') %
- dict(this=self.this, is_master=is_master))
- return is_master
复制代码
4 def _master_role(self, assuming, api_client)
- def _master_role(self, assuming, api_client):
- """
- 作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
- 1.需要整个分布式报警系统的重平衡操作
- 如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需
- 要整个分布式报警系统的重平衡操作);
- 如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的
- 五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
- 如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
- 2.对部分报警器(新建立报警器)实现重平衡操作;
- 3.对于其他情况,不需要进行报警器的分配操作;
- """
-
- """
- 通过客户端获取当前所有partition的报警器;
- """
- alarms = [a.alarm_id for a in api_client.alarms.list()]
-
- """
- 获取新建立的报警器;
- 当前的报警器集合减去之前的报警器集合,得到新建立的报警器;
- """
- created_alarms = list(set(alarms) - self.last_alarms)
- LOG.debug(_('newly created alarms %s') % created_alarms)
-
- """
- 通过获取最新已删除的报警器数据,来确定是否需要执行rebalance操作;
- 如果已删除的报警器数据多余当前报警器数据的五分之一,则返回True,说明需要执
- 行rebalance操作;
- """
- sufficient_deletion = self._deletion_requires_rebalance(alarms)
-
- """
- 如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要
- 整个分布式报警系统的重平衡操作);
- 如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五
- 分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
- 如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
- """
- # _distribute:实现对报警器的分配操作;
- if (assuming or sufficient_deletion or self.presence_changed):
- still_ahead = self._distribute(alarms, rebalance=True)
-
- """
- 对于新建立报警器,实现对报警器分配操作,不需要进行所有报警器的重新分配操作;
- """
- elif created_alarms:
- still_ahead = self._distribute(list(created_alarms),
- rebalance=False)
-
- """
- 对于其他情况,不需要进行报警器的分配操作;
- """
- else:
- still_ahead = self.this < self.oldest
-
- """
- 实现更新报警器的集合;
- """
- self.last_alarms = set(alarms)
- LOG.info(_('%(this)s not overtaken as master? %(still_ahead)s') %
- ({'this': self.this, 'still_ahead': still_ahead}))
- return still_ahead
复制代码
方法小结:
作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
1.需要整个分布式报警系统的重平衡操作
如果assuming为True,说明此报警器为新进的分布式报警系统的主控节点(所以需要整个分布式报警系统的重平衡操作);
如果sufficient_deletion为True,说明已删除的报警器数据多余当前报警器数据的五分之一(变化过多,所以需要整个分布式报警系统的重平衡操作);
如果presence_changed为True,则说明需要执行整个分布式报警系统的重平衡操作;
2.对部分报警器(新建立报警器)实现重平衡操作;
3.对于其他情况,不需要进行报警器的分配操作;
5 def check_mastership(self, eval_interval, api_client)
- def check_mastership(self, eval_interval, api_client):
- """
- 实现定期检测主控权角色;
- 确定当前的partition是否是主控角色;
- 如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
- """
- LOG.debug(_('%s checking mastership status') % self.this)
- try:
- assuming = not self.is_master
-
- """
- _is_master:确定当前的partition是否是主控角色;
- _master_role:作为拥有主控权的partition,根据不同的情况实现不同形式的报警器分配操作;
- """
- self.is_master = (self._is_master(eval_interval) and
- self._master_role(assuming, api_client))
- self.presence_changed = False
- except Exception:
- LOG.exception(_('mastership check failed'))
复制代码
方法小结:
实现定期检测主控权角色;
确定当前的partition是否是主控角色;
如果为拥有主控权的partition,则根据不同的情况实现不同形式的报警器分配操作;
6 def presence(self, uuid, priority)
- def presence(self, uuid, priority):
- """
- 实现接收某一个Partition广播的其存在性信息;
- """
- report = PartitionIdentity(uuid, priority)
- if report != self.this:
- if report not in self.reports:
- self.presence_changed = True
- self._record_oldest(report)
- self.reports[report] = timeutils.utcnow()
- LOG.debug(_('%(this)s knows about %(reports)s') %
- dict(this=self.this, reports=self.reports))
复制代码
7 def report_presence(self)
- def report_presence(self):
- """
- 通过方法fanout_cast实现广播当前partition的存在性
- (包括uuid和优先级信息)通知给所有的Partition;
- """
- LOG.debug(_('%s reporting presence') % self.this)
-
- """
- 广播当前partition的存在性(包括uuid和优先级信息)通知给所有的Partition;
- """
- try:
- self.coordination_rpc.presence(self.this.uuid, self.this.priority)
- except Exception:
- LOG.exception(_('presence reporting failed'))
复制代码
8 def assigned_alarms(self, api_client)
- def assigned_alarms(self, api_client):
- """
- 通过指定客户端获取分配给当前partition的报警器;
- """
- if not self.assignment:
- LOG.debug(_('%s has no assigned alarms to evaluate') % self.this)
- return []
-
- try:
- LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') %
- dict(this=self.this, alarms=self.assignment))
- return [a for a in api_client.alarms.list(q=[{'field': 'enabled',
- 'value': True}])
- if a.alarm_id in self.assignment]
- except Exception:
- LOG.exception(_('assignment retrieval failed'))
- return []
复制代码
好了,分布式报警系统的源码具体实现解析完成,下面对分布式报警系统做总体的小结:
对于PartitionedAlarmService,它通过rpc实现了一套多个evaluator进程之间的协作协议(PartitionCoordinator),使得可以通过水平扩展来不断增大alarm service的处理能力,这样不仅实现了一个简单的负载均衡,还实现了高可用。下面我们就重点来说一下PartitionCoordinator这个协议。
PartitionCoordinator允许启动多个ceilometer-alarm-evaluator进程,这多个进程之间的关系是互相协作的关系,他们中最早启动的进程会被选为master进程,master进程主要做的事情就是给其他进程分配alarm,每个进程都在周期性的执行三个任务:
1.通过rpc,向其它进程广播自己的状态,来告知其他进程,我是活着的,每个进程中都保存有其他进程的最后活跃时间;
2.争抢master,每个进程都会不断的更新自己所维护的其它进程的状态列表,根据这个状态列表,来判断是否应该由自己来当master,判断一个进程是否是master的条件只有一个,那就是看谁启动的早;
3.检查本进程负责的alarm,会去调用ceilometer的api,来获取该alarm的监控指标对应的监控数据,然后进行判断,发送报警等;
当一个进程被确定为master之后,如果它不挂掉,那么它的master是不会被抢走的,该进程就会一直在履行master的职责:
1.当有新的alarm被创建时,master会将这些新创建的alarm平均的分配给其它worker进程,如果不能平均分配的,剩下的零头就由master自己来负责;
2.当有新的evaluator进程添加进来,或者是现有的evaluator进程被kill掉,那么master就会重新洗牌一次,把所有的alarm再平均的分配给现有的evaluator进程;
3.当master挂掉咋办呢?那么就会由第二个最早启动的进程接替master的位置,然后重新洗牌
通过这个协议,就实现了一个简单的分布式alarm服务;
好了,到此为止,分布式报警系统的具体源码实现分析和总结工作到此为止~~~~谢谢~~~~
Ceilometer项目源码分析----ceilometer项目源码结构分析
Ceilometer项目源码分析----ceilometer报警器服务的实现概览
Ceilometer项目源码分析----ceilometer报警器状态评估方式
Ceilometer项目源码分析----ceilometer分布式报警系统的具体实现
Ceilometer项目源码分析----ceilometer-alarm-notifier服务的初始化和启动
Ceilometer项目源码分析----ceilometer-alarm-evaluator服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-central服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-compute服务的初始化和启动
Ceilometer项目源码分析----ceilometer-agent-notification服务的初始化和启动
Ceilometer项目源码分析----ceilometer-collector服务的初始化和启动
|