问题导读
1.哪个函数可以从原始请求中获取drive, part, account, container, obj等信息?
2.如何获取container所属account的信息,返回分区号、account的副本节点和container数目?
概述:
这篇文章主要关注swift-proxy与swift-container服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;
源码解析部分(代码中较重要的部分已经进行了相关的注释):
GETorHEAD
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def GETorHEAD
def GETorHEAD(self, req):
    """Handler for HTTP GET/HEAD requests on a container (proxy side).

    Returns 404 when ``account_info`` reports no nodes for the owning
    account.  Otherwise the request is forwarded to the container ring via
    ``GETorHEAD_base``.  If a ``swift.authorize`` callback is installed it
    is invoked with ``req.acl`` set to the container's read ACL, and a
    truthy result is returned in place of the backend response.  Finally,
    owner-only headers are removed from the response for non-owners.
    """
    account_nodes = self.account_info(self.account_name, req)[1]
    if not account_nodes:
        return HTTPNotFound(request=req)

    ring = self.app.container_ring
    # Partition of the container ring that stores this container.
    partition = ring.get_part(self.account_name, self.container_name)
    resp = self.GETorHEAD_base(req, _('Container'), ring, partition,
                               req.swift_entity_path)

    environ = req.environ
    if 'swift.authorize' in environ:
        # The authorizer sees the container's read ACL via req.acl.
        req.acl = resp.headers.get('x-container-read')
        denial = environ['swift.authorize'](req)
        if denial:
            return denial

    is_owner = environ.get('swift_owner', False)
    if not is_owner:
        # Hide owner-only headers from everyone else.
        for header in self.app.swift_owner_headers:
            if header in resp.headers:
                del resp.headers[header]
    return resp
复制代码
/swift/container/server.py----class ContainerController(object)----def HEAD
def HEAD(self, req):
    """Return a container's stats and metadata as headers of a 204 response.

    Headers set:
    - ``X-Container-Object-Count``
    - ``X-Container-Bytes-Used``
    - ``X-Timestamp`` (the DB's ``created_at``)
    - ``X-PUT-Timestamp``
    - every stored metadata item whose value is non-empty and whose name is
      in ``self.save_headers`` or is container sys/user metadata
    - ``Content-Type`` (the negotiated listing content type)

    :param req: request whose path is ``/drive/part/account/container[/obj]``.
    :returns: 507 when the drive fails the mount check, 404 when the
              container DB is marked deleted, otherwise 204 with the
              headers above.
    """
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    listing_type = get_listing_content_type(req)

    if self.mount_check and not check_mount(self.root, drive):
        # Mount checking is on and the drive is not mounted: 507.
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker wraps access to this container's database; this read path
    # passes stale_reads_ok=True and a short pending_timeout.
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    stats = broker.get_info()
    headers = {}
    headers['X-Container-Object-Count'] = stats['object_count']
    headers['X-Container-Bytes-Used'] = stats['bytes_used']
    headers['X-Timestamp'] = stats['created_at']
    headers['X-PUT-Timestamp'] = stats['put_timestamp']

    for meta_key, (meta_value, _meta_ts) in broker.metadata.iteritems():
        if meta_value == '':
            continue
        if (meta_key.lower() in self.save_headers or
                is_sys_or_user_meta('container', meta_key)):
            headers[meta_key] = meta_value

    headers['Content-Type'] = listing_type
    return HTTPNoContent(request=req, headers=headers, charset='utf-8')
