分享

Openstack Cinder中建立volume过程的源码解析(9)

shihailong123 发表于 2014-11-22 13:28:29 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14107
问题导读:


1.如何实现create_volume的源码?

2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?









我们在上一篇博客中可以看到,在建立新卷之前,需要获取建立卷的目标主机,经过分析源码我们可以知道,如果没有指定建立新卷的目标主机,需要通过调度器算法实现目标主机的确定,如果指定了建立新卷的目标主机,则直接获取目标主机,无论是哪种情况,都需要调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立。在这篇博客中,我们就具体来分析这个方法的实现过程。
我们先来看方法create_volume的源码实现:
  1. def create_volume(self, ctxt, volume, host,  
  2.                   request_spec, filter_properties,  
  3.                   allow_reschedule=True,  
  4.                   snapshot_id=None, image_id=None,  
  5.                   source_volid=None):  
  6.     """
  7.     远程调用实现建立并导出卷;
  8.     """  
  9.   
  10.     request_spec_p = jsonutils.to_primitive(request_spec)  
  11.     self.cast(ctxt,  
  12.               self.make_msg('create_volume',  
  13.                             volume_id=volume['id'],  
  14.                             request_spec=request_spec_p,  
  15.                             filter_properties=filter_properties,  
  16.                             allow_reschedule=allow_reschedule,  
  17.                             snapshot_id=snapshot_id,  
  18.                             image_id=image_id,  
  19.                             source_volid=source_volid),  
  20.               # queue_get_for:根据给定的topic和host获取对应的队列名称;  
  21.               topic=rpc.queue_get_for(ctxt,  
  22.                                       self.topic,  
  23.                                       host),  
  24.               version='1.4')  
复制代码



我们可以看到,这里也是应用了广播方法cast实现远程调用方法create_volume,即/cinder/volume/manager.py----class VolumeManager----def create_volume,我们具体来看这个方法的实现源码:
  1. @utils.require_driver_initialized  
  2. def create_volume(self, context, volume_id, request_spec=None,  
  3.                   filter_properties=None, allow_reschedule=True,  
  4.                   snapshot_id=None, image_id=None, source_volid=None):  
  5.     """
  6.     Creates and exports the volume.
  7.     建立并导出卷;
  8.     """  
  9.   
  10.     # 构建并返回用于通过管理器建立卷的flow;  
  11.     flow = create_volume.get_manager_flow(  
  12.         self.db,  
  13.         self.driver,  
  14.         self.scheduler_rpcapi,  
  15.         self.host,  
  16.         volume_id,  
  17.         request_spec=request_spec,  
  18.         filter_properties=filter_properties,  
  19.         allow_reschedule=allow_reschedule,  
  20.         snapshot_id=snapshot_id,  
  21.         image_id=image_id,  
  22.         source_volid=source_volid,  
  23.         reschedule_context=context.deepcopy())  
  24.   
  25.     assert flow, _('Manager volume flow not retrieved')  
  26.   
  27.     # 进行flow的运行操作;  
  28.     flow.run(context.elevated())  
  29.     if flow.state != states.SUCCESS:  
  30.         raise exception.CinderException(_("Failed to successfully complete"  
  31.                                           " manager volume workflow"))  
  32.   
  33.     self._reset_stats()  
  34.     return volume_id  
