本帖最后由 xioaxu790 于 2014-6-13 10:28 编辑
问题导读:
1、方法quota_reserve执行的过程分为哪几步?
2、如何删除一个实例?
继续看方法quota_reserve:
- def quota_reserve(context, resources, quotas, deltas, expire, until_refresh, max_age, project_id=None):
-
- elevated = context.elevated()
- # 获取db_session的session;
- # get_session:返回一个SQLAlchemy session,若没有定义,则新建一个SQLAlchemy session;
- session = get_session()
- with session.begin():
-
- # 获取context中的project_id;
- if project_id is None:
- project_id = context.project_id
-
- # 从quota_usages表中获得当前工程的各种资源的使用情况;
- usages = _get_quota_usages(context, session, project_id)
-
- # Handle usage refresh
- work = set(deltas.keys())
- while work:
- # 任意地从集合work中取出一个元素;
- resource = work.pop()
-
- refresh = False
- if resource not in usages:
- usages[resource] = _quota_usage_create(elevated,project_id,resource,0, 0,until_refresh or None,session=session)
- refresh = True
- elif usages[resource].in_use < 0:
- refresh = True
- elif usages[resource].until_refresh is not None:
- usages[resource].until_refresh -= 1
- if usages[resource].until_refresh <= 0:
- refresh = True
- elif max_age and (usages[resource].updated_at - timeutils.utcnow()).seconds >= max_age:
- refresh = True
-
- # 执行更新(同步)usage;
- if refresh:
- # Grab the sync routine
- # 获取同步方法,_sync_*(),这些方法定义在quota模块中,不同的资源有不同的同步方法;
- sync = resources[resource].sync
-
- # 查询当前正在使用的实时的资源的数据信息;
- updates = sync(elevated, project_id, session)
- for res, in_use in updates.items():
- # 如果试实时使用的资源没有在usages中,那么把它添加进去;
- if res not in usages:
- usages[res] = _quota_usage_create(elevated,project_id,res,0, 0,until_refresh or None,session=session)
-
- # 更新usage中的in_use数据信息;
- usages[res].in_use = in_use
- # 更新usage中的until_refresh数据信息;
- usages[res].until_refresh = until_refresh or None
-
- work.discard(res)
-
- # 检测资源数据中in_use加上delta之后,可能小于0的情况;
- unders = [resource for resource, delta in deltas.items()
- if delta < 0 and
- delta + usages[resource].in_use < 0]
-
- # Now, let's check the quotas
- # 检测这个resource的hard_limit是否小于in_use+resourced+delta之和;
- overs = [resource for resource, delta in deltas.items()
- if quotas[resource] >= 0 and delta >= 0 and
- quotas[resource] < delta + usages[resource].total]
-
- # Create the reservations
- # 如果没有超过的话,更新reservations表,再次更新usages
- if not overs:
- reservations = []
- for resource, delta in deltas.items():
- # str(uuid.uuid4()):随机获取uuid值,是唯一的;
- # usages[resource]:更新过的当前资源在quota_usages表中的使用情况;
- reservation = reservation_create(elevated,str(uuid.uuid4()),usages[resource],project_id,resource, delta, expire,session=session)
- reservations.append(reservation.uuid)
-
- # 更新usages中的reserved值,加上变化值;
- if delta > 0:
- usages[resource].reserved += delta
-
- # Apply updates to the usages table
- # 更新quota_usages表;
- for usage_ref in usages.values():
- usage_ref.save(session=session)
-
- if unders:
- LOG.warning(_("Change will make usage less than 0 for the following resources: %(unders)s") % locals())
- if overs:
- usages = dict((k, dict(in_use=v['in_use'], reserved=v['reserved']))
- for k, v in usages.items())
- raise exception.OverQuota(overs=sorted(overs), quotas=quotas,usages=usages)
-
- return reservations
复制代码
继续来看代码,看看是如何执行refresh更新(同步)操作的;
- # 执行更新(同步)usage;
- if refresh:
- # Grab the sync routine
- # 获取同步方法,_sync_*(),这些方法定义在quota模块中,不同的资源有不同的同步方法;
- sync = resources[resource].sync
-
- # 查询当前正在使用的实时的资源的数据信息;
- updates = sync(elevated, project_id, session)
- for res, in_use in updates.items():
- # 如果试实时使用的资源没有在usages中,那么把它添加进去;
- if res not in usages:
- usages[res] = _quota_usage_create(elevated,project_id,res,0, 0,until_refresh or None,session=session)
-
- # 更新usage中的in_use数据信息;
- usages[res].in_use = in_use
- # 更新usage中的until_refresh数据信息;
- usages[res].until_refresh = until_refresh or None
-
- work.discard(res)
复制代码
回顾一下,在quota.py中resources的定义:
- resources = [
- ReservableResource('instances', _sync_instances, 'quota_instances'),
- ReservableResource('cores', _sync_instances, 'quota_cores'),
- ReservableResource('ram', _sync_instances, 'quota_ram'),
- ReservableResource('floating_ips', _sync_floating_ips, 'quota_floating_ips'),
- ReservableResource('fixed_ips', _sync_fixed_ips, 'quota_fixed_ips'),
- ReservableResource('security_groups', _sync_security_groups, 'quota_security_groups'),
-
- AbsoluteResource('metadata_items', 'quota_metadata_items'),
- AbsoluteResource('injected_files', 'quota_injected_files'),
- AbsoluteResource('injected_file_content_bytes', 'quota_injected_file_content_bytes'),
- AbsoluteResource('injected_file_path_bytes', 'quota_injected_file_path_bytes'),
-
- CountableResource('security_group_rules', db.security_group_rule_count_by_group, 'quota_security_group_rules'),
- CountableResource('key_pairs', db.key_pair_count_by_user, 'quota_key_pairs'),
- ]
复制代码
这些_sync_*就是同步方法,也就是这里通过语句:sync = resources[resource].sync来获取的方法。通过这些方法能够实时查询到工程当前所使用资源的情况,也就能够用于刷新(同步)资源使用信息的操作。
阅读代码我们可以知道:
deltas.keys() = ['instances', 'ram', 'cores']
work = set(deltas.keys())
resource = work.pop()
sync = resources[resource].sync
所以sync只能从'instances', 'ram', 'cores'三个资源中选取对应的同步方法,而这三个资源对应的同步方法都是_sync_instances,进而我们来看看_sync_instances这个方法:
- def _sync_instances(context, project_id, session):
- return dict(zip(('instances', 'cores', 'ram'),
- db.instance_data_get_for_project(
- context, project_id, session=session)))
复制代码
再看方法instance_data_get_for_project:
- def instance_data_get_for_project(context, project_id, session=None):
- result = model_query(context,
- func.count(models.Instance.id),
- func.sum(models.Instance.vcpus),
- func.sum(models.Instance.memory_mb),
- base_model=models.Instance,
- session=session).\
- filter_by(project_id=project_id).\
- first()
- # NOTE(vish): convert None to 0
- return (result[0] or 0, result[1] or 0, result[2] or 0)
复制代码
在来看看类Instance,这个类定义了虚拟机实例信息的数据库表;
- class Instance(BASE, NovaBase):
- """
- 表示一个来宾VM的类;
- 构建数据表instances的数据结构;
- """
-
- # 表名instances;
- __tablename__ = 'instances'
- injected_files = []
-
- # 定义数据表instances的字段id;
- id = Column(Integer, primary_key=True, autoincrement=True)
-
- @property
- def name(self):
- try:
- base_name = CONF.instance_name_template % self.id
- except TypeError:
- # Support templates like "uuid-%(uuid)s", etc.
- info = {}
- # NOTE(russellb): Don't use self.iteritems() here, as it will
- # result in infinite recursion on the name property.
- for column in iter(object_mapper(self).columns):
- key = column.name
- # prevent recursion if someone specifies %(name)s
- # %(name)s will not be valid.
- if key == 'name':
- continue
- info[key] = self[key]
- try:
- base_name = CONF.instance_name_template % info
- except KeyError:
- base_name = self.uuid
- return base_name
-
- def _extra_keys(self):
- return ['name']
-
- user_id = Column(String(255)) # 定义数据表instances的字段user_id;
- project_id = Column(String(255)) # 定义数据表instances的字段project_id;
-
- image_ref = Column(String(255)) # 定义数据表instances的字段image_ref;
- kernel_id = Column(String(255)) # 定义数据表instances的字段kernel_id;
- ramdisk_id = Column(String(255)) # 定义数据表instances的字段ramdisk_id;
- hostname = Column(String(255)) # 定义数据表instances的字段hostname;
-
- launch_index = Column(Integer) # 定义数据表instances的字段lauch_index;
- key_name = Column(String(255)) # 定义数据表instances的字段key_name;
- key_data = Column(Text) # 定义数据表instances的字段key_data;
-
- power_state = Column(Integer) # 定义数据表instances的字段power_state;
- vm_state = Column(String(255)) # 定义数据表instances的字段vm_state;
- task_state = Column(String(255)) # 定义数据表instances的字段task_state;
-
- memory_mb = Column(Integer) # 定义数据表instances的字段memory_mb;
- vcpus = Column(Integer) # 定义数据表instances的字段vcpus;
- root_gb = Column(Integer) # 定义数据表instances的字段root_gb;
- ephemeral_gb = Column(Integer) # 定义数据表instances的字段ephemeral_gb;
-
- # 定义数据表instances的字段host;
- # 注意这个字段不是与hostname相关的,而是指nova的节点;
- host = Column(String(255)) # , ForeignKey('hosts.id'))
-
- # 定义数据表instances的字段node;
- # 识别实例所在的"ComputeNode" ;
- # 这与ComputeNode.hypervisor_hostname.
- node = Column(String(255))
-
- instance_type_id = Column(Integer) # 定义数据表instances的字段instance_type_id;
-
- user_data = Column(Text) # 定义数据表instances的字段user_data;
-
- reservation_id = Column(String(255)) # 定义数据表instances的字段reservation_id;
-
- scheduled_at = Column(DateTime) # 定义数据表instances的字段scheduled_at;
- launched_at = Column(DateTime) # 定义数据表instances的字段launched_at;
- terminated_at = Column(DateTime) # 定义数据表instances的字段terminated_at;
-
- availability_zone = Column(String(255)) # 定义数据表instances的字段availability_zone;
-
- # User editable field for display in user-facing UIs
- display_name = Column(String(255)) # 定义数据表instances的字段display_name;
- display_description = Column(String(255)) # 定义数据表instances的字段display_description;
-
- # To remember on which host an instance booted.
- # An instance may have moved to another host by live migration.
- # 记住在哪个主机上启动了一个实例;
- # 实例可能已经通过实时迁移到了另一个主机上;
- launched_on = Column(Text) # 定义数据表instances的字段launched_on;
- locked = Column(Boolean) # 定义数据表instances的字段locked;
-
- os_type = Column(String(255)) # 定义数据表instances的字段os_type;
- architecture = Column(String(255)) # 定义数据表instances的字段architecture;
- vm_mode = Column(String(255)) # 定义数据表instances的字段vm_mode;
- uuid = Column(String(36)) # 定义数据表instances的字段uuid;
-
- root_device_name = Column(String(255)) # 定义数据表instances的字段root_device_name;
- default_ephemeral_device = Column(String(255), nullable=True) # 定义数据表instances的字段default_ephemeral_device;
- default_swap_device = Column(String(255), nullable=True) # 定义数据表instances的字段default_swap_device;
- config_drive = Column(String(255)) # 定义数据表instances的字段config_drive;
-
- # User editable field meant to represent what ip should be used to connect to the instance
- # 用户可编辑字段,代表着哪一个IP能够被用来连接实例;
- access_ip_v4 = Column(types.IPAddress()) # 定义数据表instances的字段access_ip_v4;
- access_ip_v6 = Column(types.IPAddress()) # 定义数据表instances的字段access_ip_v6;
-
- auto_disk_config = Column(Boolean()) # 定义数据表instances的字段auto_disk_config;
- progress = Column(Integer) # 定义数据表instances的字段progress;
-
- # EC2 instance_initiated_shutdown_terminate
- # True: -> 'terminate'
- # False: -> 'stop'
- # Note(maoy): currently Nova will always stop instead of terminate
- # no matter what the flag says. So we set the default to False.
- # 当前的nova应该是stop而不是terminate,所以我们默认设置标志为False;
- shutdown_terminate = Column(Boolean(), default=False, nullable=False) # 定义数据表instances的字段shutdown_terminate;
-
- # EC2 disable_api_termination
- disable_terminate = Column(Boolean(), default=False, nullable=False) # 定义数据表instances的字段disable_terminate;
-
- # OpenStack compute cell name. This will only be set at the top of
- # the cells tree and it'll be a full cell name such as 'api!hop1!hop2'
- # OpenStack的计算单元名;
- cell_name = Column(String(255)) # 定义数据表instances的字段cell_name;
复制代码
由此我们可以知道,语句updates = sync(elevated, project_id, session)实现的就是定位到资源的同步方法,实时查询所需要更新的资源使用信息;比如这里调用的同步方法就是方法_sync_instances,它实现了从数据库中查询到匹配的数据表'instances',进而获取其id、vcpus和memory_mb三种资源的实时的使用情况,分别赋值给'instances','cores'和 'ram',以字典的形式返回给updates;
这里我调试运行了一下,得到了获取的实时资源使用数据信息:
当建立了10个实例的时候:
- updates = {instances:10, ram:20480, cores:10}
复制代码
删除了1个实例之后:
- updates = {instances:9, ram:18432, cores:9}
复制代码
所以在这里我们就能够理解,为什么应用语句:resource = work.pop() 来随机获取work集合中的一个资源,原因就是针对'instances', 'ram', 'cores'无论哪一个资源,都会调用同样的同步方法_sync_instances来获取实时的工程资源使用情况,都会得到这三个资源的实时的使用数据信息。
继续看下面的代码:
- for res, in_use in updates.items():
复制代码
这段代码完成的功能就是如果获取的实时使用的资源没有在usages中,那么就把它添加进去,并更新它的in_use和until_refresh数据信息。
继续看代码:
- unders = [resource for resource, delta in deltas.items()
- if delta < 0 and delta + usages[resource].in_use < 0]
- 检测资源数据中in_use加上delta之后,可能小于0的情况,如果存在这种情况,后面将会引发异常;
- [python] view plaincopyprint?在CODE上查看代码片派生到我的代码片
- overs = [resource for resource, delta in deltas.items()
- if quotas[resource] >= 0 and delta >= 0 and
- quotas[resource] < delta + usages[resource].total]
复制代码
检测这个resource的hard_limit是否小于in_use+resourced+delta之和,如果over值为真,说明delta + usages[resource].total的值已经大于系统限定的资源配额的数值,后面将会引发异常;
继续看下一段代码:
- if not overs:
- reservations = []
- for resource, delta in deltas.items():
- # str(uuid.uuid4()):随机获取uuid值,是唯一的;
- # usages[resource]:更新过的当前资源在quota_usages表中的使用情况;
- reservation = reservation_create(elevated,str(uuid.uuid4()),usages[resource],project_id,resource, delta, expire,session=session)
- reservations.append(reservation.uuid)
-
- # 更新usages中的reserved值,加上变化值;
- if delta > 0:
- usages[resource].reserved += delta
-
- # 更新quota_usages表;
- for usage_ref in usages.values():
- usage_ref.save(session=session)
复制代码
这段代码是当没有发生资源配额超出限制值的时候,建立更新reservations数据表,并再次更新usages。reservations表记录的是每次分配的各种资源的变化值,即delta保存的值,它和quota_usages通过usage_id外键关联。这个表中的记录是不更新的,每次分配都会相应的增加记录。
最后返回reservations。
至此,方法quota_reserve解析完成:
这个函数执行的过程主要分为以下几步:
(1)同步
为什么要同步呢?这里同步的是什么呢?因为在为工程分配资源时,可能有各种特殊情况导致quota_usages表中记录的in_use不准确,需要得到当前实际使用的资源的情况,更新一下in_use,得到真实的资源使用情况。这里的特殊情况有一下4个:
1)当前申请的资源没有在quota_usages表中记录
2)当前申请的资源在quota_usages表中的in_use值小于0
3)当前申请的资源的until_refresh值不为空,减1之后小于0
4)当前申请的资源在quota_usages表中更新的时间减去现在的时间大于max_age
如果符合这四种情况之一,就执行同步。同步时,是调用当前资源的_sync_*()函数,去相关的表中查询实时的使用情况,比如_sync_instances()就是去instances表中查询出当前工程的instances数量,vcpu之和,ram之和,然后以字典的方式返回。然后根据这些实时的数据,更新in_use值。
(2)检查
根据各种资源的配额,和变化的情况(delta),来检查两种极端的情况:under和over。under是检查delta为负数的情况,即执行了删除等操作,使delta为负,in_use减少,导致in_use值可能小于0。over是检查delta为正数时的情况,in_use+delta就有可能大于最大的限额了。这里只对over的情况进行处理,即它只关心上限,不关心下限。如果没有over的话,就有下面的第(3)步了。如果over的话,就直接进行第(4)步。
(3)向reservations表中增加记录,记录申请资源的delta值。
(4)把in_use值写入到quota_usages表中保存。(不论under还是over都执行)
(5)如果over,即超出最大限额,则报出OverQuota异常。
至此,方法_check_num_instances_quota也全部解析结束,这个方法实现了根据配额资源限制所要建立实例的数目。方法返回两个参数max_count和reservations,返回值max_count表示建立实例的最大数目,返回值reservations表示建立的预定(分配)的资源的UUID的列表。
本文,接着上一篇内容:
OpenStack建立实例完整过程源码详细分析(8)
|