分享

Openstack Cinder中建立volume过程的源码解析(6)----以及taskflow相关解析

shihailong123 发表于 2014-11-22 13:29:12 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13965
问题导读:
1.如何来运行已经构建好的flow?
2.run的源码如何实现及实现过程是什么?
3.resume_it的实现过程是什么?
4.类Runner的初始化方法是什么?
5.run_it的源码如何实现?





Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析中我们曾经说过,在方法/cinder/volume/api.py----class
API----def create中,主要通过4步实现了应用taskflow进行卷的建立操作,也就是:
1.构建字典create_what,实现整合建立卷的具体参数;
2.构建并返回用于建立卷的flow;
3.执行构建的用于建立卷的flow;
4.从flow中获取建立卷的反馈信息;
其中前2个步骤的解析已经完成,这里来看第3个步骤的实现,即如何来运行已经构建好的flow;
我们再来看方法/cinder/volume/api.py----class API----def create的源码实现:
  1. def create(self, context, size, name, description, snapshot=None,   
  2.            image_id=None, volume_type=None, metadata=None,   
  3.            availability_zone=None, source_volume=None,   
  4.            scheduler_hints=None, backup_source_volume=None):   
  5.     """  
  6.     实现建立卷的操作;  
  7.     """   
  8.    
  9.     def check_volume_az_zone(availability_zone):   
  10.         """  
  11.         验证availability_zone是否是可用的(即是否包含在可用zone的列表中);  
  12.         """   
  13.         try:   
  14.             # _valid_availabilty_zone:验证availability_zone是否是可用的(即是否包含在可用zone的列表中);   
  15.             return self._valid_availabilty_zone(availability_zone)   
  16.         except exception.CinderException:   
  17.             LOG.exception(_("Unable to query if %s is in the "   
  18.                             "availability zone set"), availability_zone)   
  19.             return False   
  20.    
  21.     # 所要建立卷的规格数据信息;   
  22.     create_what = {   
  23.         'size': size,   
  24.         'name': name,   
  25.         'description': description,   
  26.         'snapshot': snapshot,   
  27.         'image_id': image_id,   
  28.         'volume_type': volume_type,   
  29.         'metadata': metadata,   
  30.         'availability_zone': availability_zone,   
  31.         'source_volume': source_volume,   
  32.         'scheduler_hints': scheduler_hints,   
  33.         'key_manager': self.key_manager,   
  34.         'backup_source_volume': backup_source_volume,   
  35.     }   
  36.         
  37.     # 构建并返回用于建立卷的flow;   
  38.     # self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI();   
  39.     # self.volume_rpcapi = volume_rpcapi.VolumeAPI();   
  40.     # self.image_service = (image_service or glance.get_default_image_service())   
  41.     # check_volume_az_zone:验证availability_zone是否是可用的(即是否包含在可用zone的列表中);   
  42.     # create_what:所要建立卷的规格数据信息;   
  43.     (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,   
  44.                                               self.volume_rpcapi,   
  45.                                               self.db,   
  46.                                               self.image_service,   
  47.                                               check_volume_az_zone,   
  48.                                               create_what)   
  49.    
  50.     # 应用assert关键字来声明flow是真的;   
  51.     assert flow, _('Create volume flow not retrieved')   
  52.         
  53.         
  54.     # 运行用于建立卷的flow;   
  55.     flow.run(context)   
  56.         
  57.     # 如果flow的运行状态不为states.SUCCESS,则引发异常;   
  58.     if flow.state != states.SUCCESS:   
  59.         raise exception.CinderException(_("Failed to successfully complete"   
  60.                                           " create volume workflow"))   
  61.    
  62.     # Extract the volume information from the task uuid that was specified   
  63.     # to produce said information.   
  64.     # 通过task的uuid值获取建立卷的信息;   
  65.     volume = None   
  66.     try:   
  67.         volume = flow.results[uuid]['volume']   
  68.     except KeyError:   
  69.         pass   
  70.    
  71.     # Raise an error, nobody provided it??   
  72.     # 应用assert关键字来声明volume是真的;   
  73.     assert volume, _('Expected volume result not found')   
  74.         
  75.     return volume   
复制代码