复制代码

   
可见,这里再一次应用taskflow模式来实现建立并导出卷的操作。我们具体来看方法get_manager_flow的源码实现:
  1. def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,  
  2.                      request_spec=None, filter_properties=None,  
  3.                      allow_reschedule=True,  
  4.                      snapshot_id=None, image_id=None, source_volid=None,  
  5.                      reschedule_context=None):  
  6.     """
  7.     Constructs and returns the manager entrypoint flow.
  8.     构建并返回用于通过管理器建立卷的flow;
  9.     flow将会做以下的事情:
  10.     1. 首先要确定我们是否允许进行重新调度,因为这影响了我们如何对出现错误的情况进行处理;
  11.     2. 为相关的task注入keys和values;
  12.     3. 对于出错的task进行处理,发送错误通知,记录错误信息等;
  13.     4. 实现了从输入的参数中提取建立卷的规范信息的操作;
  14.     5. 通知已经开始进行卷的建立操作;
  15.     6. 根据所获取的建立卷的规范信息实现卷的建立操作;
  16.     7. 当成功的建立卷之后,完成卷建立之后的通知操作;
  17.     """  
  18.   
  19.     # flow_name:volume_create_manager;  
  20.     flow_name = ACTION.replace(":", "_") + "_manager"  
  21.     # 获取类Flow的实例化对象;  
  22.     volume_flow = linear_flow.Flow(flow_name)  
  23.   
  24.     # Determine if we are allowed to reschedule since this affects how  
  25.     # failures will be handled.  
  26.     # 首先要确定我们是否允许进行重新调度,因为这影响了我们如何对出现错误的情况进行处理;  
  27.     if not filter_properties:  
  28.         filter_properties = {}  
  29.     if not request_spec and allow_reschedule:  
  30.         LOG.debug(_("No request spec, will not reschedule"))  
  31.         allow_reschedule = False  
  32.     if not filter_properties.get('retry', None) and allow_reschedule:  
  33.         LOG.debug(_("No retry filter property or associated "  
  34.                     "retry info, will not reschedule"))  
  35.         allow_reschedule = False  
  36.   
  37.     # 添加一个给定的task到flow;  
  38.     # 这个类实现了注入字典信息到flow中;  
  39.     volume_flow.add(base.InjectTask({  
  40.         'filter_properties': filter_properties,  
  41.         'image_id': image_id,  
  42.         'request_spec': request_spec,  
  43.         'snapshot_id': snapshot_id,  
  44.         'source_volid': source_volid,  
  45.         'volume_id': volume_id,  
  46.     }, addons=[ACTION]))  
  47.   
  48.     # 如果不允许进行重新调度的操作;  
  49.     if not allow_reschedule:  
  50.         # On failure ensure that we just set the volume status to error.  
  51.         LOG.debug(_("Retry info not present, will not reschedule"))  
  52.         # 添加一个给定的task到flow;  
  53.         # 这个task实现了当出现错误时,设置指定id的卷的状态为ERROR;  
  54.         volume_flow.add(OnFailureChangeStatusTask(db))  
  55.     # 如果允许进行重新调度的操作;  
  56.     else:  
  57.         # 添加一个给定的task到flow;  
  58.         # 触发一个发送进行重新调度的请求,当进行task恢复回滚操作的时候;  
  59.         volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, scheduler_rpcapi))  
  60.   
  61.     # 添加一个给定的task到flow;  
  62.     # 提取一个用于建立卷的通用结构规范;  
  63.     volume_flow.add(ExtractVolumeSpecTask(db))  
  64.     # 添加一个给定的task到flow;  
  65.     # 执行关于给定卷的相关通知操作,获取指定卷的使用率信息,并进行通知操作;  
  66.     volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))  
  67.     # 添加一个给定的task到flow;  
  68.     # 根据所提供的规范要求实现卷的建立操作;  
  69.     volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))  
  70.     # 添加一个给定的task到flow;  
  71.     # 当成功的建立卷之后,完成卷建立之后的通知操作;  
  72.     volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))  
  73.   
  74.     # 获取flow的调试信息;  
  75.     return flow_utils.attach_debug_listeners(volume_flow)  
复制代码



