分享

OpenStack基于Libvirt的虚拟化平台调度实现----Nova虚拟机动态迁移源码分析

tntzbzc 发表于 2014-11-18 18:04:21 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 22191
问题导读
1.实现虚拟机动态迁移主要实现的语句是什么?
2.方法_update实现了哪方面的内容?
3.live_migration方法的作用是什么?






我们来解析一下Nova虚拟机动态迁移的实现源码。这里不会再像前面那样对代码进行逐行的详细解析,目的是来整理Nova虚拟机动态迁移的实现步骤和相关源码部分架构。

首先我们来看虚拟机动态迁移在Opentack Nova中的API请求处理函数/nova/api/openstack/compute/contrib/admin_actions.py----def _migrate_live:
  1. def _migrate_live(self, req, id, body):  
  2.         """
  3.         Permit admins to (live) migrate a server to a new host.
  4.         允许管理员实时迁移一个服务到新的主机;
  5.         """  
  6.         context = req.environ["nova.context"]  
  7.         authorize(context, 'migrateLive')  
  8.   
  9.         try:  
  10.             block_migration = body["os-migrateLive"]["block_migration"]  
  11.             disk_over_commit = body["os-migrateLive"]["disk_over_commit"]  
  12.             # 确定服务迁移的目标主机;  
  13.             host = body["os-migrateLive"]["host"]  
  14.         except (TypeError, KeyError):  
  15.             msg = _("host and block_migration must be specified.")  
  16.             raise exc.HTTPBadRequest(explanation=msg)  
  17.   
  18.         try:  
  19.             instance = self.compute_api.get(context, id)  
  20.             # live_migrate:实施地迁移服务到新的主机;  
  21.             self.compute_api.live_migrate(context, instance, block_migration,  
  22.                                           disk_over_commit, host)  
  23.         except (exception.ComputeServiceUnavailable,  
  24.                 exception.InvalidHypervisorType,  
  25.                 exception.UnableToMigrateToSelf,  
  26.                 exception.DestinationHypervisorTooOld) as ex:  
  27.             raise exc.HTTPBadRequest(explanation=ex.format_message())  
  28.         except Exception:  
  29.             if host is None:  
  30.                 msg = _("Live migration of instance %(id)s to another host"  
  31.                         " failed") % locals()  
  32.             else:  
  33.                 msg = _("Live migration of instance %(id)s to host %(host)s"  
  34.                         " failed") % locals()  
  35.             LOG.exception(msg)  
  36.             # Return messages from scheduler  
  37.             raise exc.HTTPBadRequest(explanation=msg)  
  38.   
  39.         return webob.Response(status_int=202)  
复制代码




在这个方法中我们可以看到,实现虚拟机动态迁移主要实现的语句就是:

self.compute_api.live_migrate(context, instance, block_migration, disk_over_commit, host)
我们接着看方法live_migrate的源码:
  1. # check_instance_state:这是一个装饰器,用于在进入API方法之前,检测虚拟机和/或任务的状态;  
  2.     # 如果实例处于错误的状态,将会引发异常;  
  3.     @check_instance_state(vm_state=[vm_states.ACTIVE])  
  4.     def live_migrate(self, context, instance, block_migration,  
  5.                      disk_over_commit, host_name):  
  6.         """
  7.         实时地迁移服务到新的主机;
  8.         """  
  9.         LOG.debug(_("Going to try to live migrate instance to %s"),  
  10.                   host_name or "another host", instance=instance)  
  11.   
  12.         # update:更新实例的记录,如果任务或者虚拟机状态发生改变则发送一个状态更新的通知;  
  13.         # 返回更新后的实例信息;  
  14.         instance = self.update(context, instance,  
  15.                                task_state=task_states.MIGRATING,  
  16.                                expected_task_state=None)  
  17.   
  18.         # 调用call方法实现在主题topic的模式上发送实时迁移虚拟机的消息,并等待回应;  
  19.         self.scheduler_rpcapi.live_migration(context, block_migration,  
  20.                 disk_over_commit, instance, host_name)  
复制代码




这个方法中,主要实现了两部分内容,更新记录中的实例信息和在主题topic的模式上发送实时迁移虚拟机的消息。