我们来看语句:
flow.run(context)
就实现了构建的flow的执行操作;
我们具体来看方法run的源码实现:
  1. def run(self, context, *args, **kwargs):  
  2.       """
  3.       工作流(workflow)的执行操作;
  4.       context = <cinder.context.RequestContext object at 0x382fd50> //从cinder请求中获取上下文环境信息;
  5.       args = ()
  6.       kwargs = {}
  7.       """  
  8.         super(Flow, self).run(context, *args, **kwargs)  
  9.   
  10.         def resume_it():  
  11.             # self._leftoff_at = None  
  12.             if self._leftoff_at is not None:  
  13.                 return ([], self._leftoff_at)  
  14.               
  15.             # self.resumer = None  
  16.             # 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;  
  17.             if self.resumer:  
  18.                 # self._ordering():获取迭代器包装的任务运行列表;  
  19.                 (finished, leftover) = self.resumer.resume(self, self._ordering())  
  20.             else:  
  21.                 finished = []  
  22.                 # self._ordering():获取迭代器包装的任务运行列表;  
  23.                 leftover = self._ordering()  
  24.               
  25.             # leftover:获取迭代器包装的任务运行列表;  
  26.             # finished = []  
  27.             # leftover = <listiterator object at 0x441fa50>  
  28.             return (finished, leftover)  
  29.   
  30.         # 改变目前的flow状态为新的状态STARTED,并执行通知操作;  
  31.         # flow状态标志为STARTED,表示任务开始运行操作;  
  32.         self._change_state(context, states.STARTED)  
  33.         try:  
  34.             # leftover:获取迭代器包装的任务运行列表;  
  35.             # those_finished = []  
  36.             # leftover = <listiterator object at 0x40c1990>  
  37.             those_finished, leftover = resume_it()  
  38.         except Exception:  
  39.             with excutils.save_and_reraise_exception():  
  40.                 self._change_state(context, states.FAILURE)  
  41.   
  42.         def run_it(runner, failed=False, result=None, simulate_run=False):  
  43.             
  44.             try:  
  45.                 # Add the task to be rolled back *immediately* so that even if  
  46.                 # the task fails while producing results it will be given a  
  47.                 # chance to rollback.  
  48.                   
  49.                 # RollbackTask:实现调用任务对应的可用的逆转回滚方法;  
  50.                 # runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0  
  51.                 rb = utils.RollbackTask(context, runner.task, result=None)  
  52.                   
  53.                 # 在回滚方法累加器中添加逆转回滚方法任务;  
  54.                 self._accumulator.add(rb)  
  55.   
  56.                 self.task_notifier.notify(states.STARTED, details={  
  57.                     'context': context,  
  58.                     'flow': self,  
  59.                     'runner': runner,  
  60.                 })  
  61.                   
  62.                 # simulate_run = False  
  63.                 if not simulate_run:  
  64.                     result = runner(context, *args, **kwargs)  
  65.   
  66.                 else:  
  67.                     if failed:  
  68.                         if not result:  
  69.                             result = "%s failed running." % (runner.task)  
  70.                         if isinstance(result, basestring):  
  71.                             result = exc.InvalidStateException(result)  
  72.                         if not isinstance(result, Exception):  
  73.                             LOG.warn("Can not raise a non-exception"  
  74.                                      " object: %s", result)  
  75.                             result = exc.InvalidStateException()  
  76.                         raise result  
  77.                 rb.result = result  
  78.                 runner.result = result  
  79.                 self.results[runner.uuid] = result  
  80.                   
  81.                 self.task_notifier.notify(states.SUCCESS, details={  
  82.                     'context': context,  
  83.                     'flow': self,  
  84.                     'runner': runner,  
  85.                 })  
  86.             except Exception as e:  
  87.                 runner.result = e  
  88.                 cause = utils.FlowFailure(runner, self, e)  
  89.                 with excutils.save_and_reraise_exception():  
  90.                     self.task_notifier.notify(states.FAILURE, details={  
  91.                         'context': context,  
  92.                         'flow': self,  
  93.                         'runner': runner,  
  94.                     })  
  95.                     self.rollback(context, cause)  
  96.   
  97.         # those_finished = []  
  98.         if len(those_finished):  
  99.             self._change_state(context, states.RESUMING)  
  100.             for (r, details) in those_finished:  
  101.                 failed = states.FAILURE in details.get('states', [])  
  102.                 result = details.get('result')  
  103.                 run_it(r, failed=failed, result=result, simulate_run=True)  
  104.   
  105.         # leftover:获取迭代器包装的任务运行列表;  
  106.         # leftover = <listiterator object at 0x40c1990>  
  107.         self._leftoff_at = leftover  
  108.         # 改变目前的flow状态为新的状态RUNNING,并执行通知操作;  
  109.         self._change_state(context, states.RUNNING)  
  110.                  
  111.         # 如果状态为中断,则返回;  
  112.         if self.state == states.INTERRUPTED:  
  113.             return  
  114.   
  115.         # 标志任务运行状态不为states.INTERRUPTED;  
  116.         was_interrupted = False  
  117.          
  118.         # leftover:获取迭代器包装的任务运行列表;  
  119.         # leftover = <listiterator object at 0x40c1990>  
  120.         for r in leftover:  
  121.             r.reset()  
  122.             run_it(r)  
  123.             if self.state == states.INTERRUPTED:  
  124.                 was_interrupted = True  
  125.                 break  
  126.   
  127.         if not was_interrupted:  
  128.             # Only gets here if everything went successfully.  
  129.             self._change_state(context, states.SUCCESS)  
  130.             self._leftoff_at = None  
