阅读导读:
1.如何实现Flow类的初始化的?
2.用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何完全或部分的重置flow的内部的状态?
5.如何从给定的卷中提取卷的id信息?
6.OnFailureChangeStatusTask类的作用?
感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
这里就不写什么开头语了,直接继续上一篇博客!
2.构建并返回用于建立卷的flow(继续)
我们先来关注,这里是如何实现Flow类的初始化的,来看语句:
- api_flow = linear_flow.Flow(flow_name)
复制代码
来看看任务流基类中都定义了哪些方法,这也有助于我们理解任务流的整体概念;
- class Flow(object):
- """
- The base abstract class of all flow implementations.
- Flow抽象类;
- """
- def __init__(self, name, parents=None, uuid=None):
- super(Flow, self).__init__(name, parents, uuid)
- # 获取回滚方法累加器类的实例化对象;
- self._accumulator = utils.RollbackAccumulator()
- self.results = {}
- self._leftoff_at = None
- # 所有要运行的任务集合;
- self._runners = []
- self._connected = False
- # 确定所要使用的恢复策略;
- self.resumer = None
-
- def name(self)flow可读的非唯一的名称;
- def uuid(self)flow唯一的标识;
- def state(self)为flow提供了一个只读的状态信息;
- def _change_state(self, context, new_state)改变目前的flow状态为新的状态new_state,并执行通知操作;
- def add(self, task)添加一个给定的task到工作流中;
- def add_many(self, tasks)添加给定的若干的task到工作流中;
- def interrupt(self)尝试中断当前的flow和当前没有在flow中执行的task;
- def reset(self)完全重置flow的内部的状态,并允许flow再次运行;
- def soft_reset(self)部分地重置flow的内部状态,并允许flow从中止的状态再次运行;
- def run(self, context, *args, **kwargs)工作流(workflow)的执行操作;
- def rollback(self, context, cause)执行workflow和其父workflow的回滚操作;
复制代码
再来看一下这里是如何实现task的添加的,比如语句:
- api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
复制代码
所以来看看方法add的源码实现:
- @decorators.locked
- def add(self, task):
- """
- Adds a given task to this flow.
- 添加一个给定的task到flow;
- """
- assert isinstance(task, collections.Callable)
- r = utils.Runner(task)
- r.runs_before = list(reversed(self._runners))
- self._runners.append(r)
- self._reset_internals()
- return r.uuid
复制代码
所实现的功能就是添加一个给定的task到flow中,具体实现就是添加到self._runners中;其中每一个task都包装成了一个Runner类的对象;其中runs_before中存储了除了当前task的以前的task;这里的变量信息后面的flow运行实现过程中都要用到的;
我们继续来关注用于卷的建立的flow中都添加了哪些task?
2.1 api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
这个类实现了注入字典信息create_what到flow中,create_what是建立卷所要遵守的规范信息(字典);
来看类InjectTask的具体代码:
- class InjectTask(CinderTask):
- """
- 这个类实现了注入字典信息到flow中;
- """
- def __init__(self, inject_what, addons=None):
- super(InjectTask, self).__init__(addons=addons)
- self.provides.update(inject_what.keys())
- self._inject = inject_what
- def __call__(self, context):
- return dict(self._inject)
复制代码
这个类以task的形式添加到flow中,从这个类的__call__方法我们可以看出,这个类实际上就是实现了对create_what的字典化处理;
在这个task类中并没有具体实现revert方法,因为这个task中不需要回滚操作;
2.2 api_flow.add(ExtractVolumeRequestTask(image_service, az_check_functor))
这个task类所实现的功能就是对输入的请求信息中的相关参数进行验证,并将这些参数转换成有效的集合,并返回这些经过验证和转换的参数,这些参数会应用于后续的task的实现过程之中的;
来看类ExtractVolumeRequestTask的具体代码:
- class ExtractVolumeRequestTask(base.CinderTask):
- """
- 实现提取并验证处理卷的请求信息任务类;
- 这个task的主要任务是提取和验证输入的值,这些输入的值将形成一个潜在的卷的请求信息;
- 并且实现根据一组条件对这些输入值进行验证,并将这些输入值转换成有效的集合;
- 并返回这些经过验证和转换的输入值,这些输入值将会应用于其他task中。
- """
- # image_service:获取默认的镜像服务类
- # az_check_functor:验证availability_zone是否是可用的(即是否包含在可用zone的列表中);
- def __init__(self, image_service, az_check_functor=None):
- super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
- self.provides.update(['availability_zone', 'size', 'snapshot_id',
- 'source_volid', 'volume_type', 'volume_type_id',
- 'encryption_key_id'])
- self.requires.update(['availability_zone', 'image_id', 'metadata',
- 'size', 'snapshot', 'source_volume',
- 'volume_type', 'key_manager',
- 'backup_source_volume'])
- self.image_service = image_service
- self.az_check_functor = az_check_functor
- if not self.az_check_functor:
- self.az_check_functor = lambda az: True
- def __call__(self, context, size, snapshot, image_id, source_volume,
- availability_zone, volume_type, metadata,
- key_manager, backup_source_volume):
- """
- 这个task的主要任务是提取和验证输入的值,这些输入的值将形成一个潜在的卷的请求信息;
- 并且实现根据一组条件对这些输入值进行验证,并将这些输入值转换成有效的集合;
- 并返回这些经过验证和转换的输入值,这些输入值将会应用于其他task中。
- """
- # 实现对所提供的输入参数进行是否为空的验证操作;
- utils.check_exclusive_options(snapshot=snapshot,
- imageRef=image_id,
- source_volume=source_volume)
- # 验证在指定的上下文环境中ACTION是有效的;
- # ACTION = 'volume:create';
- policy.enforce_action(context, ACTION)
-
- # 从给定的快照中提取快照的id信息;
- snapshot_id = self._extract_snapshot(snapshot)
- # 从给定的卷中提取卷的id信息;
- source_volid = self._extract_source_volume(source_volume)
- # 提取并验证卷的大小;
- size = self._extract_size(size, source_volume, snapshot)
- # 检测镜像的存在性,并验证镜像的元数据中镜像大小的属性信息;
- self._check_image_metadata(context, image_id, size)
- # 提取并返回一个经过验证的可用的zone;
- availability_zone = self._extract_availability_zone(availability_zone,
- snapshot,
- source_volume)
-
- # 如果没有定义volume_type,则获取默认的卷的类型;
- if not volume_type and not source_volume and not snapshot:
- # 获取默认的卷的类型;
- volume_type = volume_types.get_default_volume_type()
- # 获取卷类型信息的id值;
- volume_type_id = self._get_volume_type_id(volume_type,
- source_volume, snapshot,
- backup_source_volume)
-
- # 获取加密密钥信息的id值;
- encryption_key_id = self._get_encryption_key_id(key_manager,
- context,
- volume_type_id,
- snapshot,
- source_volume,
- backup_source_volume)
- specs = {}
- # 根据给定的卷类型获取所有QOS功能相关的信息;
- if volume_type_id:
- qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
- specs = qos_specs['qos_specs']
- if not specs:
- specs = None
- # 检测卷的元数据属性是有效的;
- self._check_metadata_properties(metadata)
- # 返回经过验证的参数信息;
- # 将会用于其他task的操作;
- return {
- 'size': size,
- 'snapshot_id': snapshot_id,
- 'source_volid': source_volid,
- 'availability_zone': availability_zone,
- 'volume_type': volume_type,
- 'volume_type_id': volume_type_id,
- 'encryption_key_id': encryption_key_id,
- 'qos_specs': specs,
- }
复制代码
具体功能的实现过程可以看我在代码中的注释信息,这里不再赘述,这个task类中也是不需要回滚操作的;
2.3 api_flow.add(QuotaReserveTask())
在这个task类中主要实现了几个步骤的内容,也就是说:
- 根据给定的建立新卷的大小和类型,对资源配额信息进行检测,检测建立卷的可行性;
- 根据给定的建立新卷的大小和类型,实现对数据库中资源配额信息的更新;
- 保留建立新卷之前的相关资源配额信息,用于一旦卷建立的执行出现异常时,调用逆转回滚方法实现资源配额信息的恢复;
来看类QuotaReserveTask的具体代码:
- class QuotaReserveTask(base.CinderTask):
- """
- 根据给定的大小值和给定的卷类型信息实现保存单一的卷;
- """
- def __init__(self):
- super(QuotaReserveTask, self).__init__(addons=[ACTION])
- self.requires.update(['size', 'volume_type_id'])
- self.provides.update(['reservations'])
- def __call__(self, context, size, volume_type_id):
- """
- 根据给定的大小值和给定的卷类型信息实现保存单一的卷;
- 根据给定的建立新卷的大小和类型,对资源配额信息进行检测,检测建立卷的可行性;
- 根据给定的建立新卷的大小和类型,实现对数据库中资源配额信息的更新;
- 保留建立新卷之前的相关资源配额信息,用于一旦卷建立的执行出现异常时,调用逆转回滚方法实现资源配额信息的恢复;
- """
- try:
- reserve_opts = {'volumes': 1, 'gigabytes': size}
- # 添加卷的类型信息到reserve_opts,reserve_opts表示保留选项信息;
- QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
- # 检测配额信息和并建立相应的资源配额预留资源;
- reservations = QUOTAS.reserve(context, **reserve_opts)
- return {
- 'reservations': reservations,
- }
- except exception.OverQuota as e:
- overs = e.kwargs['overs']
- quotas = e.kwargs['quotas']
- usages = e.kwargs['usages']
- def _consumed(name):
- return (usages[name]['reserved'] + usages[name]['in_use'])
- def _is_over(name):
- for over in overs:
- if name in over:
- return True
- return False
- if _is_over('gigabytes'):
- msg = _("Quota exceeded for %(s_pid)s, tried to create "
- "%(s_size)sG volume (%(d_consu:med)dG "
- "of %(d_quota)dG already consumed)")
- LOG.warn(msg % {'s_pid': context.project_id,
- 's_size': size,
- 'd_consumed': _consumed('gigabytes'),
- 'd_quota': quotas['gigabytes']})
- raise exception.VolumeSizeExceedsAvailableQuota()
- elif _is_over('volumes'):
- msg = _("Quota exceeded for %(s_pid)s, tried to create "
- "volume (%(d_consumed)d volumes "
- "already consumed)")
- LOG.warn(msg % {'s_pid': context.project_id,
- 'd_consumed': _consumed('volumes')})
- allowed = quotas['volumes']
- raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
- else:
- # If nothing was reraised, ensure we reraise the initial error
- raise
- def revert(self, context, result, cause):
- """
- 根据result中的reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- """
- if not result:
- return
- if context.quota_committed:
- return
- reservations = result['reservations']
-
- # 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- try:
- QUOTAS.rollback(context, reservations)
- except exception.CinderException:
- LOG.exception(_("Failed rolling back quota for"
- " %s reservations"), reservations)
复制代码
具体功能的实现过程可以看我在代码中的注释信息,这里不再赘述;在这个类中实现了revert方法,用于当建立卷的过程中出现异常时,实现根据result中的reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;在方法revert中,具体调用了方法QUOTAS.rollback,我们简单来看一下代码的实现过程:
- def rollback(self, context, reservations, project_id=None):
- """
- 回调配额预留资源;
- 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- """
- # 回调配额预留资源;
- # 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- try:
- self._driver.rollback(context, reservations, project_id=project_id)
- except Exception:
- LOG.exception(_("Failed to roll back reservations "
- "%s") % reservations)
- def rollback(self, context, reservations, project_id=None):
- """
- 回调配额预留资源;
- 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- """
- if project_id is None:
- project_id = context.project_id
- db.reservation_rollback(context, reservations, project_id=project_id)
- def reservation_rollback(context, reservations, project_id=None):
- """
- 回调配额预留资源;
- 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- """
- return IMPL.reservation_rollback(context, reservations,
- project_id=project_id)
- @require_context
- def reservation_rollback(context, reservations, project_id=None):
- """
- 回调配额预留资源;
- 根据reservations保留的信息,恢复数据库中卷的配额信息到建立新卷之前的状态;
- """
- session = get_session()
- with session.begin():
- usages = _get_quota_usages(context, session, project_id)
- for reservation in _quota_reservations(session, context, reservations):
- usage = usages[reservation.resource]
- if reservation.delta >= 0:
- usage.reserved -= reservation.delta
- reservation.delete(session=session)
- for usage in usages.values():
- usage.save(session=session)
- def _quota_reservations(session, context, reservations):
- """Return the relevant reservations."""
- # Get the listed reservations
- return model_query(context, models.Reservation,
- read_deleted="no",
- session=session).\
- filter(models.Reservation.uuid.in_(reservations)).\
- with_lockmode('update').\
- all()
复制代码
我们可以看到,到最后方法根据reservations在数据表Reservation中查询到对应的保留的资源配额信息,来实现恢复数据库中卷的资源配额信息到建立新卷之前的状态。
2.4 v_uuid = api_flow.add(EntryCreateTask(db))
这个task类所实现的功能是在数据库中为给定的要建立的卷来建立相关条目;
来看类EntryCreateTask的具体代码:
- class EntryCreateTask(base.CinderTask):
- """
- 在数据库中为给定的卷建立条目;
- 逆转操作:从数据库中删除volume_id建立的条目;
- """
- def __init__(self, db):
- super(EntryCreateTask, self).__init__(addons=[ACTION])
- self.db = db
- self.requires.update(['availability_zone', 'description', 'metadata',
- 'name', 'reservations', 'size', 'snapshot_id',
- 'source_volid', 'volume_type_id',
- 'encryption_key_id'])
- self.provides.update(['volume_properties', 'volume_id'])
- def __call__(self, context, **kwargs):
- """
- 为给定的输入在数据库中建立数据条目,并返回详细信息;
- 从kwargs中获取卷的相关属性值,并根据卷的相关属性值在数据库中实现新卷的建立;
- """
- volume_properties = {
- 'size': kwargs.pop('size'),
- 'user_id': context.user_id,
- 'project_id': context.project_id,
- 'status': 'creating',
- 'attach_status': 'detached',
- 'encryption_key_id': kwargs.pop('encryption_key_id'),
- # Rename these to the internal name.
- 'display_description': kwargs.pop('description'),
- 'display_name': kwargs.pop('name'),
- }
- volume_properties.update(kwargs)
- # 根据卷的相关属性volume_properties的值来建立新卷;
- volume = self.db.volume_create(context, volume_properties)
- return {
- 'volume_id': volume['id'],
- 'volume_properties': volume_properties,
- 'volume': volume,
- }
- def revert(self, context, result, cause):
- """
- 删除指定的卷在数据库中的数据条目信息,实现逆转回滚操作;
- """
-
- # 如果result为none,说明从来没有产生任何结果,因此不能删除任何数据信息;
- if not result:
- return
-
- # quota_committed说明不能执行回滚操作,说明此时卷已经建立;
- if context.quota_committed:
- return
- vol_id = result['volume_id']
-
- # 删除指定的卷在数据库中的数据条目信息;
- try:
- self.db.volume_destroy(context.elevated(), vol_id)
- except exception.CinderException:
- LOG.exception(_("Failed destroying volume entry %s"), vol_id)
复制代码
这个task类实现的功能很简单,就是在数据库中为新建立的卷添加相关的条目信息,如果建立卷的操作出现异常,可以调用方法revert来实现删除卷在数据库中新建立的数据条目信息,从而实现逆转回滚操作;
2.5 api_flow.add(QuotaCommitTask())
这个task类所实现的功能就是,暂时假设卷的建立是成功的,此时需要改变资源配额信息,这里就是提交新的资源配额信息到数据库中;
如果卷的建立出现异常,这个task中也实现了revert方法,这个方法就是根据新建立的卷的大小等信息,从改变后的资源配额预留信息中减去新建立卷的大小等信息,就可以实现恢复卷的资源配额预留信息到新卷的建立之前的状态;
来看类QuotaCommitTask的具体代码:
- class QuotaCommitTask(base.CinderTask):
- """
- 提交新的资源配额的预留信息到数据库中;
- """
- def __init__(self):
- super(QuotaCommitTask, self).__init__(addons=[ACTION])
- self.requires.update(['reservations', 'volume_properties'])
- def __call__(self, context, reservations, volume_properties):
- # 提交新的资源配额的预留信息到数据库中;
- QUOTAS.commit(context, reservations)
- context.quota_committed = True
- return {'volume_properties': volume_properties}
- def revert(self, context, result, cause):
- """
- 如果建立卷出现异常,则执行回滚操作,实现恢复数据库中原有的资源配额预留信息;
- """
- if not result:
- return
- volume = result['volume_properties']
- try:
- reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
- # 添加卷的类型选项到reserve_opts;
- QUOTAS.add_volume_type_opts(context,
- reserve_opts,
- volume['volume_type_id'])
- # 检测配额信息和并建立相应的资源配额预留资源;
- reservations = QUOTAS.reserve(context,
- project_id=context.project_id,
- **reserve_opts)
- # 提交资源配额的预留信息到数据库中;
- if reservations:
- QUOTAS.commit(context, reservations,
- project_id=context.project_id)
- except Exception:
- LOG.exception(_("Failed to update quota for deleting volume: %s"),
- volume['id'])
复制代码
上述功能的实现是很好理解的,也就是数据库相关的操作;
2.6 api_flow.add(OnFailureChangeStatusTask(db))
这个task类所实现的功能是当出现错误异常时,设置指定id的卷的状态为ERROR;
来看类OnFailureChangeStatusTask的具体代码:
- class OnFailureChangeStatusTask(base.CinderTask):
- """
- 这个task实现了当出现错误时,设置指定id的卷的状态为ERROR;
- """
- def __init__(self, db):
- super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
- self.db = db
- self.requires.update(['volume_id'])
- self.optional.update(['volume_spec'])
- def __call__(self, context, volume_id, volume_spec=None):
- return {
- 'volume_id': volume_id,
- 'volume_spec': volume_spec,
- }
- def revert(self, context, result, cause):
- volume_spec = result.get('volume_spec')
- if not volume_spec:
- volume_spec = _find_result_spec(cause.flow)
- volume_id = result['volume_id']
- _restore_source_status(context, self.db, volume_spec)
- _error_out_volume(context, self.db, volume_id, reason=cause.exc)
- LOG.error(_("Volume %s: create failed"), volume_id)
- exc_info = False
- if all(cause.exc_info):
- exc_info = cause.exc_info
- LOG.error(_('Unexpected build error:'), exc_info=exc_info)
复制代码
2.7 api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
这个task类所实现的功能就是根据具体需求调用相关的方法实现卷的建立。这里我先不做具体的分析,后面我会专门写一篇新的博客来对这里的实现进行详细的解析。当然,这个task类中就没有revert方法啦。
OK!上面我对添加到flow中的相关task进行了简单的解析,我们发现这些task类都继承自父类CinderTask,以及祖父类(哈哈哈)Task,我们来看看它们的相关实现:
- class CinderTask(task.Task):
- """
- 所有cinder任务的基类;
- """
- def __init__(self, addons=None):
- # _make_task_name:获取任务类的名称;
- super(CinderTask, self).__init__(_make_task_name(self.__class__, addons))
复制代码
这里我们看一个输出实例,是添加第一个task之后进行测试输出的:
- _make_task_name(self.__class__, addons) = cinder.volume.flows.base.InjectTask;volume:create
复制代码
可见,实现的功能就是获取了当前任务类的类名和任务;
- class Task(object):
- """
- 这里定义了task的抽象的概念,可以用于恢复进程到没有执行操作的状态;
- """
- __metaclass__ = abc.ABCMeta
- def __init__(self, name):
- self.name = name
- self.requires = set()
- self.optional = set()
- self.provides = set()
- self.version = (1, 0)
- def __str__(self):
- return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
- @abc.abstractmethod
- def __call__(self, context, *args, **kwargs):
- raise NotImplementedError()
- def revert(self, context, result, cause):
- pass
复制代码
好了,具体如何实现用于建立新卷的flow的构建的过程已经简单的进行了解析,在下一篇博客中,我会进行如何执行构建好的flow的解析工作,也就是上一篇博客中的第3步骤!
相关文章:
Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1.cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2.__call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?
Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3.meth的作用?
4.如何从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?
Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?
Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述cinder是如何实现卷的建立的?
2.简述taskflow库来实现卷的简历过程?
3.如何根据给定id来检索获取单个的卷的类型?
4.如何构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述建立卷的flow的步骤?
Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析
http://www.aboutyun.com/thread-10212-1-1.html
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?
Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析
http://www.aboutyun.com/thread-10211-1-1.html
1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?
Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1.VolumeCastTask的源码如何实现?
2.远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?
Openstack Cinder中建立volume过程的源码解析(9)
http://www.aboutyun.com/thread-10210-1-1.html
1.如何实现create_volume的源码?
2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?
好晚了啊,明天继续吧,加油!如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
|