我们先来看跟新记录中的实例信息这部分内容,主要是通过调用方法update方法实现的。具体来看方法update的实现源码:
  1. def update(self, context, instance, **kwargs):  
  2.         """
  3.         更新数据存储中的实例;
  4.         :param context: The security context
  5.         安全上下文信息;
  6.         :param instance: The instance to update
  7.         要更新的实例;
  8.          
  9.         # 更新实例的记录,如果任务或者虚拟机状态发生改变则发送一个状态更新的通知;
  10.         # 返回更新后的实例信息;
  11.         """  
  12.          
  13.         # 更新实例的记录,如果任务或者虚拟机状态发生改变则发送一个状态更新的通知;  
  14.         # 返回旧的实例信息和更新后的实例信息;  
  15.         _, updated = self._update(context, instance, **kwargs)  
  16.         return updated  
复制代码



  1. def _update(self, context, instance, **kwargs):  
  2.         # 更新实例的记录,如果任务或者虚拟机状态发生改变则发送一个状态更新的通知;  
  3.         # 返回旧的实例信息和更新后的实例信息;  
  4.          
  5.         # 更新实例的信息;  
  6.         # 返回旧的实例信息和更新后的实例信息;  
  7.         old_ref, instance_ref = self.db.instance_update_and_get_original(context, instance['uuid'], kwargs)  
  8.         # 发送通知,来报告实例中发生的任何改变;  
  9.         notifications.send_update(context, old_ref, instance_ref, service="api")  
  10.   
  11.         return dict(old_ref.iteritems()), dict(instance_ref.iteritems())  
复制代码



方法_update主要实现了两方面的内容,一是实现更新实例信息,二是发送通知,来报告实例中发生的任何改变。

先来看方法instance_update_and_get_original的实现源码:

  1. def instance_update_and_get_original(context, instance_uuid, values):  
  2.     """   
  3.     更新实例的信息;
  4.     返回旧的实例信息和更新后的实例信息;
  5.     """  
  6.     rv = IMPL.instance_update_and_get_original(context, instance_uuid, values)  
  7.     try:  
  8.         # 在top等级的cell上,更新实例;  
  9.         cells_rpcapi.CellsAPI().instance_update_at_top(context, rv[1])  
  10.     except Exception:  
  11.         LOG.exception(_("Failed to notify cells of instance update"))  
  12.     return rv  
复制代码
  1. def instance_update_and_get_original(context, instance_uuid, values):  
  2.     """   
  3.     更新实例的信息;
  4.     返回旧的实例信息和更新后的实例信息;
  5.     """  
  6.     return _instance_update(context, instance_uuid, values,  
  7.                             copy_old_instance=True)  
复制代码


  1. def _instance_update(context, instance_uuid, values, copy_old_instance=False):  
  2.     """
  3.     更新实例的信息;
  4.     返回旧的实例信息和更新后的实例信息;
  5.     """  
  6.       
  7.     # 获取db_session的session;  
  8.     # get_session:返回一个SQLAlchemy session,若没有定义,则新建一个SQLAlchemy session;  
  9.     session = get_session()  
  10.   
  11.     # 如果instance_uuid不是一个UUID值,则引发异常;  
  12.     # is_uuid_like:判断instance_uuid是否是一个UUID值,返回结果;  
  13.     if not uuidutils.is_uuid_like(instance_uuid):  
  14.         raise exception.InvalidUUID(instance_uuid)  
  15.   
  16.     with session.begin():  
  17.         # _instance_get_by_uuid:通过instance_uuid获取一个具体的实例;  
  18.         instance_ref = _instance_get_by_uuid(context, instance_uuid, session=session)  
  19.         if "expected_task_state" in values:  
  20.             # it is not a db column so always pop out  
  21.             expected = values.pop("expected_task_state")  
  22.             if not isinstance(expected, (tuple, list, set)):  
  23.                 expected = (expected,)  
  24.             actual_state = instance_ref["task_state"]  
  25.             if actual_state not in expected:  
  26.                 raise exception.UnexpectedTaskStateError(actual=actual_state,  
  27.                                                          expected=expected)  
  28.   
  29.         instance_hostname = instance_ref['hostname'] or ''  
  30.         if ("hostname" in values and  
  31.                 values["hostname"].lower() != instance_hostname.lower()):  
  32.                 _validate_unique_server_name(context,  
  33.                                              session,  
  34.                                              values['hostname'])  
  35.   
  36.         if copy_old_instance:  
  37.             old_instance_ref = copy.copy(instance_ref)  
  38.         else:  
  39.             old_instance_ref = None  
  40.   
  41.         metadata = values.get('metadata')  
  42.         if metadata is not None:  
  43.             _instance_metadata_update_in_place(context, instance_ref,  
  44.                                                'metadata',  
  45.                                                models.InstanceMetadata,  
  46.                                                values.pop('metadata'),  
  47.                                                session)  
  48.   
  49.         system_metadata = values.get('system_metadata')  
  50.         if system_metadata is not None:  
  51.             _instance_metadata_update_in_place(context, instance_ref,  
  52.                                                'system_metadata',  
  53.                                                models.InstanceSystemMetadata,  
  54.                                                values.pop('system_metadata'),  
  55.                                                session)  
  56.   
  57.         instance_ref.update(values)  
  58.         instance_ref.save(session=session)  
  59.   
  60.     return (old_instance_ref, instance_ref)  