复制代码



我们来分析这个方法的实现过程;
首先来看语句:
try:
    # leftover:获取迭代器包装的任务运行列表;
    # those_finished = []
    # leftover =
    those_finished, leftover = resume_it()

这部分语句的功能是实现获取迭代器包装的task列表,以及已经完成的task列表;
我们具体来看方法resume_it的实现过程:
  1. def resume_it():  
  2.     # self._leftoff_at = None  
  3.     if self._leftoff_at is not None:  
  4.         return ([], self._leftoff_at)  
  5.       
  6.     # self.resumer = None  
  7.     # 注:这里还没有应用恢复策略,因为类的初始化过程中赋值为None;  
  8.     if self.resumer:  
  9.         # self._ordering():获取迭代器包装的任务运行列表;  
  10.         (finished, leftover) = self.resumer.resume(self, self._ordering())  
  11.     else:  
  12.         finished = []  
  13.         # self._ordering():获取迭代器包装的任务运行列表;  
  14.         leftover = self._ordering()  
  15.       
  16.     # leftover:获取迭代器包装的任务运行列表;  
  17.     # finished = []  
  18.     # leftover = <listiterator object at 0x441fa50>  
  19.     return (finished, leftover)  
复制代码



我们再来看方法_ordering的源码实现:
  1. def _ordering(self):         
  2.         # 返回迭代器包装的任务运行列表;  
  3.         return iter(self._connect())  
  4.   
  5.     def _connect(self):  
  6.         # self._runners:所有要运行的任务集合;  
  7.         # self._connected = False  
  8.         if self._connected:  
  9.             return self._runners  
  10.          
  11.         for r in self._runners:  
  12.             r.providers = {}  
  13.               
  14.         for r in reversed(self._runners):  
  15.             self._associate_providers(r)  
  16.         self._connected = True  
  17.          
  18.         return self._runners
复制代码


我们可以看到在方法_connect中,由类Flow的初始化方法中可以知道,变量self._runners表示的是所有要运行的任务的集合;我们在语句for r in reversed(self._runners)下面添加了一条输出调试语句,获得以下的输出实例:
  1. r = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;
  2.     r-a41753ed-a9b4-4999-8aab-979d244425d1; 1.0
  3. r = Runner: cinder.volume.flows.create_volume.OnFailureChangeStatusTask;volume:create==1.0;
  4.     r-c689085f-df0a-4459-a8a9-c2518c6e471c; 1.0
  5. r = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;volume:create==1.0;
  6.     r-adea0991-6fc9-4a74-9c2f-33848c77b29c; 1.0
  7. r = Runner: cinder.volume.flows.create_volume.EntryCreateTask;volume:create==1.0;
  8.     r-c05d7892-7cb0-4e80-b17b-5777ca0ae167; 1.0
  9. r = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;volume:create==1.0;
  10.     r-4d1c0c90-b6ac-4406-a3ae-50943fb22cd3; 1.0
  11. r = Runner: cinder.volume.flows.create_volume.ExtractVolumeRequestTask;volume:create==1.0;
  12.     r-d7175a90-de39-40c2-853f-462a4c566d8f; 1.0
  13. r = Runner: cinder.volume.flows.base.InjectTask;volume:create==1.0;
  14.     r-77f89843-25d6-4dc1-b8c5-4d13816b62fa; 1.0
复制代码



我们可以看到此时,self._runners中集合了所有要运行的task,当然这里暂时调用了reversed方法进行了逆序排序的操作,是为了下面的验证操作做准备的,而self._runners中task的排序还是顺序的;输出实例:
  1. self._runners = [<cinder.taskflow.utils.Runner object at 0x2ca86d0>,
  2.                  <cinder.taskflow.utils.Runner object at 0x2ca8750>,
  3.                  <cinder.taskflow.utils.Runner object at 0x2ca87d0>,
  4.                  <cinder.taskflow.utils.Runner object at 0x2ca8850>,
  5.                  <cinder.taskflow.utils.Runner object at 0x2ca88d0>,
  6.                  <cinder.taskflow.utils.Runner object at 0x2ca8950>,
  7.                  <cinder.taskflow.utils.Runner object at 0x28e6490>]
复制代码