复制代码
/swift/container/server.py----class ContainerController(object)----def GET
def GET(self, req):
    """Handle HTTP GET: container stats in headers plus an object listing.

    The response headers match HEAD: object count, bytes used, creation and
    PUT timestamps, and non-empty stored metadata.  The body is the listing
    returned by ``broker.list_objects_iter``.  It is rendered as JSON, XML
    or newline-separated names, depending on ``get_listing_content_type``.

    Query parameters read: ``path``, ``prefix``, ``delimiter``, ``marker``,
    ``end_marker`` and ``limit``.

    :param req: request whose path is ``/drive/part/account/container[/obj]``.
    :returns:
        - 412 for a bad delimiter or a limit above ``CONTAINER_LISTING_LIMIT``;
        - 507 when the drive fails the mount check;
        - 404 when the container DB is marked deleted;
        - 204 for an empty plain-text listing;
        - otherwise a response whose body carries the listing.
    """
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    path = get_param(req, 'path')
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')

    # Only a single-character delimiter with ord() <= 254 is accepted.
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        return HTTPPreconditionFailed(body='Bad delimiter')

    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    limit = CONTAINER_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    # A non-numeric limit is ignored and the default cap is used instead.
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > CONTAINER_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)

    out_content_type = get_listing_content_type(req)

    # With mount checking enabled, refuse to serve from an unmounted drive
    # (HTTP 507 Insufficient Storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker wraps access to this container's database.
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    info = broker.get_info()
    resp_headers = {
        'X-Container-Object-Count': info['object_count'],
        'X-Container-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    # Expose stored metadata that is non-empty and is either listed in
    # save_headers or is container sys/user metadata.
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value and (key.lower() in self.save_headers or
                      is_sys_or_user_meta('container', key)):
            resp_headers[key] = value
    ret = Response(request=req, headers=resp_headers,
                   content_type=out_content_type, charset='utf-8')

    container_list = broker.list_objects_iter(limit, marker, end_marker,
                                              prefix, delimiter, path)

    if out_content_type == 'application/json':
        # The listing must be attached to ``ret`` (previously written to an
        # undefined name, which raised NameError for every JSON listing).
        ret.body = json.dumps([self.update_data_record(record)
                               for record in container_list])
    elif out_content_type.endswith('/xml'):
        doc = Element('container', name=container.decode('utf-8'))
        # ``item`` avoids shadowing the ``obj`` path component above.
        for item in container_list:
            record = self.update_data_record(item)
            if 'subdir' in record:
                name = record['subdir'].decode('utf-8')
                sub = SubElement(doc, 'subdir', name=name)
                SubElement(sub, 'name').text = name
            else:
                obj_element = SubElement(doc, 'object')
                # Fixed fields first, in a stable order ...
                for field in ["name", "hash", "bytes", "content_type",
                              "last_modified"]:
                    SubElement(obj_element, field).text = str(
                        record.pop(field)).decode('utf-8')
                # ... then any remaining fields, sorted by name.
                for field in sorted(record):
                    SubElement(obj_element, field).text = str(
                        record[field]).decode('utf-8')
        # ElementTree writes the XML declaration with single quotes. Rewrite
        # it with double quotes; the empty replace("", '', 1) that stood
        # here was a no-op.
        ret.body = tostring(doc, encoding='UTF-8').replace(
            "<?xml version='1.0' encoding='UTF-8'?>",
            '<?xml version="1.0" encoding="UTF-8"?>', 1)
    else:
        if not container_list:
            return HTTPNoContent(request=req, headers=resp_headers)
        ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'
    return ret
PUT
复制代码
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def PUT
def PUT(self, req):
    """HTTP PUT request handler: create or update a container via the proxy.

    Steps, in order:
    1. Validate ACLs, metadata and the container name length.
    2. Strip owner-only headers for non-owners.
    3. Look up the owning account, auto-creating it first when
       ``account_autocreate`` is enabled and it is missing.
    4. Enforce the per-account container limit.
    5. Send the PUT to the container replicas.

    :returns:
        - the first error response from ACL/metadata validation;
        - 400 when the container name is too long;
        - 404 when the account cannot be found;
        - 403 when the account has reached ``max_containers_per_account``;
        - otherwise the response from ``make_requests``.
    """
    error_response = self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response
    if not req.environ.get('swift_owner'):
        # Non-owners may not set owner-only headers; drop them.
        for key in self.app.swift_owner_headers:
            req.headers.pop(key, None)
    if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        # The message belongs on ``resp``; it was previously assigned to an
        # undefined name, which raised NameError instead of returning 400.
        resp.body = 'Container name length of %d longer than %d' % \
            (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
        return resp

    # Owning account: its partition, replica nodes and container count.
    account_partition, accounts, container_count = \
        self.account_info(self.account_name, req)
    if not accounts and self.app.account_autocreate:
        self.autocreate_account(req.environ, self.account_name)
        account_partition, accounts, container_count = \
            self.account_info(self.account_name, req)
    if not accounts:
        return HTTPNotFound(request=req)

    # A non-positive max_containers_per_account disables the limit, and
    # whitelisted accounts are exempt from it.
    if self.app.max_containers_per_account > 0 and \
            container_count >= self.app.max_containers_per_account and \
            self.account_name not in self.app.max_containers_whitelist:
        resp = HTTPForbidden(request=req)
        resp.body = 'Reached container limit of %s' % \
            self.app.max_containers_per_account
        return resp

    # Partition and replica nodes for this container.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    # Backend headers built for len(containers) requests (presumably one
    # per container replica, carrying the account location).
    headers = self._backend_requests(req, len(containers),
                                     account_partition, accounts)
    # Clear cached info for this account/container (memcache and env).
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)
    resp = self.make_requests(
        req, self.app.container_ring,
        container_partition, 'PUT', req.swift_entity_path, headers)
    return resp
复制代码
/swift/container/server.py----class ContainerController(object)----def PUT
def PUT(self, req):
    """Create/update a container, or record an object row in its database.

    The request path is ``/drive/part/account/container[/obj]``.

    With ``obj``:
        The object's size, content type and etag (from the ``X-Size``,
        ``X-Content-Type`` and ``X-Etag`` headers) are written with
        ``broker.put_object``.  If the account name starts with
        ``self.auto_create_account_prefix``, a missing container DB is
        initialized first.

    Without ``obj``:
        ``self._update_or_create`` creates or updates the container, and
        allowed metadata headers are stored.  ``self.account_update`` is
        then called so the owning account can be updated.

    :returns:
        - 400 for a missing/non-float ``X-Timestamp`` or an invalid
          ``X-Container-Sync-To``;
        - 507 when the drive fails the mount check;
        - for an object row: 404 if the container DB does not exist,
          otherwise 201;
        - for the container: any response returned by ``account_update``,
          otherwise 201 when ``_update_or_create`` returns a truthy value,
          else 202.
    """
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)

    timestamp_ok = ('x-timestamp' in req.headers and
                    check_float(req.headers['x-timestamp']))
    if not timestamp_ok:
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')
    if 'x-container-sync-to' in req.headers:
        sync_err, _sync_to, _realm, _realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if sync_err:
            return HTTPBadRequest(sync_err)

    # With mount checking enabled, never write to an unmounted drive (507).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    timestamp = normalize_timestamp(req.headers['x-timestamp'])
    broker = self._get_container_broker(drive, part, account, container)

    if not obj:
        # Container-level PUT.
        created = self._update_or_create(req, broker, timestamp)
        new_metadata = dict(
            (name, (value, timestamp))
            for name, value in req.headers.iteritems()
            if name.lower() in self.save_headers or
            is_sys_or_user_meta('container', name))
        if new_metadata:
            sync_to = new_metadata.get('X-Container-Sync-To')
            if sync_to is not None:
                if ('X-Container-Sync-To' not in broker.metadata or
                        sync_to[0] !=
                        broker.metadata['X-Container-Sync-To'][0]):
                    # New or changed sync target: reset both sync points.
                    broker.set_x_container_sync_points(-1, -1)
            broker.update_metadata(new_metadata)

        account_resp = self.account_update(req, account, container, broker)
        if account_resp:
            return account_resp
        if created:
            return HTTPCreated(request=req)
        return HTTPAccepted(request=req)

    # Object-row PUT.
    needs_init = (account.startswith(self.auto_create_account_prefix) and
                  not os.path.exists(broker.db_file))
    if needs_init:
        try:
            broker.initialize(timestamp)
        except DatabaseAlreadyExists:
            pass
    if not os.path.exists(broker.db_file):
        return HTTPNotFound()
    broker.put_object(obj, timestamp, int(req.headers['x-size']),
                      req.headers['x-content-type'],
                      req.headers['x-etag'])
    return HTTPCreated(request=req)