复制代码

再来看方法_update中实现发送通知,来报告实例中发生的任何改变的实现内容,即调用了方法send_update来实现的:
  1. def send_update(context, old_instance, new_instance, service=None, host=None):  
  2.     """
  3.     发送通知,来报告实例中发生的任何改变;
  4.     """  
  5.   
  6.     if not CONF.notify_on_any_change and not CONF.notify_on_state_change:  
  7.         return  
  8.   
  9.     update_with_state_change = False  
  10.   
  11.     old_vm_state = old_instance["vm_state"]  
  12.     new_vm_state = new_instance["vm_state"]  
  13.     old_task_state = old_instance["task_state"]  
  14.     new_task_state = new_instance["task_state"]  
  15.   
  16.     if old_vm_state != new_vm_state:  
  17.         update_with_state_change = True  
  18.     elif CONF.notify_on_state_change:  
  19.         if (CONF.notify_on_state_change.lower() == "vm_and_task_state" and  
  20.             old_task_state != new_task_state):  
  21.             update_with_state_change = True  
  22.   
  23.     if update_with_state_change:  
  24.         send_update_with_states(context, new_instance, old_vm_state,  
  25.                 new_vm_state, old_task_state, new_task_state, service, host)  
  26.   
  27.     else:  
  28.         try:  
  29.             _send_instance_update_notification(context, new_instance,  
  30.                     service=service, host=host)  
  31.         except Exception:  
  32.             LOG.exception(_("Failed to send state update notification"),  
  33.                     instance=new_instance)  
复制代码











我们再回到方法live_migrate中,来看第二部分主要实现的内容,即调用方法live_migration来实现在主题topic的模式上发送实时迁移虚拟机的消息。来看方法live_migration:


  1. def live_migration(self, ctxt, block_migration, disk_over_commit,  
  2.             instance, dest):  
  3.         # 调用call方法实现在主题topic的模式上发送实时迁移虚拟机的消息,并等待回应;  
  4.          
  5.         # to_primitive:转换更新后的instance到primitives格式;  
  6.         instance_p = jsonutils.to_primitive(instance)  
  7.          
  8.         # call:在一个主题topic上发送一条消息,并等待响应;rv:多次调用(呼叫)等待者类的对象列表;返回rv[-1]列表的结尾;  
  9.         return self.call(ctxt, self.make_msg('live_migration',  
  10.                 block_migration=block_migration,  
  11.                 disk_over_commit=disk_over_commit, instance=instance_p,  
  12.                 dest=dest))  
复制代码





这里调用方法call来实现在一个主题topic上发送一条消息(进行实时迁移操作),并等待响应。这里将调用/nova/scheduler/manager.py----live_migration这个方法来执行实时迁移的调度的方法,并返回实例当前运行的主机。具体来看方法的实现源码:


  1. def live_migration(self, context, instance, dest, block_migration, disk_over_commit):  
  2.         try:  
  3.             # schedule_live_migration:执行实时迁移的调度方法;  
  4.             # 返回实例当前运行的主机;     
  5.             return self.driver.schedule_live_migration(  
  6.                 context, instance, dest,  
  7.                 block_migration, disk_over_commit)  
  8.         except (exception.ComputeServiceUnavailable,  
  9.                 exception.InvalidHypervisorType,  
  10.                 exception.UnableToMigrateToSelf,  
  11.                 exception.DestinationHypervisorTooOld,  
  12.                 exception.InvalidLocalStorage,  
  13.                 exception.InvalidSharedStorage) as ex:  
  14.             request_spec = {'instance_properties': {  
  15.                 'uuid': instance['uuid'], },  
  16.             }  
  17.             with excutils.save_and_reraise_exception():  
  18.                 self._set_vm_state_and_notify('live_migration',  
  19.                             dict(vm_state=instance['vm_state'],  
  20.                                  task_state=None,  
  21.                                  expected_task_state=task_states.MIGRATING,),  
  22.                                               context, ex, request_spec)  
  23.         except Exception as ex:  
  24.             with excutils.save_and_reraise_exception():  
  25.                 self._set_vm_state_and_notify('live_migration',  
  26.                                              {'vm_state': vm_states.ERROR},  
  27.                                              context, ex, {})  