所以获取了task的集合的变量self._runners,再经过_ordering方法的迭代器包装,最后返回给变量leftover;
下面我们再来看语句:
for r in leftover:
    r.reset()
    run_it(r)
显然,这里完成的就是按照task集合中的顺序,逐个执行所要执行的task,从而成功实现卷的建立;
从上面的调试输出我们可以知道,self._runners为cinder.taskflow.utils.Runner对象的集合,即为要运行的任务的集合,所以这里我们来看一下类Runner的初始化方法:
  1. class Runner(object):  
  2.     def __init__(self, task, uuid=None):  
  3.         assert isinstance(task, collections.Callable)  
  4.         self.task = task  
  5.         self.providers = {}  
  6.         self.runs_before = []  
  7.         self.result = None  
  8.         if not uuid:  
  9.             self._id = uuidutils.generate_uuid()  
  10.         else:  
  11.             self._id = str(uuid)  
复制代码



所以我们在这里可以做以下的调试输出,具体看一下各个变量的作用,这有利于后续代码的理解:
  1. ========================================================================================
  2. r = Runner: cinder.volume.flows.base.InjectTask;
  3.     volume:create==1.0;
  4.     r-dfc38a9a-a67f-44b9-8297-48dc586806ce; 1.0
  5. r.task = cinder.volume.flows.base.InjectTask;volume:create==1.0 # 表示当前任务的类名和任务描述;
  6. r.providers = {}
  7. r.runs_before = [] # 表示当前任务之前的所有任务集合;
  8. r.result = None
  9. r._id = dfc38a9a-a67f-44b9-8297-48dc586806ce
  10. ========================================================================================
  11. r = Runner: cinder.volume.flows.create_volume.ExtractVolumeRequestTask;
  12.     volume:create==1.0;
  13.     r-920b4ad9-edd8-436b-bfce-18f940d714a7; 1.0
  14. r.task = cinder.volume.flows.create_volume.ExtractVolumeRequestTask;volume:create==1.0
  15. r.providers = {'backup_source_volume': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  16.                'availability_zone': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  17.                'source_volume': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  18.                'volume_type': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  19.                'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  20.                'snapshot': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  21.                'size': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  22.                'key_manager': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  23.                'metadata': <cinder.taskflow.utils.Runner object at 0x3a28650>}
  24. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28650>]
  25. r.result = None
  26. r._id = 920b4ad9-edd8-436b-bfce-18f940d714a7
  27. ========================================================================================
  28. r = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;
  29.     volume:create==1.0;
  30.     r-49eb28ff-4da0-4e17-bf89-c9d82abc4eb8; 1.0
  31. r.task = cinder.volume.flows.create_volume.QuotaReserveTask;volume:create==1.0
  32. r.providers = {'volume_type_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  33.                'size': <cinder.taskflow.utils.Runner object at 0x3a286d0>}
  34. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a286d0>,
  35.                  <cinder.taskflow.utils.Runner object at 0x3a28650>]
  36. r.result = None
  37. r._id = 49eb28ff-4da0-4e17-bf89-c9d82abc4eb8
  38. ========================================================================================
  39. r = Runner: cinder.volume.flows.create_volume.EntryCreateTask;
  40.     volume:create==1.0;
  41.     r-b338cd2d-282b-4da2-9f10-7cd79f9de18d; 1.0
  42. r.task = cinder.volume.flows.create_volume.EntryCreateTask;volume:create==1.0
  43. r.providers = {'size': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  44.                'volume_type_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  45.                'description': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  46.                'availability_zone': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  47.                'reservations': <cinder.taskflow.utils.Runner object at 0x3a28750>,
  48.                'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  49.                'encryption_key_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  50.                'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  51.                'metadata': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  52.                'name': <cinder.taskflow.utils.Runner object at 0x3a28650>}
  53. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28750>,
  54.                  <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  55.                  <cinder.taskflow.utils.Runner object at 0x3a28650>]
  56. r.result = None
  57. r._id = b338cd2d-282b-4da2-9f10-7cd79f9de18d
  58. ========================================================================================
  59. r = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;
  60.     volume:create==1.0;
  61.     r-11bfeedf-6cc7-46f6-b257-472114132cb5; 1.0
  62. r.task = cinder.volume.flows.create_volume.QuotaCommitTask;volume:create==1.0
  63. r.providers = {'reservations': <cinder.taskflow.utils.Runner object at 0x3a28750>,
  64.                'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>}
  65. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a287d0>,
  66.                  <cinder.taskflow.utils.Runner object at 0x3a28750>,
  67.                  <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  68.                  <cinder.taskflow.utils.Runner object at 0x3a28650>]
  69. r.result = None
  70. r._id = 11bfeedf-6cc7-46f6-b257-472114132cb5
  71. ========================================================================================
  72. r = Runner: cinder.volume.flows.create_volume.OnFailureChangeStatusTask;
  73.     volume:create==1.0;
  74.     r-255b5031-abb0-4c02-9c1d-ddb59fe4066b; 1.0
  75. r.task = cinder.volume.flows.create_volume.OnFailureChangeStatusTask;volume:create==1.0
  76. r.providers = {'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>}
  77. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a28850>,
  78.                  <cinder.taskflow.utils.Runner object at 0x3a287d0>,
  79.                  <cinder.taskflow.utils.Runner object at 0x3a28750>,
  80.                  <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  81.                  <cinder.taskflow.utils.Runner object at 0x3a28650>]
  82. r.result = None
  83. r._id = 255b5031-abb0-4c02-9c1d-ddb59fe4066b
  84. ========================================================================================
  85. r = Runner: cinder.volume.flows.create_volume.VolumeCastTask;
  86.     volume:create==1.0;
  87.     r-68318d88-aecb-4ac4-b80d-26f6e91b9b95; 1.0
  88. r.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
  89. r.providers = {'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,
  90.                'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  91.                'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  92.                'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
  93.                'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>,
  94.                'volume_type': <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  95.                'scheduler_hints': <cinder.taskflow.utils.Runner object at 0x3a28650>}
  96. r.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a288d0>,
  97.                  <cinder.taskflow.utils.Runner object at 0x3a28850>,
  98.                  <cinder.taskflow.utils.Runner object at 0x3a287d0>,
  99.                  <cinder.taskflow.utils.Runner object at 0x3a28750>,
  100.                  <cinder.taskflow.utils.Runner object at 0x3a286d0>,
  101.                  <cinder.taskflow.utils.Runner object at 0x3a28650>]
  102. r.result = None
  103. r._id = 68318d88-aecb-4ac4-b80d-26f6e91b9b95
  104. ========================================================================================