POST
复制代码
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def POST
def POST(self, req):
    """HTTP POST request handler: update container metadata via the proxy.

    After ACL/metadata validation and stripping of owner-only headers for
    non-owners, the POST is sent to every replica node of the container.
    Each replica receives the same headers from ``generate_request_headers``.

    :returns: the first validation error response; 404 when the owning
              account has no nodes; otherwise the ``make_requests`` result.
    """
    problem = self.clean_acls(req) or check_metadata(req, 'container')
    if problem:
        return problem

    if not req.environ.get('swift_owner'):
        # Only owners may set owner-only headers.
        for header in self.app.swift_owner_headers:
            req.headers.pop(header, None)

    _acct_partition, account_nodes, _container_count = self.account_info(
        self.account_name, req)
    if not account_nodes:
        return HTTPNotFound(request=req)

    ring = self.app.container_ring
    container_partition, container_nodes = ring.get_nodes(
        self.account_name, self.container_name)

    shared_headers = self.generate_request_headers(req, transfer=True)
    # Clear cached info for this account/container before updating it.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    per_node_headers = [shared_headers] * len(container_nodes)
    return self.make_requests(req, ring, container_partition, 'POST',
                              req.swift_entity_path, per_node_headers)
复制代码
/swift/container/server.py----class ContainerController(object)----def POST
def POST(self, req):
    """Update a container's metadata from the request headers.

    A header is stored (with the normalized request timestamp) via
    ``broker.update_metadata`` when its lower-cased name is in
    ``self.save_headers`` or when ``is_sys_or_user_meta('container', ...)``
    accepts it.  When ``X-Container-Sync-To`` is absent from the stored
    metadata, or differs from the stored value, the sync points are reset
    with ``broker.set_x_container_sync_points(-1, -1)``.

    :returns:
        - 400 for a missing/non-float ``X-Timestamp`` or an invalid
          ``X-Container-Sync-To``;
        - 507 when the drive fails the mount check;
        - 404 when the container DB is marked deleted;
        - otherwise 204.
    """
    drive, part, account, container = split_and_validate_path(req, 4)

    timestamp_ok = ('x-timestamp' in req.headers and
                    check_float(req.headers['x-timestamp']))
    if not timestamp_ok:
        return HTTPBadRequest(body='Missing or bad timestamp', request=req,
                              content_type='text/plain')

    if 'x-container-sync-to' in req.headers:
        sync_err, _sync_to, _realm, _realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if sync_err:
            return HTTPBadRequest(sync_err)

    # With mount checking enabled, never write to an unmounted drive (507).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    broker = self._get_container_broker(drive, part, account, container)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    timestamp = normalize_timestamp(req.headers['x-timestamp'])
    new_metadata = dict(
        (name, (value, timestamp))
        for name, value in req.headers.iteritems()
        if name.lower() in self.save_headers or
        is_sys_or_user_meta('container', name))

    if new_metadata:
        sync_to = new_metadata.get('X-Container-Sync-To')
        if sync_to is not None:
            if ('X-Container-Sync-To' not in broker.metadata or
                    sync_to[0] != broker.metadata['X-Container-Sync-To'][0]):
                # New or changed sync target: reset both sync points.
                broker.set_x_container_sync_points(-1, -1)
        broker.update_metadata(new_metadata)

    return HTTPNoContent(request=req)