复制代码
  1. def schedule_live_migration(self, context, instance, dest,  
  2.                                 block_migration, disk_over_commit):  
  3.         """            
  4.         执行实时迁移的调度方法;
  5.         返回实例当前运行的主机;         
  6.         """  
  7.         # 检测确保能够进行实时迁移;  
  8.         # _live_migration_src_check:为源主机的实时迁移作例行的检测;  
  9.         self._live_migration_src_check(context, instance)  
  10.   
  11.         # 如果目标主机没有定义,则由调度算法选取一个主机;  
  12.         if dest is None:            
  13.             # ignore_hosts:避免作为目标主机的主机;  
  14.             # 源主机不能作为目标主机;  
  15.             ignore_hosts = [instance['host']]  
  16.             while dest is None:  
  17.                 # _live_migration_dest_check:为实时迁移作目标主机的例行检测;  
  18.                 # 返回获取的目标主机;  
  19.                 dest = self._live_migration_dest_check(context, instance, dest, ignore_hosts)  
  20.                 try:  
  21.                     # _live_migration_common_check:实时迁移进行的常规检测;  
  22.                     self._live_migration_common_check(context, instance, dest)  
  23.                     # check_can_live_migrate_destination:检测是否可以进行实时迁移;  
  24.                     # 这个检测将会在目标主机上进行,然后返回检测结果给源主机;  
  25.                     migrate_data = self.compute_rpcapi.\  
  26.                         check_can_live_migrate_destination(context, instance,  
  27.                                                            dest,  
  28.                                                            block_migration,  
  29.                                                            disk_over_commit)  
  30.                 except exception.Invalid:  
  31.                     ignore_hosts.append(dest)  
  32.                     dest = None  
  33.                     continue  
  34.         else:  
  35.             # _live_migration_dest_check:为实时迁移作目标主机的例行检测;  
  36.             # 返回获取的目标主机;  
  37.             self._live_migration_dest_check(context, instance, dest)  
  38.             # _live_migration_common_check:实时迁移进行的常规检测;  
  39.             self._live_migration_common_check(context, instance, dest)  
  40.             # check_can_live_migrate_destination:检测是否可以进行实时迁移;  
  41.             # 这个检测将会在目标主机上进行,然后返回检测结果给源主机;  
  42.             migrate_data = self.compute_rpcapi.\  
  43.                 check_can_live_migrate_destination(context, instance, dest,  
  44.                                                    block_migration,  
  45.                                                    disk_over_commit)  
  46.   
  47.         # 执行迁移;  
  48.         src = instance['host']  
  49.         self.compute_rpcapi.live_migration(context, host=src,  
  50.                 instance=instance, dest=dest,  
  51.                 block_migration=block_migration,  
  52.                 migrate_data=migrate_data)  
复制代码





可以看见,在方法schedule_live_migration中,主要进行了三部分的内容,
第一,如果目前主机不存在,则由调度算法选取一个目标主机,并且进行相关的检测,确保能够进行实时迁移操作;
第二,如果目标主机存在,则直接进行相关的检测操作,确保能够进行实时迁移操作;
第三,执行迁移操作。


在前两部分的内容中,分别调用了三个方法_live_migration_dest_check、_live_migration_common_check和