复制代码


我们接着来看方法run_it的源码实现:
  1. def run_it(runner, failed=False, result=None, simulate_run=False):  
  2. """
  3. 以其中之一为例:
  4. runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;  
  5.           r-68318d88-aecb-4ac4-b80d-26f6e91b9b95; 1.0
  6. runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0
  7. runner.providers = {'image_id': <cinder.taskflow.utils.Runner object at 0x3a28650>,  
  8.                    'snapshot_id': <cinder.taskflow.utils.Runner object at 0x3a286d0>,  
  9.                    'source_volid': <cinder.taskflow.utils.Runner object at 0x3a286d0>,  
  10.                    'volume_id': <cinder.taskflow.utils.Runner object at 0x3a287d0>,  
  11.                    'volume_properties': <cinder.taskflow.utils.Runner object at 0x3a287d0>,  
  12.                    'volume_type': <cinder.taskflow.utils.Runner object at 0x3a286d0>,  
  13.                    'scheduler_hints': <cinder.taskflow.utils.Runner object at 0x3a28650>}
  14. runner.runs_before = [<cinder.taskflow.utils.Runner object at 0x3a288d0>,  
  15.                      <cinder.taskflow.utils.Runner object at 0x3a28850>,  
  16.                      <cinder.taskflow.utils.Runner object at 0x3a287d0>,  
  17.                      <cinder.taskflow.utils.Runner object at 0x3a28750>,  
  18.                      <cinder.taskflow.utils.Runner object at 0x3a286d0>,  
  19.                      <cinder.taskflow.utils.Runner object at 0x3a28650>]
  20. runner.result = None
  21. runner._id = 68318d88-aecb-4ac4-b80d-26f6e91b9b95
  22. runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0;  
  23.           r-6865ee0d-c5dc-4b62-972b-6ad3af46d147; 1.0
  24. failed = False
  25. result = None
  26. simulate_run = False
  27. """  
  28.     try:                  
  29.         # RollbackTask:实现调用任务对应的可用的逆转回滚方法;  
  30.         # runner.task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0  
  31.         rb = utils.RollbackTask(context, runner.task, result=None)  
  32.          
  33.         # 在回滚方法累加器中添加逆转回滚方法任务;  
  34.         self._accumulator.add(rb)  
  35.          
  36.         self.task_notifier.notify(states.STARTED, details={  
  37.             'context': context,  
  38.             'flow': self,  
  39.             'runner': runner,  
  40.              })  
  41.          
  42.         # simulate_run = False  
  43.         if not simulate_run:  
  44.             result = runner(context, *args, **kwargs)  
  45. yle="font-family:KaiTi_GB2312;">                </span>else:  
  46.             if failed:  
  47.                 if not result:  
  48.                     result = "%s failed running." % (runner.task)  
  49.                 if isinstance(result, basestring):  
  50.                     result = exc.InvalidStateException(result)  
  51.                 if not isinstance(result, Exception):  
  52.                     LOG.warn("Can not raise a non-exception"  
  53.                              " object: %s", result)  
  54.                     result = exc.InvalidStateException()  
  55.                 raise result  
  56.         rb.result = result  
  57.         runner.result = result  
  58.         self.results[runner.uuid] = result  
  59.          
  60.         self.task_notifier.notify(states.SUCCESS, details={  
  61.             'context': context,  
  62.             'flow': self,  
  63.             'runner': runner,  
  64.               })  
  65.     except Exception as e:  
  66.         runner.result = e  
  67.         cause = utils.FlowFailure(runner, self, e)  
  68.         with excutils.save_and_reraise_exception():  
  69.             # Notify any listeners that the task has errored.  
  70.             self.task_notifier.notify(states.FAILURE, details={  
  71.                 'context': context,  
  72.                 'flow': self,  
  73.                 'runner': runner,  
  74.                    })  
  75.             self.rollback(context, cause)  