复制代码
DELETE
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def DELETE
def DELETE(self, req):
    """HTTP DELETE request handler: delete a container via the proxy.

    The DELETE is sent to the container's replica nodes, using backend
    headers built from the owning account's partition and nodes.  A 202
    (HTTP_ACCEPTED) from ``make_requests`` is taken to mean that no server
    had the container, and is turned into 404.

    :returns: 404 when the owning account has no nodes or no server had
              the container; otherwise the ``make_requests`` response.
    """
    account_partition, account_nodes, _container_count = self.account_info(
        self.account_name, req)
    if not account_nodes:
        return HTTPNotFound(request=req)

    ring = self.app.container_ring
    container_partition, container_nodes = ring.get_nodes(
        self.account_name, self.container_name)

    backend_headers = self._backend_requests(
        req, len(container_nodes), account_partition, account_nodes)
    # Clear cached info for this account/container before deleting it.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    resp = self.make_requests(req, ring, container_partition, 'DELETE',
                              req.swift_entity_path, backend_headers)
    no_server_had_it = resp.status_int == HTTP_ACCEPTED
    return HTTPNotFound(request=req) if no_server_had_it else resp
复制代码
/swift/container/server.py----class ContainerController(object)----def DELETE
def DELETE(self, req):
    """Delete a container, or record an object deletion in a container DB.

    The request path is ``host:port/device/partition/account/container[/obj]``.

    With ``obj``:
        ``broker.delete_object`` is called to mark that object as deleted
        in the container database, and 204 is returned.

    Without ``obj``:
        The container itself is deleted.  It must be empty, otherwise 409
        is returned.  ``broker.delete_db`` is called with the request
        timestamp, and ``self.account_update`` is called so the owning
        account can be updated.

    NOTE(review): earlier annotations say that ``delete_object`` only
    serializes the deletion into the ``db_file.pending`` file, to be merged
    into the DB on a later access.  They also say that ``delete_db`` clears
    the metadata, updates ``delete_timestamp`` and sets the status to
    DELETED without removing the file.  Those helpers are not shown here;
    confirm against ContainerBroker.

    :returns:
        - 400 when ``X-Timestamp`` is missing or not a float;
        - 507 when the drive fails the mount check;
        - 404 when the container DB file does not exist;
        - 409 when the container is not empty, or is still not marked
          deleted after ``delete_db``;
        - any response returned by ``account_update``;
        - 204 when the container had a non-zero put_timestamp and was not
          already deleted, otherwise 404.
    """
    # Split the request path into drive, part, account, container and the
    # optional obj.
    drive, part, account, container, obj = split_and_validate_path(req, 4, 5, True)

    if 'x-timestamp' not in req.headers or \
            not check_float(req.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain')

    # When mount checking is enabled and the drive is not mounted, answer
    # 507 Insufficient Storage.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker wraps access to this container's database.
    broker = self._get_container_broker(drive, part, account, container)

    # For object deletions on auto-create accounts, initialize a missing
    # container DB first; losing a creation race is harmless.
    if account.startswith(self.auto_create_account_prefix) and obj and \
            not os.path.exists(broker.db_file):
        try:
            broker.initialize(normalize_timestamp(req.headers.get('x-timestamp') or time.time()))
        except DatabaseAlreadyExists:
            pass
    if not os.path.exists(broker.db_file):
        return HTTPNotFound()

    # With obj: record the object deletion in the container DB, using the
    # request's X-Timestamp.
    if obj:  # delete object
        broker.delete_object(obj, req.headers.get('x-timestamp'))
        return HTTPNoContent(request=req)

    # Without obj: delete the container itself.
    else:
        # delete container
        # Refuse to delete a container that still has objects.
        if not broker.empty():
            return HTTPConflict(request=req)
        # "existed" is true when the container had a non-zero put_timestamp
        # and was not already marked deleted before this request.
        existed = float(broker.get_info()['put_timestamp']) and not broker.is_deleted()
        # Mark the DB deleted at the request timestamp (see NOTE above on
        # whether the file is kept).
        broker.delete_db(req.headers['X-Timestamp'])
        # Still not deleted (e.g. timestamp ordering) -> conflict.
        if not broker.is_deleted():
            return HTTPConflict(request=req)
        # Notify the owning account; account_update is not shown here.
        # Presumably it sends a request to the account server(s) and
        # returns an error response on failure - confirm.
        resp = self.account_update(req, account, container, broker)
        if resp:
            return resp

        if existed:
            return HTTPNoContent(request=req)
        return HTTPNotFound()
复制代码
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
|