check_can_live_migrate_destination。我们分别来看这三个方法:
首先来看方法_live_migration_dest_check,具体来看它的源码:

  1. def _live_migration_dest_check(self, context, instance_ref, dest, ignore_hosts=None):  
  2.         """
  3.         为实时迁移作目标主机的例行检测;
  4.         :param dest: destination host
  5.         目标主机;
  6.         :param ignore_hosts: hosts that should be avoided as dest host
  7.         应该避免作为目标主机的主机;
  8.         """  
  9.   
  10.         # 如果没有指定目标主机,则选取一个;  
  11.         if dest is None:  
  12.             # instance_type_get:通过id值获取实例类型;  
  13.             instance_type = db.instance_type_get(context, instance_ref['instance_type_id'])  
  14.             # 以字典的形式返回给定image镜像instance_ref['image_ref']的镜像数据;  
  15.             # 调用glance客户端,下载镜像数据(此时为JSON格式);  
  16.             # 转换从glance下载的镜像数据为python可处理的字典格式;  
  17.             image = self.image_service.show(context, instance_ref['image_ref'])  
  18.             request_spec = {'instance_properties': instance_ref,  
  19.                             'instance_type': instance_type,  
  20.                             'instance_uuids': [instance_ref['uuid']],  
  21.                             'image': image}  
  22.             # 过滤器;  
  23.             filter_properties = {'ignore_hosts': ignore_hosts}  
  24.             # 调用合适的scheduler算法来获取合适的主机,作为实时迁移的目标主机;  
  25.             return self.select_hosts(context, request_spec, filter_properties)[0]  
  26.   
  27.         # 检测主机上的实例是否在运行,并且目标主机和源主机是不同的;  
  28.         src = instance_ref['host']  
  29.         if dest == src:  
  30.             raise exception.UnableToMigrateToSelf(  
  31.                     instance_id=instance_ref['uuid'], host=dest)  
  32.   
  33.         # 检确保目标主机是存在的;  
  34.         try:  
  35.             # service_get_by_compute_host:为给定的计算主机获取服务入口(条目);  
  36.             dservice_ref = db.service_get_by_compute_host(context, dest)  
  37.         except exception.NotFound:  
  38.             raise exception.ComputeServiceUnavailable(host=dest)  
  39.   
  40.         # 确保目标主机是活跃的;  
  41.         if not self.servicegroup_api.service_is_up(dservice_ref):  
  42.             raise exception.ComputeServiceUnavailable(host=dest)  
  43.   
  44.         # 检测内存需求;  
  45.         # 确保目标主机具有足够的内存来进行实时迁移;  
  46.         self._assert_compute_node_has_enough_memory(context, instance_ref, dest)  
  47.   
  48.         return dest  
复制代码



这个方法中,将会判断是否定义了目标主机dest,如果没有定义目标主机,将会调用合适的scheduler算法来获取合适的主机,作为实时迁移的目标主机。然后会针对目标主机进行一系列的检查操作。
再来看方法_live_migration_common_check,看看它的源码实现:

  1. def _live_migration_common_check(self, context, instance_ref, dest):  
  2.         """
  3.         实时迁移进行的常规检测;
  4.         """  
  5.         # 获取目标主机信息;  
  6.         dservice_ref = self._get_compute_info(context, dest)  
  7.         src = instance_ref['host']  
  8.         # 获取源主机信息;  
  9.         oservice_ref = self._get_compute_info(context, src)  
  10.   
  11.         # 确保目标主机和源主机的虚拟机管理程序是相同的;  
  12.         orig_hypervisor = oservice_ref['hypervisor_type']  
  13.         dest_hypervisor = dservice_ref['hypervisor_type']  
  14.         if orig_hypervisor != dest_hypervisor:  
  15.             raise exception.InvalidHypervisorType()  
  16.   
  17.         # 确保目标主机的虚拟机管理程序版本大于源主机的虚拟机管理程序版本;  
  18.         orig_hypervisor = oservice_ref['hypervisor_version']  
  19.         dest_hypervisor = dservice_ref['hypervisor_version']  
  20.         if orig_hypervisor > dest_hypervisor:  
  21.             raise exception.DestinationHypervisorTooOld()  
复制代码





在这个方法中,主要实现了对源主机和目标主机上的虚拟机管理程序的版本进行检查。
最后来看方法check_can_live_migrate_destination,这个方法实现的是在目标主机上检测是否可以进行实时迁移,并将检测结果返回给源主机。来看看它的源码实现:


  1. def check_can_live_migrate_destination(self, ctxt, instance, destination,  
  2.                                            block_migration, disk_over_commit):  
  3.         instance_p = jsonutils.to_primitive(instance)  
  4.         return self.call(ctxt,  
  5.                          self.make_msg('check_can_live_migrate_destination',  
  6.                                        instance=instance_p,  
  7.                                        block_migration=block_migration,  
  8.                                        disk_over_commit=disk_over_commit),  
  9.                          topic=_compute_topic(self.topic, ctxt, destination, None))  
复制代码