复制代码

        
首先来看语句:
rb = utils.RollbackTask(context, runner.task, result=None)
其中类RollbackTask实现调用任务对应的可用的逆转回滚方法;我们具体来看类RollbackTask的实现代码:
  1. class RollbackTask(object):  
  2.     """
  3.     A helper task that on being called will call the underlying callable
  4.     tasks revert method (if said method exists).
  5.     实现调用任务对应的可用的逆转回滚方法;
  6.     """  
  7.   
  8.     def __init__(self, context, task, result):  
  9.         # task = cinder.volume.flows.create_volume.VolumeCastTask;volume:create==1.0  
  10.         self.task = task  
  11.         self.result = result  
  12.         self.context = context  
  13.   
  14.     def __str__(self):  
  15.         return str(self.task)  
  16.   
  17.     def __call__(self, cause):  
  18.         """
  19.         实现调用任务对应的可用的逆转回滚方法;
  20.         """  
  21.         if ((hasattr(self.task, "revert") and  
  22.              isinstance(self.task.revert, collections.Callable))):  
  23.             #注:这里有几种不同的回滚方法;  
  24.             self.task.revert(self.context, self.result, cause)  
复制代码


类RollbackTask实现调用任务对应的可用的逆转回滚方法,调用task类中对应的revert方法都需要经过这个类中的__call__方法来实现,后面我们会解析到的。
我们再来看语句:
self._accumulator.add(rb)
这条语句实现了在回滚方法累加器中添加逆转回滚方法任务;在类Flow的初始化方法中可以看到,self._accumulator = utils.RollbackAccumulator(),所以我们先来看类RollbackAccumulator的源码实现:
  1. class RollbackAccumulator(object):  
  2.     def __init__(self):  
  3.         self._rollbacks = []  
  4.   
  5.     def add(self, *callables):  
  6.         self._rollbacks.extend(callables)  
  7.   
  8.     def reset(self):  
  9.         self._rollbacks = []  
  10.   
  11.     def __len__(self):  
  12.         return len(self._rollbacks)  
  13.   
  14.     def __iter__(self):  
  15.         # Rollbacks happen in the reverse order that they were added.  
  16.         return reversed(self._rollbacks)  
  17.   
  18.     def __enter__(self):  
  19.         return self  
  20.   
  21.     def rollback(self, cause):  
  22.         LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)  
  23.          
  24.         for (i, f) in enumerate(self):  
  25.             LOG.debug("Calling rollback %s: %s", i + 1, f)  
  26.             try:  
  27.                 f(cause)  
  28.             except Exception:  
  29.                 LOG.exception(("Failed rolling back %s: %s due "  
  30.                                "to inner exception."), i + 1, f)  
  31.   
  32.     def __exit__(self, type, value, tb):  
  33.         if any((value, type, tb)):  
  34.             self.rollback(value)  
复制代码



我们可以看到语句self._accumulator.add(rb)实现了把将要执行的回滚任务添加到变量self._rollbacks中;所以这里可以测试self._accumulator._rollbacks:
  1. self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>]
  2. self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>,
  3.                                 <cinder.taskflow.utils.RollbackTask object at 0x3ae6e50>]
  4. ......
  5. self._accumulator._rollbacks = [<cinder.taskflow.utils.RollbackTask object at 0x3ae6d10>,
  6.                                 <cinder.taskflow.utils.RollbackTask object at 0x3ae6e50>,
  7.                                 <cinder.taskflow.utils.RollbackTask object at 0x3749150>,
  8.                                 <cinder.taskflow.utils.RollbackTask object at 0x3ae6dd0>,
  9.                                 <cinder.taskflow.utils.RollbackTask object at 0x3dac3d0>,
  10.                                 <cinder.taskflow.utils.RollbackTask object at 0x3df2f50>,
  11.                                 <cinder.taskflow.utils.RollbackTask object at 0x3df2610>]
