问题导读:
1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?
在上一篇博客中,方法def run(self, context, *args, **kwargs)中关于flow中task执行的重要语句的实现基本解析完成,这篇博客中,我将重点解析在这个方法中,如果卷的建立出现异常,则如何执行相关的逆转回滚操作。
首先来看方法def run(self, context, *args, **kwargs)的源码实现:
def run(self, context, *args, **kwargs):
"""
工作流(workflow)的执行操作;
context = <cinder.context.RequestContext object at 0x382fd50> //从cinder请求中获取上下文环境信息;
args = ()
kwargs = {}
"""
super(Flow, self).run(context, *args, **kwargs)
def resume_it():
# self._leftoff_at = None
if self._leftoff_at is not None:
return ([], self._leftoff_at)
# self.resumer = None
# 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;
if self.resumer:
# self._ordering():获取迭代器包装的任务运行列表;
(finished, leftover) = self.resumer.resume(self, self._ordering())
else:
finished = []
# self._ordering():获取迭代器包装的任务运行列表;
leftover = self._ordering()
# leftover:获取迭代器包装的任务运行列表;
# finished = []
# leftover = <listiterator object at 0x441fa50>
return (finished, leftover)
# 改变目前的flow状态为新的状态STARTED,并执行通知操作;
# flow状态标志为STARTED,表示任务开始运行操作;
self._change_state(context, states.STARTED)
try:
# leftover:获取迭代器包装的任务运行列表;
# those_finished = []
# leftover = <listiterator object at 0x40c1990>
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
# RollbackTask:实现调用任务对应的可用的逆转回滚方法;
# runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
rb = utils.RollbackTask(context, runner.task, result=None)
# 在回滚方法累加器中添加逆转回滚方法任务;
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
# simulate_run = False
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
if not result:
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
# those_finished = []
if len(those_finished):
self._change_state(context, states.RESUMING)
for (r, details) in those_finished:
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
# leftover:获取迭代器包装的任务运行列表;
# leftover = <listiterator object at 0x40c1990>
self._leftoff_at = leftover
# 改变目前的flow状态为新的状态RUNNING,并执行通知操作;
self._change_state(context, states.RUNNING)
# 如果状态为中断,则返回;
if self.state == states.INTERRUPTED:
return
# 标志任务运行状态不为states.INTERRUPTED;
was_interrupted = False
# leftover:获取迭代器包装的任务运行列表;
# leftover = <listiterator object at 0x40c1990>
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None 复制代码
先来看语句:
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
当执行flow中task出现异常时会引发这个异常,即会执行这部分代码;
先来看看类FlowFailure的初始化方法:
class FlowFailure(object):
def __init__(self, runner, flow, exception):
self.runner = runner
self.flow = flow
self.exc = exception
self.exc_info = sys.exc_info() 复制代码
它记录了关于异常的若干信息;
再来看方法rollback的源码实现:
def rollback(self, context, cause):
"""
context = <cinder.context.RequestContext object at 0x35e8b90>
cause = <cinder.taskflow.utils.FlowFailure object at 0x35f79d0>
"""
self._change_state(context, states.REVERTING)
try:
self._accumulator.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
# Rollback any parents flows if they exist...
# self.parents = ()
for p in self.parents:
p.rollback(context, cause) 复制代码
我们关注这个方法中的语句:
self._accumulator.rollback(cause)
进一步来看源码:
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due to inner exception."), i + 1, f) 复制代码
这里我们可以得到一些输出实例:
i = 0
f = cinder.volume.flows.base.InjectTask;volume:create==1.0
len(self) = 1
这是我在执行第一个task任务的过程中设置了一个异常之后,而输出的结果,所以我们可以理解,一旦某个task的执行过程中出现异常,就会按照task逆序来调用具体的逆转回滚方法来实现系统状态的恢复;在前面的博客中我们也说过,所要调用的逆转回滚对象也是随着task任务的执行所增加的。
我们回到方法def rollback(self, context, cause)所在类的初始化方法:
class RollbackAccumulator(object):
def __init__(self):
self._rollbacks = []
之前我们说过,这个类的作用就是获取回滚方法累加器类的实例化对象,其中存储了要执行回滚操作的任务对象;
我们回顾语句:
# RollbackTask:实现调用任务对应的可用的逆转回滚方法;
rb = utils.RollbackTask(context, runner.task, result=None)
# 在回滚方法累加器中添加逆转回滚方法任务;
self._accumulator.add(rb)
可见在变量self._rollbacks中存储的表示逆转回滚任务的就是类RollbackTask的实例化对象;
所以在语句:
for (i, f) in enumerate(self):
f(cause)
实现了有序地调用变量self._rollbacks中存储的类RollbackTask的实例化对象,我们之前也曾测试过,这些逆转回滚任务的排列是按照之前task调用顺序的逆序进行排列的,所以这里调用的顺序也就是先调用最近执行的task对应的逆转回滚方法。
这里直接调用了类RollbackTask中的__call__方法,来看具体源码实现:
def __call__(self, cause):
"""
实现调用任务对应的可用的逆转回滚方法;
"""
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
#注:这里有几种不同的回滚方法;
self.task.revert(self.context, self.result, cause) 复制代码
这里依据变量self.task指定了具体的任务类,从而具体定位到任务类中的逆转回滚方法revert上,对于没有实现revert方法的,在其父类中实现了这个方法,说明执行这个任务类过后,如果遇到异常,不需要进行逆转回滚操作;
具体来看所涉及到的所有任务类中的revert方法(这些方法我们在前面的博客中进行过简单的解析,这里只是列出来看看即可):
class QuotaReserveTask(base.CinderTask):
"""
根据给定的大小值和给定的卷类型信息实现保存单一的卷;
"""
def revert(self, context, result, cause):
"""
回调配额预留资源;
根据result中的reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
"""
if not result:
return
if context.quota_committed:
return
reservations = result['reservations']
# 回调配额预留资源;
try:
QUOTAS.rollback(context, reservations)
except exception.CinderException:
LOG.exception(_("Failed rolling back quota for %s reservations"), reservations)
class EntryCreateTask(base.CinderTask):
"""
在数据库中为给定的卷建立相关条目;
逆转操作:从数据库中删除volume_id建立的条目;
"""
def revert(self, context, result, cause):
"""
删除指定的卷在数据库中的数据条目信息,实现逆转回滚操作;
"""
# 如果result为none,说明从来没有产生任何结果,因此不能删除任何数据信息;
if not result:
return
# quota_committed说明不能执行回滚操作,说明此时卷已经建立;
if context.quota_committed:
return
vol_id = result['volume_id']
# 删除指定的卷在数据库中的数据条目信息;
try:
self.db.volume_destroy(context.elevated(), vol_id)
except exception.CinderException:
LOG.exception(_("Failed destroying volume entry %s"), vol_id)
class QuotaCommitTask(base.CinderTask):
"""
提交新的资源配额的预留信息到数据库中;
"""
def revert(self, context, result, cause):
if not result:
return
volume = result['volume_properties']
try:
reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
# 添加卷的类型选项到opts,opts表示保留选项信息的hash;
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume['volume_type_id'])
# 检测配额信息和并建立相应的资源配额预留资源;
reservations = QUOTAS.reserve(context,
project_id=context.project_id,
**reserve_opts)
# 提交资源配额的预留信息到数据库中;
if reservations:
QUOTAS.commit(context, reservations, project_id=context.project_id)
except Exception:
LOG.exception(_("Failed to update quota for deleting volume: %s"), volume['id'])
class OnFailureChangeStatusTask(base.CinderTask):
"""
这个task实现了当出现错误时,设置指定id的卷的状态为ERROR;
"""
def revert(self, context, result, cause):
volume_spec = result.get('volume_spec')
if not volume_spec:
volume_spec = _find_result_spec(cause.flow)
volume_id = result['volume_id']
_restore_source_status(context, self.db, volume_spec)
_error_out_volume(context, self.db, volume_id, reason=cause.exc)
LOG.error(_("Volume %s: create failed"), volume_id)
exc_info = False
if all(cause.exc_info):
exc_info = cause.exc_info
LOG.error(_('Unexpected build error:'), exc_info=exc_info) 复制代码
OK!到这里为止,在卷的建立出现异常的情况下,如何来执行相关的逆转回滚操作的解析工作也基本完成了;所以,方法def run(self, context, *args, **kwargs)的执行过程也基本完成了解析工作。
所以我们就可以回到方法/cinder/volume/api.py----class API----def create中,我们可以看到,在方法的最后,从flow中获取返回结果作为建立卷操作的反馈信息返回给上层调用,从而完成了应用taskflow模式实现卷建立的整个过程,就是:
1.构建字典create_what,实现整合建立卷的具体参数;
2.构建并返回用于建立卷的flow;
3.执行构建的用于建立卷的flow;
4.从flow中获取建立卷的反馈信息;
在下一篇博客中,我将详细的分析task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的。
相关文章:
Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1. cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2. __call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?
Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何 获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3. meth的作用?
4.如何 从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?
Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述 在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?
Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述 cinder是如何实现卷的建立的?
2.简述 taskflow库来实现卷的简历过程?
3.如何 根据给定id来检索获取单个的卷的类型?
4.如何 构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述 建立卷的flow的步骤?
Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
http://www.aboutyun.com/thread-10213-1-1.html
1. 如何实现Flow类的初始化的?
2. 用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何 完全或部分的重置flow的内部的状态?
5.如何 从给定的卷中提取卷的id信息?
6. OnFailureChangeStatusTask类的作用?
Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析
http://www.aboutyun.com/thread-10212-1-1.html
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?
Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1. VolumeCastTask的源码如何实现?
2. 远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?
Openstack Cinder中建立volume过程的源码解析(9)
http://www.aboutyun.com/thread-10210-1-1.html
1.如何 实现create_volume的源码?
2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?
转自:http://blog.csdn.net/gaoxingnengjisuan/article/details/23710551