这里调用call方法实现在一个主题topic上发送一条远程消息,实现在目标主机上进行检测是否可以进行实时迁移,并等待响应。接下来将会执行/nova/compute/manager.py中的方法check_can_live_migrate_destination,这个方法实现了在目标主机上进行检测是否可以进行实时迁移。具体来看方法的实现代码:
  1. @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())  
  2.     def check_can_live_migrate_destination(self, ctxt, instance,  
  3.                                            block_migration=False,  
  4.                                            disk_over_commit=False):  
  5.         """        
  6.         检测是否可以进行实时迁移;
  7.         这个检测将会在目标主机上进行,然后返回检测结果给源主机;
  8.         """  
  9.   
  10.         # 获取源主机信息;  
  11.         src_compute_info = self._get_compute_info(ctxt, instance['host'])  
  12.         # 获取目标主机信息;  
  13.         dst_compute_info = self._get_compute_info(ctxt, CONF.host)  
  14.         # check_can_live_migrate_destination:检测是否可以执行实时迁移;  
  15.         # 这个检测是运行在目标主机上的,然后返回检测结果给源主机;  
  16.         dest_check_data = self.driver.check_can_live_migrate_destination(ctxt,  
  17.             instance, src_compute_info, dst_compute_info,  
  18.             block_migration, disk_over_commit)  
  19.         migrate_data = {}  
  20.         try:  
  21.             # check_can_live_migrate_source:检测是否可以执行实时迁移;  
  22.             migrate_data = self.compute_rpcapi.check_can_live_migrate_source(ctxt, instance, dest_check_data)  
  23.         finally:  
  24.             # check_can_live_migrate_destination_cleanup:目标主机所要做的必要的清理工作;  
  25.             self.driver.check_can_live_migrate_destination_cleanup(ctxt, dest_check_data)  
  26.         if dest_check_data and 'migrate_data' in dest_check_data:  
  27.             migrate_data.update(dest_check_data['migrate_data'])  
  28.         return migrate_data  
复制代码



这个方法中继而调用了以下方法,这里不再做一一解释:

  1. def check_can_live_migrate_destination(self, ctxt, instance_ref,  
  2.                                            src_compute_info, dst_compute_info,  
  3.                                            block_migration=False,  
  4.                                            disk_over_commit=False):  
  5.         """         
  6.         检测是否可以执行实时迁移;
  7.         这个检测是运行在目标主机上的,然后返回检测结果给源主机;
  8.         """  
  9.         disk_available_mb = None  
  10.          
  11.         # 如果是块迁移;  
  12.         if block_migration:  
  13.             disk_available_gb = dst_compute_info['disk_available_least']  
  14.             # disk_available_mb:获取磁盘上可用空间(总的可用空间减去为主机预留的空间);  
  15.             # disk_available_gb * 1024:获取磁盘上可用空间的总量(MB);  
  16.             # reserved_host_disk_mb:这个参数定义了在磁盘上为host主机预留的空间总量(MB);  
  17.             # 参数的默认值为0;  
  18.             disk_available_mb = (disk_available_gb * 1024) - CONF.reserved_host_disk_mb  
  19.   
  20.         # 检测CPU相关信息;  
  21.         src = instance_ref['host']  
  22.         source_cpu_info = src_compute_info['cpu_info']  
  23.         # 检测主机中的CPU和xml文件中给定的CPU是否是兼容的;  
  24.         self._compare_cpu(source_cpu_info)  
  25.   
  26.         # _create_shared_storage_test_file:在CONF.instances_path定义的实例在磁盘disk上的存储路径下创建临时文件;  
  27.         filename = self._create_shared_storage_test_file()  
  28.   
  29.         return {"filename": filename,  
  30.                 "block_migration": block_migration,  
  31.                 "disk_over_commit": disk_over_commit,  
  32.                 "disk_available_mb": disk_available_mb}  
复制代码
  1. def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):  
  2.         instance_p = jsonutils.to_primitive(instance)  
  3.         return self.call(ctxt, self.make_msg('check_can_live_migrate_source',  
  4.                                              instance=instance_p,  
  5.                                              dest_check_data=dest_check_data),  
  6.                          topic=_compute_topic(self.topic, ctxt, None,  
  7.                                               instance))  
复制代码
  1. @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())  
  2. def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):  
  3.         """
  4.         检测是否可以执行实时迁移;
  5.         """  
  6.         capi = self.conductor_api  
  7.         bdms = capi.block_device_mapping_get_all_by_instance(ctxt, instance)  
  8.   
  9.         is_volume_backed = self.compute_api.is_volume_backed_instance(ctxt,  
  10.                                                                       instance,  
  11.                                                                       bdms)  
  12.         dest_check_data['is_volume_backed'] = is_volume_backed  
  13.         # check_can_live_migrate_source:检测是否可以执行实时迁移;  
  14.         return self.driver.check_can_live_migrate_source(ctxt, instance, dest_check_data)  