复制代码


我们可以看到,随着执行任务的增加,其对应的回滚任务也是对应增加的,并存储在变量self._accumulator._rollbacks中;
我们再来看语句:
# simulate_run = False
if not simulate_run:
    result = runner(context, *args, **kwargs)

这里实现的就是执行指定的task任务,并获取返回结果;
这里我们可以得到如下的输出实例:
  1. ========================================================================================
  2. runner = Runner: cinder.volume.flows.base.InjectTask;
  3.          volume:create==1.0;
  4.          r-bf0f8b30-c2a0-4c22-ada2-22698adc71b9; 1.0
  5. result = {'backup_source_volume': None,
  6.           'description': None,
  7.           'availability_zone': None,
  8.           'source_volume': None,
  9.           'volume_type': None,
  10.           'name': u'shinian01',
  11.           'image_id': None,
  12.           'metadata': {},
  13.           'snapshot': None,
  14.           'size': 1,
  15.           'key_manager': <cinder.keymgr.conf_key_mgr.ConfKeyManager object at 0x373f790>,
  16.           'scheduler_hints': None}                    
  17. ========================================================================================                            runner = Runner:cinder.volume.flows.create_volume.ExtractVolumeRequestTask;
  18.          volume:create==1.0;
  19.          r-c88edccb-d78b-4cfa-ae68-5d8b72bd173d;1.0
  20. result = {'volume_type_id': None,
  21.           'qos_specs': None,
  22.           'encryption_key_id': None,
  23.           'volume_type': {},
  24.           'snapshot_id': None,
  25.           'availability_zone': 'nova',
  26.           'source_volid': None,
  27.           'size': 1}
  28. ========================================================================================                    
  29. runner = Runner: cinder.volume.flows.create_volume.QuotaReserveTask;
  30.          volume:create==1.0;
  31.          r-4b407b9a-c1a4-4778-8125-751ebe8e3c2b; 1.0
  32. result = {'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
  33.                            '365c158b-d291-4ac5-9d2d-5b7fb8a63dd6']}
  34. ========================================================================================                    
  35. runner = Runner: cinder.volume.flows.create_volume.EntryCreateTask;
  36.          volume:create==1.0;
  37.          r-9ee48341-945d-4905-9b1d-31122cc9745e; 1.0
  38. result = {'volume': <cinder.db.sqlalchemy.models.Volume object at 0x3ae6f10>,
  39.           'volume_properties': {'status': 'creating',
  40.                                 'volume_type_id': None,
  41.                                 'user_id': u'ef073287176048bd861dcd9d9c4d9808',
  42.                                 'availability_zone': 'nova',
  43.                                 'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
  44.                                                  '365c158b-d291-4ac5-9d2d-5b7fb8a63dd6'],
  45.                                 'volume_admin_metadata': [],
  46.                                 'attach_status': 'detached',
  47.                                 'display_description': None,
  48.                                 'volume_metadata': [],
  49.                                 'metadata': {},
  50.                                 'encryption_key_id': None,
  51.                                 'source_volid': None,
  52.                                 'snapshot_id': None,
  53.                                 'display_name': u'shinian01',
  54.                                 'project_id': u'6c3c74779a614d3b81dd75518824e25c',
  55.                                 'id': '431b1c64-4e43-4731-b052-ff10aefae850',
  56.                                 'size': 1},
  57.           'volume_id': '431b1c64-4e43-4731-b052-ff10aefae850'}
  58. ========================================================================================        
  59. runner = Runner: cinder.volume.flows.create_volume.QuotaCommitTask;
  60.          volume:create==1.0;
  61.          r-50d91e70-67fe-42f3-911f-1f59d4173b8b; 1.0
  62. result = {'volume_properties': {'status': 'creating',
  63.                                 'volume_type_id': None,
  64.                                 'user_id': u'ef073287176048bd861dcd9d9c4d9808',
  65.                                 'availability_zone': 'nova',
  66.                                 'reservations': ['6fd57073-e9b6-49b9-bed4-a57b82fced51',
  67.                                                  '365c158b-d291-4ac5-9d2d-5b7fb8a63dd6'],
  68.                                 'volume_admin_metadata': [],
  69.                                 'attach_status': 'detached',
  70.                                 'display_description': None,
  71.                                 'volume_metadata': [],
  72.                                 'metadata': {},
  73.                                 'encryption_key_id': None,
  74.                                 'source_volid': None,
  75.                                 'snapshot_id': None,
  76.                                 'display_name': u'shinian01',
  77.                                 'project_id': u'6c3c74779a614d3b81dd75518824e25c',
  78.                                 'id': '431b1c64-4e43-4731-b052-ff10aefae850',
  79.                                 'size': 1}}
  80. ========================================================================================      
  81. runner = Runner:cinder.volume.flows.create_volume.OnFailureChangeStatusTask;
  82.          volume:create==1.0;
  83.          r-9cd4bfa9-3170-4713-a6ac-2daf984185b2;1.0
  84. result = {'volume_spec': None,
  85.           'volume_id': '431b1c64-4e43-4731-b052-ff10aefae850'}
  86. ========================================================================================      
  87. runner = Runner: cinder.volume.flows.create_volume.VolumeCastTask;
  88.          volume:create==1.0;
  89.          r-6865ee0d-c5dc-4b62-972b-6ad3af46d147; 1.0
  90. result = None
  91. ========================================================================================