这里最重要的一个task就是CreateVolumeFromSpecTask,它所实现的操作就是根据所提供的规范要求实现卷的建立。我们具体来看这个类的源码实现:
  1. class CreateVolumeFromSpecTask(base.CinderTask):  
  2.     """
  3.     根据所提供的规范要求实现卷的建立操作;
  4.     """  
  5.   
  6.     def __init__(self, db, host, driver):  
  7.         super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])  
  8.         self.db = db  
  9.         self.driver = driver  
  10.         self.requires.update(['volume_spec', 'volume_ref'])  
  11.         self._create_func_mapping = {  
  12.             'raw': self._create_raw_volume,  
  13.             'snap': self._create_from_snapshot,  
  14.             'source_vol': self._create_from_source_volume,  
  15.             'image': self._create_from_image,  
  16.         }  
  17.         self.host = host  
  18.   
  19.     def __call__(self, context, volume_ref, volume_spec):  
  20.         """
  21.         根据所提供的规范要求实现卷的建立操作;
  22.         """  
  23.         if not self.driver.initialized:  
  24.             LOG.error(_("Unable to create volume, driver not initialized"))  
  25.             driver_name = self.driver.__class__.__name__  
  26.             raise exception.DriverNotInitialized(driver=driver_name)  
  27.   
  28.         # 获取建立卷的类型信息;  
  29.         create_type = volume_spec.pop('type', None)  
  30.          
  31.         # 根据具体的建立卷的类型,获取对应的建立卷的方法;  
  32.         # self._create_func_mapping = {  
  33.         #    'raw': self._create_raw_volume,  
  34.         #    'snap': self._create_from_snapshot,  
  35.         #    'source_vol': self._create_from_source_volume,  
  36.         #    'image': self._create_from_image,  
  37.         # }  
  38.         create_functor = self._create_func_mapping.get(create_type)  
  39.          
  40.         if not create_functor:  
  41.             raise exception.VolumeTypeNotFound(volume_type_id=create_type)  
  42.   
  43.         volume_spec = dict(volume_spec)  
  44.         volume_id = volume_spec.pop('volume_id', None)  
  45.         if not volume_id:  
  46.             volume_id = volume_ref['id']  
  47.         LOG.info(_("Volume %(volume_id)s: being created using %(functor)s "  
  48.                    "with specification: %(volume_spec)s") %  
  49.                  {'volume_spec': volume_spec, 'volume_id': volume_id,  
  50.                   'functor': _make_pretty_name(create_functor)})  
  51.   
  52.         volume_ref['host'] = self.host  
  53.   
  54.         # 根据确定的要调用的建立卷的方法,调用这个方法实现指定类型的卷的建立操作;  
  55.         model_update = create_functor(context, volume_ref=volume_ref,  
  56.                                       **volume_spec)  
  57.   
  58.         try:  
  59.             if model_update:  
  60.                 volume_ref = self.db.volume_update(context, volume_ref['id'], model_update)  
  61.         except exception.CinderException as ex:  
  62.             if model_update:  
  63.                 LOG.exception(_("Failed updating model of volume %(volume_id)s"  
  64.                                 " with creation provided model %(model)s") %  
  65.                               {'volume_id': volume_id, 'model': model_update})  
  66.                 raise exception.ExportFailure(reason=ex)  
  67.   
  68.         model_update = None  
  69.         try:  
  70.             LOG.debug(_("Volume %s: creating export"), volume_ref['id'])  
  71.             # 为逻辑卷创建导出接口;  
  72.             model_update = self.driver.create_export(context, volume_ref)  
  73.             if model_update:  
  74.                 self.db.volume_update(context, volume_ref['id'], model_update)  
  75.         except exception.CinderException as ex:  
  76.             if model_update:  
  77.                 LOG.exception(_("Failed updating model of volume %(volume_id)s"  
  78.                               " with driver provided model %(model)s") %  
  79.                               {'volume_id': volume_id, 'model': model_update})  
  80.                 raise exception.ExportFailure(reason=ex)  
复制代码

我们在这个类的初始化方法中可以看到:

        self._create_func_mapping = {
            'raw': self._create_raw_volume,
            'snap': self._create_from_snapshot,
            'source_vol': self._create_from_source_volume,
            'image': self._create_from_image,
        }