复制代码


  1. def check_can_live_migrate_source(self, ctxt, instance_ref,  
  2.                                       dest_check_data):  
  3.         """         
  4.         检测是否可以执行实时迁移;
  5.         """  
  6.         source = CONF.host  
  7.         filename = dest_check_data["filename"]  
  8.         block_migration = dest_check_data["block_migration"]  
  9.         is_volume_backed = dest_check_data.get('is_volume_backed', False)  
  10.   
  11.         shared = self._check_shared_storage_test_file(filename)  
  12.   
  13.         if block_migration:  
  14.             if shared:  
  15.                 reason = _("Block migration can not be used "  
  16.                            "with shared storage.")  
  17.                 raise exception.InvalidLocalStorage(reason=reason, path=source)  
  18.             # 检测目标主机是否有足够的磁盘空间来进行实时迁移;  
  19.             self._assert_dest_node_has_enough_disk(ctxt, instance_ref,  
  20.                                     dest_check_data['disk_available_mb'],  
  21.                                     dest_check_data['disk_over_commit'])  
  22.   
  23.         elif not shared and not is_volume_backed:  
  24.             reason = _("Live migration can not be used "  
  25.                        "without shared storage.")  
  26.             raise exception.InvalidSharedStorage(reason=reason, path=source)  
  27.         dest_check_data.update({"is_shared_storage": shared})  
  28.         return dest_check_data  
复制代码



我们再来看方法schedule_live_migration中的第三部分内容,即执行迁移操作。
先来回顾其中实现这部分的源码:

  1.       # 执行迁移;  
  2.         src = instance['host']  
  3.         self.compute_rpcapi.live_migration(context, host=src,  
  4.                 instance=instance, dest=dest,  
  5.                 block_migration=block_migration,  
  6.                 migrate_data=migrate_data)  
复制代码
  1. def live_migration(self, ctxt, instance, dest, block_migration, host,  
  2.                    migrate_data=None):  
  3.     instance_p = jsonutils.to_primitive(instance)  
  4.     self.cast(ctxt, self.make_msg('live_migration', instance=instance_p,  
  5.             dest=dest, block_migration=block_migration,  
  6.             migrate_data=migrate_data),  
  7.             topic=_compute_topic(self.topic, ctxt, host, None))  
复制代码


  1. def live_migration(self, context, instance, dest, block_migration, disk_over_commit):  
  2.         try:  
  3.             # schedule_live_migration:执行实时迁移的调度方法;  
  4.             # 返回实例当前运行的主机;     
  5.             return self.driver.schedule_live_migration(  
  6.                 context, instance, dest,  
  7.                 block_migration, disk_over_commit)  
  8.         except (exception.ComputeServiceUnavailable,  
  9.                 exception.InvalidHypervisorType,  
  10.                 exception.UnableToMigrateToSelf,  
  11.                 exception.DestinationHypervisorTooOld,  
  12.                 exception.InvalidLocalStorage,  
  13.                 exception.InvalidSharedStorage) as ex:  
  14.             request_spec = {'instance_properties': {  
  15.                 'uuid': instance['uuid'], },  
  16.             }  
  17.             with excutils.save_and_reraise_exception():  
  18.                 self._set_vm_state_and_notify('live_migration',  
  19.                             dict(vm_state=instance['vm_state'],  
  20.                                  task_state=None,  
  21.                                  expected_task_state=task_states.MIGRATING,),  
  22.                                               context, ex, request_spec)  
  23.         except Exception as ex:  
  24.             with excutils.save_and_reraise_exception():  
  25.                 self._set_vm_state_and_notify('live_migration',  
  26.                                              {'vm_state': vm_states.ERROR},  
  27.                                              context, ex, {})  
复制代码
  1. def live_migration(self, ctxt, instance_ref, dest,  
  2.                        post_method, recover_method, block_migration=False,  
  3.                        migrate_data=None):  
  4.         """        
  5.         建立一个绿色线程来运行方法_live_migration,来执行实时迁移;
  6.         主要是调用libvirt python接口方法virDomainMigrateToURI,来实现从当前主机迁移domain对象到给定的目标主机;
  7.         """  
  8.   
  9.         # spawn:建立一个绿色线程来运行方法“func(*args, **kwargs)”,这里就是来运行方法_live_migration;  
  10.         # _live_migration:执行实时迁移;  
  11.         # 主要是调用libvirt python接口方法virDomainMigrateToURI,来实现从当前主机迁移domain对象到给定的目标主机;  
  12.         greenthread.spawn(self._live_migration, ctxt, instance_ref, dest,  
  13.                           post_method, recover_method, block_migration,  
  14.                           migrate_data)  