复制代码


可见这里runner指明了要执行的类的__call__方法,即运行先前添加的task任务;
result是运行task任务后获取的结果信息;
具体的task的实现过程需解析各个task任务类中的__call__方法,之前的博客中我们已经进行了简单的解析,这里不再进行赘述;
好了,到这里方法def run(self, context, *args, **kwargs)中关于flow中task执行的重要语句的实现基本解析完成,在下一篇博客中,我将重点解析在这个方法中,如果卷的建立出现异常,则如何执行相关的逆转回滚操作。

相关文章:
Openstack Cinder中建立volume过程的源码解析(1)
http://www.aboutyun.com/thread-10217-1-1.html
1.cinder中卷的建立的过程中,客户端传递过来的request的执行过程是怎样的?
2.__call__方法都通过什么方法封装?
3.如何调用指定中间件的__call__方法?


Openstack Cinder中建立volume过程的源码解析(2)
http://www.aboutyun.com/thread-10216-1-1.html
1.如何获取要执行的action方法及其相关的扩展方法?
2.Resource类中的__call__(self,request)方法如何实现?
3.meth的作用?
4.如何从request.environ中获取要执行的action方法?
5.如何对body进行反序列化操作?


Openstack Cinder中建立volume过程的源码解析(3)
http://www.aboutyun.com/thread-10215-1-1.html
1.get_serializer的作用?
2.isgeneratorfunction是用来做什么的?
3.什么是特殊的generator方法?
4.进行响应信息的序列化操作的步骤?
5.简述在cinder模块中实现客户端发送过来的请求信息操作的主要的步骤?

Openstack Cinder中建立volume过程的源码解析(4)----以及taskflow相关解析
http://www.aboutyun.com/thread-10214-1-1.html
1.简述cinder是如何实现卷的建立的?
2.简述taskflow库来实现卷的简历过程?
3.如何根据给定id来检索获取单个的卷的类型?
4.如何构建并返回用于建立卷的flow?
5.如何声明flow是否是真的?
6.简述建立卷的flow的步骤?


Openstack Cinder中建立volume过程的源码解析(5)----以及taskflow相关解析
http://www.aboutyun.com/thread-10213-1-1.html
1.如何实现Flow类的初始化的?
2.用于卷的建立的flow中都添加了哪些task?
3.ExtractVolumeRequestTask类的作用是什么?
4.如何完全或部分的重置flow的内部的状态?
5.如何从给定的卷中提取卷的id信息?
6.OnFailureChangeStatusTask类的作用?



Openstack Cinder中建立volume过程的源码解析(7)----以及taskflow相关解析
http://www.aboutyun.com/thread-10211-1-1.html
1.关于flow中task执行的重要语句的实现基本解析完成,如何实现?
2.如果卷的建立出现异常,则如何执行相关的逆转回滚操作?

Openstack Cinder中建立volume过程的源码解析(8)
http://www.aboutyun.com/thread-10219-1-1.html
1.VolumeCastTask的源码如何实现?
2.远程调用建立新卷的操作,有哪几个步骤?
3.task类VolumeCastTask具体是如何来实现根据请求信息进行卷的建立的?


Openstack Cinder中建立volume过程的源码解析(9)
http://www.aboutyun.com/thread-10210-1-1.html
1.如何实现create_volume的源码?
2.Cast如何实现远程调create_volume?
3.如何实现调用方法self.volume_rpcapi.create_volume来实现在目标主机上新卷的建立?






转自:http://blog.csdn.net/gaoxingnengjisuan/article/details/23678149


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条