这里就指明了建立新卷的四种途径,即直接建立raw格式的新卷、从快照建立新卷、从已有的卷建立新卷和从镜像建立新卷。在上述类的__call__方法中,根据具体情况分别调用了不用的方法实现了新卷的建立,我们来看看这几个建立新卷的方法的源码:


  1. <font color="#000"><font face="微软雅黑"><font size="2">def _create_raw_volume(self, context, volume_ref, **kwargs):  
  2.     """
  3.     实现raw格式卷的建立;
  4.     """  
  5.     return self.driver.create_volume(volume_ref)  
  6.   
  7. def _create_from_snapshot(self, context, volume_ref, snapshot_id,  
  8.                           **kwargs):  
  9.     """
  10.     实现从快照建立卷的操作,并根据具体情况实现对指定卷的glance元数据进行更新操作;
  11.     """  
  12.     volume_id = volume_ref['id']  
  13.     # 获取指定卷的快照;  
  14.     snapshot_ref = self.db.snapshot_get(context, snapshot_id)  
  15.     # 调用具体驱动中的create_volume_from_snapshot方法,实现从快照建立卷;  
  16.     model_update = self.driver.create_volume_from_snapshot(volume_ref,  
  17.                                                            snapshot_ref)  
  18.     make_bootable = False  
  19.     try:  
  20.         # 根据volume_id获取volume;  
  21.         originating_vref = self.db.volume_get(context, snapshot_ref['volume_id'])  
  22.         make_bootable = originating_vref.bootable  
  23.     except exception.CinderException as ex:  
  24.         LOG.exception(_("Failed fetching snapshot %(snapshot_id)s bootable"  
  25.                         " flag using the provided glance snapshot "  
  26.                         "%(snapshot_ref_id)s volume reference") %  
  27.                       {'snapshot_id': snapshot_id,  
  28.                        'snapshot_ref_id': snapshot_ref['volume_id']})  
  29.         raise exception.MetadataUpdateFailure(reason=ex)  
  30.     if make_bootable:  
  31.         # 根据具体情况实现对指定卷的glance元数据进行更新操作;  
  32.         self._handle_bootable_volume_glance_meta(context, volume_id,  
  33.                                                  snapshot_id=snapshot_id)  
  34.     return model_update  
  35.   
  36. def _create_from_source_volume(self, context, volume_ref,  
  37.                                source_volid, **kwargs):  
  38.     """
  39.     实现从源卷建立(实际上就是直接拷贝)卷的操作;
  40.     """         
  41.     # 根据source_volid获取卷的信息;  
  42.     srcvol_ref = self.db.volume_get(context, source_volid)  
  43.     # 创建指定卷的克隆;  
  44.     model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref)  
  45.       
  46.     # 根据具体情况实现对指定卷的glance元数据进行更新操作;  
  47.     if srcvol_ref.bootable:  
  48.         self._handle_bootable_volume_glance_meta(context, volume_ref['id'],  
  49.                                                  source_volid=source_volid)  
  50.     return model_update  
  51.   
  52. def _create_from_image(self, context, volume_ref,  
  53.                        image_location, image_id, image_meta,  
  54.                        image_service, **kwargs):  
  55.     """
  56.     从镜像实现卷的建立;
  57.     """  
  58.     LOG.debug(_("Cloning %(volume_id)s from image %(image_id)s "  
  59.                 " at location %(image_location)s") %  
  60.               {'volume_id': volume_ref['id'],  
  61.                'image_location': image_location, 'image_id': image_id})  
  62.       
  63.     # 从现有的镜像有效的建立一个卷;  
  64.     model_update, cloned = self.driver.clone_image(volume_ref, image_location, image_id)  
  65.       
  66.     # 如果没有实现克隆,说明没有指定的镜像;  
  67.     # 实现建立卷,并下载镜像数据到卷中;  
  68.     if not cloned:  
  69.         # 实现建立卷,并下载镜像数据到卷中;  
  70.         model_update = self.driver.create_volume(volume_ref)  
  71.         updates = dict(model_update or dict(), status='downloading')  
  72.         # 更新卷的状态;  
  73.         try:  
  74.             volume_ref = self.db.volume_update(context,  
  75.                                                volume_ref['id'], updates)  
  76.         except exception.CinderException:  
  77.             LOG.exception(_("Failed updating volume %(volume_id)s with "  
  78.                             "%(updates)s") %  
  79.                           {'volume_id': volume_ref['id'],  
  80.                            'updates': updates})  
  81.         # 下载glance镜像数据到指定的卷;  
  82.         self._copy_image_to_volume(context, volume_ref, image_id, image_location, image_service)  
  83.   
  84.     # 根据具体情况实现对指定卷的glance元数据进行更新操作;  
  85.     self._handle_bootable_volume_glance_meta(context, volume_ref['id'],  
  86.                                              image_id=image_id,  
  87.                                              image_meta=image_meta)  
  88.     return model_update  </font></font></font>
复制代码



再来看这几个方法的源码,不同的方法中会进一步调用不同的方法来实现新卷的建立,这就直接与/cinder/volume/drivers中的不同的块存储后端实现直接联系到一起了,具体调用的是那一种块存储器中的建立卷的方法,就是由self.driver所确定的。


OK!到此为止,cinder中建立新卷的整体流程的源码分析已经全部完成,其实我想说的一句话就是,如果真的把这个流程的实现过程搞清楚,那么cinder模块的源码也就基本掌握了



相关文章:


Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1.cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2.__call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?


Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3.meth的作用?
4.如何从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?


Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?

Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述cinder是如何实现卷的建立的?
2.简述taskflow库来实现卷的简历过程?
3.如何根据给定id来检索获取单个的卷的类型?
4.如何构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述建立卷的flow的步骤?


Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
http://www.aboutyun.com/thread-10213-1-1.html
1.如何实现Flow类的初始化的?
2.用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何完全或部分的重置flow的内部的状态?
5.如何从给定的卷中提取卷的id信息?
6.OnFailureChangeStatusTask类的作用?

Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析
http://www.aboutyun.com/thread-10212-1-1.html
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?

Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析
http://www.aboutyun.com/thread-10211-1-1.html
1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?

Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1.VolumeCastTask的源码如何实现?
2.远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?






转自:http://blog.csdn.net/gaoxingnengjisuan/article/details/23794279






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条