复制代码
  1. def _live_migration(self, ctxt, instance_ref, dest, post_method,  
  2.                         recover_method, block_migration=False,  
  3.                         migrate_data=None):  
  4.         """
  5.         执行实时迁移;
  6.         主要是调用libvirt python接口方法virDomainMigrateToURI,来实现从当前主机迁移domain对象到给定的目标主机;
  7.         :params ctxt: security context
  8.         安全上下文信息;
  9.         :params instance_ref:
  10.             nova.db.sqlalchemy.models.Instance object
  11.             instance object that is migrated.
  12.         被迁移的实例对象;
  13.         :params dest: destination host
  14.         目标主机;
  15.         :params post_method:
  16.             post operation method.
  17.             expected nova.compute.manager.post_live_migration.
  18.         post操作方法;
  19.         :params recover_method:
  20.             recovery method when any exception occurs.
  21.             expected nova.compute.manager.recover_live_migration.
  22.         发生任何异常时候的恢复方法;
  23.         :params migrate_data: implementation specific params
  24.         实时迁移的具体参数;
  25.         """  
  26.   
  27.         # 执行迁移操作;  
  28.         try:  
  29.             if block_migration:  
  30.                 # 获取块迁移标志列表;  
  31.                 # block_migration_flag:这个参数定义了为块迁移设置迁移标志;  
  32.                 # 参数的默认值为'VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER, VIR_MIGRATE_NON_SHARED_INC';  
  33.                 flaglist = CONF.block_migration_flag.split(',')  
  34.             else:  
  35.                 # 获取实时迁移标志列表;  
  36.                 # 这个参数定义了实时迁移的迁移标志;  
  37.                 # 参数的默认值为'VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER';  
  38.                 flaglist = CONF.live_migration_flag.split(',')  
  39.             flagvals = [getattr(libvirt, x.strip()) for x in flaglist]  
  40.             logical_sum = reduce(lambda x, y: x | y, flagvals)  
  41.   
  42.             # 根据给定的实例名称检索libvirt域对象;  
  43.             dom = self._lookup_by_name(instance_ref["name"])  
  44.               
  45.             # migrateToURI:调用libvirt python接口方法virDomainMigrateToURI,来实现从当前主机迁移domain对象到给定的目标主机;  
  46.             # live_migration_uri:这个参数定义了迁移目标URI;  
  47.             # 参数的默认值为"qemu+tcp://%s/system";  
  48.             # 这里面的"%s"用迁移目标主机名来代替;  
  49.             # live_migration_bandwidth:这个参数定义了迁移过程中所使用的最大的带宽;  
  50.             # 参数的默认值为0;  
  51.             dom.migrateToURI(CONF.live_migration_uri % dest,  
  52.                              logical_sum,  
  53.                              None,  
  54.                              CONF.live_migration_bandwidth)  
  55.   
  56.         except Exception as e:  
  57.             with excutils.save_and_reraise_exception():  
  58.                 LOG.error(_("Live Migration failure: %(e)s") % locals(),  
  59.                           instance=instance_ref)  
  60.                 recover_method(ctxt, instance_ref, dest, block_migration)  
  61.   
  62.         # Waiting for completion of live_migration.  
  63.         # 获取等待完成实时迁移的时间;  
  64.         timer = utils.FixedIntervalLoopingCall(f=None)  
  65.   
  66.         def wait_for_live_migration():  
  67.             """
  68.             waiting for live migration completion.
  69.             检测虚拟机迁移操作的实施状态;
  70.             """  
  71.             try:  
  72.                 self.get_info(instance_ref)['state']  
  73.             except exception.NotFound:  
  74.                 timer.stop()  
  75.                 post_method(ctxt, instance_ref, dest, block_migration,  
  76.                             migrate_data)  
  77.   
  78.         # 这两条语句所实现的功能是以一定的时间间隔(0.5)循环调用wait_for_live_migration方法,来检测虚拟机迁移的状态,知道虚拟机成功迁移为止;  
  79.         timer.f = wait_for_live_migration  
  80.         timer.start(interval=0.5).wait()  
复制代码



至此,Nova虚拟机动态迁移的实现机制和实现源码解析完成。
博文中不免有不正确的地方,欢迎朋友们不吝批评指正,谢谢大家了






博客地址:http://blog.csdn.net/gaoxingnengjisuan

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条