问题导读:
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?
在Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析 中我们曾经说过,在方法/cinder/volume/api.py----class
API----def create中,主要通过4步实现了应用taskflow进行卷的建立操作,也就是:
1.构建字典create_what,实现整合建立卷的具体参数;
2.构建并返回用于建立卷的flow;
3.执行构建的用于建立卷的flow;
4.从flow中获取建立卷的反馈信息;
其中前2个步骤的解析已经完成,这里来看第3个步骤的实现,即如何来运行已经构建好的flow;
我们再来看方法/cinder/volume/api.py----class API----def create的源码实现:
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None, backup_source_volume=None):
"""
实现建立卷的操作;
"""
def check_volume_az_zone(availability_zone):
"""
验证availability_zone是否是可用的(即是否包含在可用zone的列表中);
"""
try:
# _valid_availabilty_zone:验证availability_zone是否是可用的(即是否包含在可用zone的列表中);
return self._valid_availabilty_zone(availability_zone)
except exception.CinderException:
LOG.exception(_("Unable to query if %s is in the "
"availability zone set"), availability_zone)
return False
# 所要建立卷的规格数据信息;
create_what = {
'size': size,
'name': name,
'description': description,
'snapshot': snapshot,
'image_id': image_id,
'volume_type': volume_type,
'metadata': metadata,
'availability_zone': availability_zone,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
}
# 构建并返回用于建立卷的flow;
# self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI();
# self.volume_rpcapi = volume_rpcapi.VolumeAPI();
# self.image_service = (image_service or glance.get_default_image_service())
# check_volume_az_zone:验证availability_zone是否是可用的(即是否包含在可用zone的列表中);
# create_what:所要建立卷的规格数据信息;
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
check_volume_az_zone,
create_what)
# 应用assert关键字来声明flow是真的;
assert flow, _('Create volume flow not retrieved')
# 运行用于建立卷的flow;
flow.run(context)
# 如果flow的运行状态不为states.SUCCESS,则引发异常;
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" create volume workflow"))
# Extract the volume information from the task uuid that was specified
# to produce said information.
# 通过task的uuid值获取建立卷的信息;
volume = None
try:
volume = flow.results[uuid]['volume']
except KeyError:
pass
# Raise an error, nobody provided it??
# 应用assert关键字来声明volume是真的;
assert volume, _('Expected volume result not found')
return volume 复制代码
我们来看语句:
flow.run(context)
就实现了构建的flow的执行操作;
我们具体来看方法run的源码实现:
def run(self, context, *args, **kwargs):
"""
工作流(workflow)的执行操作;
context = <cinder.context.RequestContext object at 0x382fd50> //从cinder请求中获取上下文环境信息;
args = ()
kwargs = {}
"""
super(Flow, self).run(context, *args, **kwargs)
def resume_it():
# self._leftoff_at = None
if self._leftoff_at is not None:
return ([], self._leftoff_at)
# self.resumer = None
# 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;
if self.resumer:
# self._ordering():获取迭代器包装的任务运行列表;
(finished, leftover) = self.resumer.resume(self, self._ordering())
else:
finished = []
# self._ordering():获取迭代器包装的任务运行列表;
leftover = self._ordering()
# leftover:获取迭代器包装的任务运行列表;
# finished = []
# leftover = <listiterator object at 0x441fa50>
return (finished, leftover)
# 改变目前的flow状态为新的状态STARTED,并执行通知操作;
# flow状态标志为STARTED,表示任务开始运行操作;
self._change_state(context, states.STARTED)
try:
# leftover:获取迭代器包装的任务运行列表;
# those_finished = []
# leftover = <listiterator object at 0x40c1990>
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
# RollbackTask:实现调用任务对应的可用的逆转回滚方法;
# runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
rb = utils.RollbackTask(context, runner.task, result=None)
# 在回滚方法累加器中添加逆转回滚方法任务;
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
# simulate_run = False
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
if not result:
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
# those_finished = []
if len(those_finished):
self._change_state(context, states.RESUMING)
for (r, details) in those_finished:
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
# leftover:获取迭代器包装的任务运行列表;
# leftover = <listiterator object at 0x40c1990>
self._leftoff_at = leftover
# 改变目前的flow状态为新的状态RUNNING,并执行通知操作;
self._change_state(context, states.RUNNING)
# 如果状态为中断,则返回;
if self.state == states.INTERRUPTED:
return
# 标志任务运行状态不为states.INTERRUPTED;
was_interrupted = False
# leftover:获取迭代器包装的任务运行列表;
# leftover = <listiterator object at 0x40c1990>
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None 复制代码
我们来分析这个方法的实现过程;
首先来看语句:
try:
# leftover:获取迭代器包装的任务运行列表;
# those_finished = []
# leftover =
those_finished, leftover = resume_it()
这部分语句的功能是实现获取迭代器包装的task列表,以及已经完成的task列表;
我们具体来看方法resume_it的实现过程:
def resume_it():
# self._leftoff_at = None
if self._leftoff_at is not None:
return ([], self._leftoff_at)
# self.resumer = None
# 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;
if self.resumer:
# self._ordering():获取迭代器包装的任务运行列表;
(finished, leftover) = self.resumer.resume(self, self._ordering())
else:
finished = []
# self._ordering():获取迭代器包装的任务运行列表;
leftover = self._ordering()
# leftover:获取迭代器包装的任务运行列表;
# finished = []
# leftover = <listiterator object at 0x441fa50>
return (finished, leftover) 复制代码
我们再来看方法_ordering的源码实现:
def _ordering(self):
# 返回迭代器包装的任务运行列表;
return iter(self._connect())
def _connect(self):
# self._runners:所有要运行的任务集合;
# self._connected = False
if self._connected:
return self._runners
for r in self._runners:
r.providers = {}
for r in reversed(self._runners):
self._associate_providers(r)
self._connected = True
return self._runners 复制代码
我们可以看到在方法_connect中,由类Flow的初始化方法中可以知道,变量self._runners表示的是所有要运行的任务的集合;我们在语句for r in reversed(self._runners)下面添加了一条输出调试语句,获得以下的输出实例:
r = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;
r-a41753ed-a9b4-4999-8aab-979d244425d1; 1.0
r = Runner: cinder.volume.flows.create_volume.OnFailureChangeStatusTask;volume:create==1.0;
r-c689085f-df0a-4459-a8a9-c2518c6e471c; 1.0
r = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;volume:create==1.0;
r-adea0991-6fc9-4a74-9c2f-33848c77b29c; 1.0
r = Runner: cinder.volume.flows.create_volume.EntryCreateTask;volume:create==1.0;
r-c05d7892-7cb0-4e80-b17b-5777ca0ae167; 1.0
r = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;volume:create==1.0;
r-4d1c0c90-b6ac-4406-a3ae-50943fb22cd3; 1.0
r = Runner: cinder.volume.flows.create_volume.ExtractVolumeRequestTask;volume:create==1.0;
r-d7175a90-de39-40c2-853f-462a4c566d8f; 1.0
r = Runner: cinder.volume.flows.base.InjectTask;volume:create==1.0;
r-77f89843-25d6-4dc1-b8c5-4d13816b62fa; 1.0 复制代码
我们可以看到此时,self._runners中集合了所有要运行的task,当然这里暂时调用了reversed方法进行了逆序排序的操作,是为了下面的验证操作做准备的,而self._runners中task的排序还是顺序的;输出实例:
self._runners = [<cinder.taskflow.utils.Runner object at 0x2ca86d0>,
<cinder.taskflow.utils.Runner object at 0x2ca8750>,
<cinder.taskflow.utils.Runner object at 0x2ca87d0>,
<cinder.taskflow.utils.Runner object at 0x2ca8850>,
<cinder.taskflow.utils.Runner object at 0x2ca88d0>,
<cinder.taskflow.utils.Runner object at 0x2ca8950>,
<cinder.taskflow.utils.Runner object at 0x28e6490>] 复制代码
所以获取了task的集合的变量self._runners,再经过_ordering方法的迭代器包装,最后返回给变量leftover;
下面我们再来看语句:
for r in leftover:
r.reset()
run_it(r)
显然,这里完成的就是按照task集合中的顺序,逐个执行所要执行的task,从而成功实现卷的建立;
从上面的调试输出我们可以知道,self._runners为cinder.taskflow.utils.Runner对象的集合,即为要运行的任务的集合,所以这里我们来看一下类Runner的初始化方法:
class Runner(object):
def __init__(self, task, uuid=None):
assert isinstance(task, collections.Callable)
self.task = task
self.providers = {}
self.runs_before = []
self.result = None
if not uuid:
self._id = uuidutils.generate_uuid()
else:
self._id = str(uuid) 复制代码
所以我们在这里可以做以下的调试输出,具体看一下各个变量的作用,这有利于后续代码的理解:
========================================================================================
r = Runner: cinder.volume.flows.base.InjectTask;
volume:create==1.0;
r-dfc38a9a-a67f-44b9-8297-48dc586806ce; 1.0
r.task = cinder.volume.flows.base.InjectTask;volume:create==1.0 # 表示当前任务的类名和任务描述;
r.providers = {}
r.runs_before = [] # 表示当前任务之前的所有任务集合;
r.result = None
r._id = dfc38a9a-a67f-44b9-8297-48dc586806ce
========================================================================================
r = Runner: cinder.volume.flows.create_volume.ExtractVolumeRequestTask;
volume:create==1.0;
r-920b4ad9-edd8-436b-bfce-18f940d714a7; 1.0
r.task = cinder.volume.flows.create_volume.ExtractVolumeRequestTask;volume:create==1.0
r.providers = {'backup_source_volume': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'availability_zone': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'source_volume': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'volume_type': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'snapshot': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'size': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'key_manager': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'metadata': <cinder.taskflow.utils.Runner object at 0x3a28650>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = 920b4ad9-edd8-436b-bfce-18f940d714a7
========================================================================================
r = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;
volume:create==1.0;
r-49eb28ff-4da0-4e17-bf89-c9d82abc4eb8; 1.0
r.task = cinder.volume.flows.create_volume.QuotaReserveTask;volume:create==1.0
r.providers = {'volume_type_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'size': <cinder.taskflow.utils.Runner object at 0x3a286d0>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = 49eb28ff-4da0-4e17-bf89-c9d82abc4eb8
========================================================================================
r = Runner: cinder.volume.flows.create_volume.EntryCreateTask;
volume:create==1.0;
r-b338cd2d-282b-4da2-9f10-7cd79f9de18d; 1.0
r.task = cinder.volume.flows.create_volume.EntryCreateTask;volume:create==1.0
r.providers = {'size': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'volume_type_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'description': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'availability_zone': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'reservations': <cinder.taskflow.utils.Runner object at 0x3a28750>,
'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'encryption_key_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'metadata': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'name': <cinder.taskflow.utils.Runner object at 0x3a28650>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28750>,
<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = b338cd2d-282b-4da2-9f10-7cd79f9de18d
========================================================================================
r = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;
volume:create==1.0;
r-11bfeedf-6cc7-46f6-b257-472114132cb5; 1.0
r.task = cinder.volume.flows.create_volume.QuotaCommitTask;volume:create==1.0
r.providers = {'reservations': <cinder.taskflow.utils.Runner object at 0x3a28750>,
'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a287d0>,
<cinder.taskflow.utils.Runner object at 0x3a28750>,
<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = 11bfeedf-6cc7-46f6-b257-472114132cb5
========================================================================================
r = Runner: cinder.volume.flows.create_volume.OnFailureChangeStatusTask;
volume:create==1.0;
r-255b5031-abb0-4c02-9c1d-ddb59fe4066b; 1.0
r.task = cinder.volume.flows.create_volume.OnFailureChangeStatusTask;volume:create==1.0
r.providers = {'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28850>,
<cinder.taskflow.utils.Runner object at 0x3a287d0>,
<cinder.taskflow.utils.Runner object at 0x3a28750>,
<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = 255b5031-abb0-4c02-9c1d-ddb59fe4066b
========================================================================================
r = Runner: cinder.volume.flows.create_volume.VolumeCastTask;
volume:create==1.0;
r-68318d88-aecb-4ac4-b80d-26f6e91b9b95; 1.0
r.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
r.providers = {'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
'volume_type': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'scheduler_hints': <cinder.taskflow.utils.Runner object at 0x3a28650>}
r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a288d0>,
<cinder.taskflow.utils.Runner object at 0x3a28850>,
<cinder.taskflow.utils.Runner object at 0x3a287d0>,
<cinder.taskflow.utils.Runner object at 0x3a28750>,
<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
r.result = None
r._id = 68318d88-aecb-4ac4-b80d-26f6e91b9b95
======================================================================================== 复制代码
我们接着来看方法run_it的源码实现:
def run_it(runner, failed=False, result=None, simulate_run=False):
"""
以其中之一为例:
runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;
r-68318d88-aecb-4ac4-b80d-26f6e91b9b95; 1.0
runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
runner.providers = {'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,
'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
'volume_type': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
'scheduler_hints': <cinder.taskflow.utils.Runner object at 0x3a28650>}
runner.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a288d0>,
<cinder.taskflow.utils.Runner object at 0x3a28850>,
<cinder.taskflow.utils.Runner object at 0x3a287d0>,
<cinder.taskflow.utils.Runner object at 0x3a28750>,
<cinder.taskflow.utils.Runner object at 0x3a286d0>,
<cinder.taskflow.utils.Runner object at 0x3a28650>]
runner.result = None
runner._id = 68318d88-aecb-4ac4-b80d-26f6e91b9b95
runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;
r-6865ee0d-c5dc-4b62-972b-6ad3af46d147; 1.0
failed = False
result = None
simulate_run = False
"""
try:
# RollbackTask:实现调用任务对应的可用的逆转回滚方法;
# runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
rb = utils.RollbackTask(context, runner.task, result=None)
# 在回滚方法累加器中添加逆转回滚方法任务;
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
# simulate_run = False
if not simulate_run:
result = runner(context, *args, **kwargs)
yle="font-family:KaiTi_GB2312;"> </span>else:
if failed:
if not result:
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause) 复制代码
首先来看语句:
rb = utils.RollbackTask(context, runner.task, result=None)
其中类RollbackTask实现调用任务对应的可用的逆转回滚方法;我们具体来看类RollbackTask的实现代码:
class RollbackTask(object):
"""
A helper task that on being called will call the underlying callable
tasks revert method (if said method exists).
实现调用任务对应的可用的逆转回滚方法;
"""
def __init__(self, context, task, result):
# task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
"""
实现调用任务对应的可用的逆转回滚方法;
"""
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
#注:这里有几种不同的回滚方法;
self.task.revert(self.context, self.result, cause) 复制代码
类RollbackTask实现调用任务对应的可用的逆转回滚方法,调用task类中对应的revert方法都需要经过这个类中的__call__方法来实现,后面我们会解析到的。
我们再来看语句:
self._accumulator.add(rb)
这条语句实现了在回滚方法累加器中添加逆转回滚方法任务;在类Flow的初始化方法中可以看到,self._accumulator = utils.RollbackAccumulator(),所以我们先来看类RollbackAccumulator的源码实现:
class RollbackAccumulator(object):
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __iter__(self):
# Rollbacks happen in the reverse order that they were added.
return reversed(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value) 复制代码
我们可以看到语句self._accumulator.add(rb)实现了把将要执行的回滚任务添加到变量self._rollbacks中;所以这里可以测试self._accumulator._rollbacks:
self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>]
self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>,
<cinder.taskflow.utils.RollbackTask object at 0x3ae6e50>]
......
self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>,
<cinder.taskflow.utils.RollbackTask object at 0x3ae6e50>,
<cinder.taskflow.utils.RollbackTask object at 0x3749150>,
<cinder.taskflow.utils.RollbackTask object at 0x3ae6dd0>,
<cinder.taskflow.utils.RollbackTask object at 0x3dac3d0>,
<cinder.taskflow.utils.RollbackTask object at 0x3df2f50>,
<cinder.taskflow.utils.RollbackTask object at 0x3df2610>] 复制代码
我们可以看到,随着执行任务的增加,其对应的回滚任务也是对应增加的,并存储在变量self._accumulator._rollbacks中;
我们再来看语句:
# simulate_run = False
if not simulate_run:
result = runner(context, *args, **kwargs)
这里实现的就是执行指定的task任务,并获取返回结果;
这里我们可以得到如下的输出实例:
========================================================================================
runner = Runner: cinder.volume.flows.base.InjectTask;
volume:create==1.0;
r-bf0f8b30-c2a0-4c22-ada2-22698adc71b9; 1.0
result = {'backup_source_volume': None,
'description': None,
'availability_zone': None,
'source_volume': None,
'volume_type': None,
'name': u'shinian01',
'image_id': None,
'metadata': {},
'snapshot': None,
'size': 1,
'key_manager': <cinder.keymgr.conf_key_mgr.ConfKeyManager object at 0x373f790>,
'scheduler_hints': None}
======================================================================================== runner = Runner:cinder.volume.flows.create_volume.ExtractVolumeRequestTask;
volume:create==1.0;
r-c88edccb-d78b-4cfa-ae68-5d8b72bd173d;1.0
result = {'volume_type_id': None,
'qos_specs': None,
'encryption_key_id': None,
'volume_type': {},
'snapshot_id': None,
'availability_zone': 'nova',
'source_volid': None,
'size': 1}
========================================================================================
runner = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;
volume:create==1.0;
r-4b407b9a-c1a4-4778-8125-751ebe8e3c2b; 1.0
result = {'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
'365c158b-d291-4ac5-9d2d-5b7fb8a63dd6']}
========================================================================================
runner = Runner: cinder.volume.flows.create_volume.EntryCreateTask;
volume:create==1.0;
r-9ee48341-945d-4905-9b1d-31122cc9745e; 1.0
result = {'volume': <cinder.db.sqlalchemy.models.Volume object at 0x3ae6f10>,
'volume_properties': {'status': 'creating',
'volume_type_id': None,
'user_id': u'ef073287176048bd861dcd9d9c4d9808',
'availability_zone': 'nova',
'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
'365c158b-d291-4ac5-9d2d-5b7fb8a63dd6'],
'volume_admin_metadata': [],
'attach_status': 'detached',
'display_description': None,
'volume_metadata': [],
'metadata': {},
'encryption_key_id': None,
'source_volid': None,
'snapshot_id': None,
'display_name': u'shinian01',
'project_id': u'6c3c74779a614d3b81dd75518824e25c',
'id': '431b1c64-4e43-4731-b052-ff10aefae850',
'size': 1},
'volume_id': '431b1c64-4e43-4731-b052-ff10aefae850'}
========================================================================================
runner = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;
volume:create==1.0;
r-50d91e70-67fe-42f3-911f-1f59d4173b8b; 1.0
result = {'volume_properties': {'status': 'creating',
'volume_type_id': None,
'user_id': u'ef073287176048bd861dcd9d9c4d9808',
'availability_zone': 'nova',
'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
'365c158b-d291-4ac5-9d2d-5b7fb8a63dd6'],
'volume_admin_metadata': [],
'attach_status': 'detached',
'display_description': None,
'volume_metadata': [],
'metadata': {},
'encryption_key_id': None,
'source_volid': None,
'snapshot_id': None,
'display_name': u'shinian01',
'project_id': u'6c3c74779a614d3b81dd75518824e25c',
'id': '431b1c64-4e43-4731-b052-ff10aefae850',
'size': 1}}
========================================================================================
runner = Runner:cinder.volume.flows.create_volume.OnFailureChangeStatusTask;
volume:create==1.0;
r-9cd4bfa9-3170-4713-a6ac-2daf984185b2;1.0
result = {'volume_spec': None,
'volume_id': '431b1c64-4e43-4731-b052-ff10aefae850'}
========================================================================================
runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;
volume:create==1.0;
r-6865ee0d-c5dc-4b62-972b-6ad3af46d147; 1.0
result = None
======================================================================================== 复制代码
可见这里runner指明了要执行的类的__call__方法,即运行先前添加的task任务;
result是运行task任务后获取的结果信息;
具体的task的实现过程需解析各个task任务类中的__call__方法,之前的博客中我们已经进行了简单的解析,这里不再进行赘述;
好了,到这里方法def run(self, context, *args, **kwargs)中关于flow中task执行的重要语句的实现基本解析完成,在下一篇博客中,我将重点解析在这个方法中,如果卷的建立出现异常,则如何执行相关的逆转回滚操作。
相关文章:
Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1. cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2. __call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?
Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何 获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3. meth的作用?
4.如何 从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?
Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述 在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?
Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述 cinder是如何实现卷的建立的?
2.简述 taskflow库来实现卷的简历过程?
3.如何 根据给定id来检索获取单个的卷的类型?
4.如何 构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述 建立卷的flow的步骤?
Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
http://www.aboutyun.com/thread-10213-1-1.html
1. 如何实现Flow类的初始化的?
2. 用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何 完全或部分的重置flow的内部的状态?
5.如何 从给定的卷中提取卷的id信息?
6. OnFailureChangeStatusTask类的作用?
Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析
http://www.aboutyun.com/thread-10211-1-1.html
1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?
Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1. VolumeCastTask的源码如何实现?
2. 远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?
Openstack Cinder中建立volume过程的源码解析(9)
http://www.aboutyun.com/thread-10210-1-1.html
1.如何 实现create_volume的源码?
2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?
转自:http://blog.csdn.net/gaoxingnengjisuan/article/details/23678149