分享

Swift源码分析----swift-proxy与swift-container

tntzbzc 发表于 2014-11-20 15:34:47 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13962
问题导读

1.哪个函数可以从原始请求中获取drive, part, account, container, obj等信息?
2.如何获取container所属account的信息,返回分区号、account的副本节点和container数目?






概述:
这篇文章主要关注swift-proxy与swift-container服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;

源码解析部分(代码中较重要的部分已经进行了相关的注释):

GETorHEAD
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def GETorHEAD
  1. def GETorHEAD(self, req):
  2. """Handler for HTTP GET/HEAD requests."""
  3.     if not self.account_info(self.account_name, req)[1]:
  4.         return HTTPNotFound(request=req)
  5.         
  6.     # 获取指定container的对应分区号;
  7.     part = self.app.container_ring.get_part(self.account_name, self.container_name)
  8.     resp = self.GETorHEAD_base(
  9.         req, _('Container'), self.app.container_ring, part,
  10.         req.swift_entity_path)
  11.         
  12.     if 'swift.authorize' in req.environ:
  13.         req.acl = resp.headers.get('x-container-read')
  14.         aresp = req.environ['swift.authorize'](req)
  15.         if aresp:
  16.             return aresp
  17.     if not req.environ.get('swift_owner', False):
  18.         for key in self.app.swift_owner_headers:
  19.             if key in resp.headers:
  20.                 del resp.headers[key]
  21.     return resp
复制代码



/swift/container/server.py----class ContainerController(object)----def HEAD
  1. def HEAD(self, req):
  2.     """
  3.     HEAD请求返回container的基本信息(元数据信息),并以key-value的形式保存在HTTPHEAD中返回;
  4.     """
  5.     # 从原始请求中获取drive, part, account, container, obj等信息;
  6.     drive, part, account, container, obj = split_and_validate_path(req, 4, 5, True)
  7.     out_content_type = get_listing_content_type(req)
  8.         
  9.     # mount_check是是否进行mount检查;
  10.     # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
  11.     if self.mount_check and not check_mount(self.root, drive):
  12.         return HTTPInsufficientStorage(drive=drive, request=req)
  13.         
  14.     # 返回一个ContainerBroker实例,用于代理其数据库访问的操作;
  15.     broker = self._get_container_broker(drive, part, account, container,
  16.                                         pending_timeout=0.1,
  17.                                         stale_reads_ok=True)
  18.         
  19.     if broker.is_deleted():
  20.         return HTTPNotFound(request=req)
  21.         
  22.     # 获取全局数据;
  23.     # 返回包括account, created_at, put_timestamp, delete_timestamp, container_count, object_count, bytes_used, hash, id等值的字典;
  24.     info = broker.get_info()
  25.     headers = {
  26.             'X-Container-Object-Count': info['object_count'],
  27.             'X-Container-Bytes-Used': info['bytes_used'],
  28.             'X-Timestamp': info['created_at'],
  29.             'X-PUT-Timestamp': info['put_timestamp'],
  30.         }
  31.     headers.update(
  32.             (key, value)
  33.             for key, (value, timestamp) in broker.metadata.iteritems()
  34.             if value != '' and (key.lower() in self.save_headers or
  35.                                 is_sys_or_user_meta('container', key)))
  36.     headers['Content-Type'] = out_content_type
  37.     return HTTPNoContent(request=req, headers=headers, charset='utf-8')
复制代码



/swift/container/server.py----class ContainerController(object)----def GET
  1. def GET(self, req):
  2.     """
  3.     处理HTTP协议的GET请求;
  4.     GET同HEAD一样,都是请求返回container的基本信息,并以key-value的形式保存在HTTPHEAD当中;
  5.     不同之处在于GET方法中获取了指定container下的object列表,存储在body中,同HTTPHEAD一同返回;
  6.     """
  7.     # 从原始请求中获取drive, part, account, container, obj等信息;
  8.     drive, part, account, container, obj = split_and_validate_path(req, 4, 5, True)
  9.     path = get_param(req, 'path')
  10.     prefix = get_param(req, 'prefix')
  11.     delimiter = get_param(req, 'delimiter')
  12.         
  13.     # mount_check是是否进行mount检查;
  14.     # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
  15.     if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
  16.         # delimiters can be made more flexible later
  17.         return HTTPPreconditionFailed(body='Bad delimiter')
  18.         
  19.     marker = get_param(req, 'marker', '')
  20.     end_marker = get_param(req, 'end_marker')
  21.     limit = CONTAINER_LISTING_LIMIT
  22.     given_limit = get_param(req, 'limit')
  23.     if given_limit and given_limit.isdigit():
  24.         limit = int(given_limit)
  25.         if limit > CONTAINER_LISTING_LIMIT:
  26.             return HTTPPreconditionFailed(
  27.                 request=req,
  28.                 body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)
  29.                
  30.     out_content_type = get_listing_content_type(req)
  31.     if self.mount_check and not check_mount(self.root, drive):
  32.         return HTTPInsufficientStorage(drive=drive, request=req)
  33.         
  34.     # 返回一个ContainerBroker实例,用于代理其数据库访问的操作;
  35.     broker = self._get_container_broker(drive, part, account, container,
  36.                                         pending_timeout=0.1,
  37.                                         stale_reads_ok=True)
  38.         
  39.     if broker.is_deleted():
  40.         return HTTPNotFound(request=req)
  41.         
  42.     # 获取全局数据;
  43.     # 返回包括account, created_at, put_timestamp, delete_timestamp, container_count, object_count, bytes_used, hash, id等值的字典;
  44.     info = broker.get_info()
  45.     resp_headers = {
  46.             'X-Container-Object-Count': info['object_count'],
  47.             'X-Container-Bytes-Used': info['bytes_used'],
  48.             'X-Timestamp': info['created_at'],
  49.             'X-PUT-Timestamp': info['put_timestamp'],
  50.         }
  51.     for key, (value, timestamp) in broker.metadata.iteritems():
  52.         if value and (key.lower() in self.save_headers or is_sys_or_user_meta('container', key)):
  53.             resp_headers[key] = value
  54.     ret = Response(request=req, headers=resp_headers, content_type=out_content_type, charset='utf-8')
  55.     # 获取objects排序列表;
  56.     container_list = broker.list_objects_iter(limit, marker, end_marker, prefix, delimiter, path)
  57.         
  58.     if out_content_type == 'application/json':
  59.         et.body = json.dumps([self.update_data_record(record)
  60.                                    for record in container_list])
  61.     elif out_content_type.endswith('/xml'):
  62.         doc = Element('container', name=container.decode('utf-8'))
  63.         for obj in container_list:
  64.             record = self.update_data_record(obj)
  65.             if 'subdir' in record:
  66.                 name = record['subdir'].decode('utf-8')
  67.                 sub = SubElement(doc, 'subdir', name=name)
  68.                 SubElement(sub, 'name').text = name
  69.             else:
  70.                 obj_element = SubElement(doc, 'object')
  71.                 for field in ["name", "hash", "bytes", "content_type", "last_modified"]:
  72.                     SubElement(obj_element, field).text = str(record.pop(field)).decode('utf-8')
  73.                 for field in sorted(record):
  74.                     SubElement(obj_element, field).text = str(record[field]).decode('utf-8')
  75.         ret.body = tostring(doc, encoding='UTF-8').replace(
  76.             "",
  77.             '', 1)
  78.     else:
  79.         if not container_list:
  80.             return HTTPNoContent(request=req, headers=resp_headers)
  81.         ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'
  82.     return ret
  83. PUT
复制代码


/swift/proxy/controllers/container.py----class ContainerController(Controller)----def PUT

  1. def PUT(self, req):
  2. """HTTP PUT request handler."""
  3.     error_response = self.clean_acls(req) or check_metadata(req, 'container')
  4.     if error_response:
  5.         return error_response
  6.     if not req.environ.get('swift_owner'):
  7.         for key in self.app.swift_owner_headers:
  8.             req.headers.pop(key, None)
  9.     if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
  10.         resp = HTTPBadRequest(request=req)
  11.         esp.body = 'Container name length of %d longer than %d' % (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
  12.         return resp
  13.         
  14.     # 获取container所属account的信息,返回分区号、account的副本节点和container数目;
  15.     account_partition, accounts, container_count = self.account_info(self.account_name, req)
  16.         
  17.     if not accounts and self.app.account_autocreate:
  18.         self.autocreate_account(req.environ, self.account_name)
  19.         account_partition, accounts, container_count = self.account_info(self.account_name, req)
  20.                
  21.     if not accounts:
  22.         return HTTPNotFound(request=req)
  23.         
  24.     if self.app.max_containers_per_account > 0 and \
  25.             container_count >= self.app.max_containers_per_account and \
  26.             self.account_name not in self.app.max_containers_whitelist:
  27.         resp = HTTPForbidden(request=req)
  28.         resp.body = 'Reached container limit of %s' % self.app.max_containers_per_account
  29.         return resp
  30.         
  31.     # 获取指定container的分区号和所有副本节点;
  32.     container_partition, containers = self.app.container_ring.get_nodes(self.account_name, self.container_name)
  33.         
  34.     headers = self._backend_requests(req, len(containers), account_partition, accounts)
  35.         
  36.     # 清除memcache和env中的缓存信息;
  37.     clear_info_cache(self.app, req.environ, self.account_name, self.container_name)
  38.         
  39.     resp = self.make_requests(
  40.         req, self.app.container_ring,
  41.         container_partition, 'PUT', req.swift_entity_path, headers)
  42.         
  43.     return resp
复制代码


/swift/container/server.py----class ContainerController(object)----def PUT

  1. def PUT(self, req):
  2.     """
  3.     如果包含object信息,根据元数据在数据库中实现建立一个object;
  4.     如果不包含object信息,实现更新container的数据库中的元数据信息,并调用account_update通知account-server更新状态;
  5.     """
  6.     # 从原始请求中获取drive, part, account, container, obj等信息;
  7.     drive, part, account, container, obj = split_and_validate_path(req, 4, 5, True)
  8.         
  9.     if 'x-timestamp' not in req.headers or not check_float(req.headers['x-timestamp']):
  10.         return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain')
  11.     if 'x-container-sync-to' in req.headers:
  12.         err, sync_to, realm, realm_key = validate_sync_to(
  13.             req.headers['x-container-sync-to'], self.allowed_sync_hosts,
  14.             self.realms_conf)
  15.         if err:
  16.             return HTTPBadRequest(err)
  17.             
  18.     if self.mount_check and not check_mount(self.root, drive):
  19.         return HTTPInsufficientStorage(drive=drive, request=req)
  20.     timestamp = normalize_timestamp(req.headers['x-timestamp'])
  21.         
  22.     # 返回一个ContainerBroker实例,用于代理其数据库访问的操作;
  23.     broker = self._get_container_broker(drive, part, account, container)
  24.         
  25.     # 如果包含object信息,则序列化后写入db_file.pending文件;
  26.     # 如果包含object信息,根据元数据在数据库中实现建立一个object;
  27.     if obj:     # put container object
  28.         if account.startswith(self.auto_create_account_prefix) and not os.path.exists(broker.db_file):
  29.                
  30.             # 数据库初始化;
  31.             try:
  32.                 broker.initialize(timestamp)
  33.             except DatabaseAlreadyExists:
  34.                 pass
  35.         if not os.path.exists(broker.db_file):
  36.             return HTTPNotFound()
  37.             
  38.         # 根据相关的metadata在数据库中建立一个object;
  39.         broker.put_object(obj, timestamp, int(req.headers['x-size']),
  40.                           req.headers['x-content-type'],
  41.                           req.headers['x-etag'])
  42.         return HTTPCreated(request=req)
  43.     # 如果不包含object信息,则根据request.head中的key-value更新container_statu数据库的metadata;
  44.     # 并调用account_update通知account-server更新状态;
  45.     # 如果不包含object信息,实现更新container的数据库中的元数据信息;
  46.     else:   # put container
  47.         created = self._update_or_create(req, broker, timestamp)
  48.         metadata = {}
  49.         metadata.update(
  50.             (key, (value, timestamp))
  51.             for key, value in req.headers.iteritems()
  52.             if key.lower() in self.save_headers or
  53.             is_sys_or_user_meta('container', key))
  54.         if metadata:
  55.             if 'X-Container-Sync-To' in metadata:
  56.                 if 'X-Container-Sync-To' not in broker.metadata or \
  57.                         metadata['X-Container-Sync-To'][0] != \
  58.                         broker.metadata['X-Container-Sync-To'][0]:
  59.                     broker.set_x_container_sync_points(-1, -1)
  60.             # 更新数据库的元数据字典;
  61.             broker.update_metadata(metadata)
  62.             
  63.         # 根据最新的container信息更新account服务;     
  64.         # 用于在对container做删除/修改操作时通知其所属account做同步修改;
  65.         # 主要部分就是向account所在server_ip发送PUT请求,URL格式为:
  66.         # PUThttp://{account_ip}:{account_port}/{account_device}/{account_partition}/{account}/{container}
  67.         resp = self.account_update(req, account, container, broker)
  68.         if resp:
  69.             return resp
  70.         
  71.         if created:
  72.             return HTTPCreated(request=req)
  73.         else:
  74.             return HTTPAccepted(request=req)
  75. POST
复制代码


/swift/proxy/controllers/container.py----class ContainerController(Controller)----def POST

  1. def POST(self, req):
  2. """HTTP POST request handler."""
  3.     error_response = self.clean_acls(req) or check_metadata(req, 'container')
  4.     if error_response:
  5.         return error_response
  6.     if not req.environ.get('swift_owner'):
  7.         for key in self.app.swift_owner_headers:
  8.             req.headers.pop(key, None)
  9.         
  10.     # 获取container所属account的信息,返回分区号、account的副本节点和container数目;
  11.     account_partition, accounts, container_count = self.account_info(self.account_name, req)
  12.         
  13.     if not accounts:
  14.         return HTTPNotFound(request=req)
  15.         
  16.     # 获取指定container的分区号和所有副本节点;
  17.     container_partition, containers = self.app.container_ring.get_nodes(self.account_name, self.container_name)
  18.         
  19.     headers = self.generate_request_headers(req, transfer=True)
  20.     clear_info_cache(self.app, req.environ, self.account_name, self.container_name)
  21.         
  22.     resp = self.make_requests(
  23.             req, self.app.container_ring, container_partition, 'POST',
  24.             req.swift_entity_path, [headers] * len(containers))
  25.         
  26.     return resp
复制代码


/swift/container/server.py----class ContainerController(object)----def POST

  1. def POST(self, req):
  2.     """
  3.     实现更新container的元数据信息,从head中取出特定要求的metadata更新至指定container的数据库;
  4.     """
  5.     # 从原始请求中获取drive, part, account, container, obj等信息;
  6.     drive, part, account, container = split_and_validate_path(req, 4)
  7.     if 'x-timestamp' not in req.headers or not check_float(req.headers['x-timestamp']):
  8.         return HTTPBadRequest(body='Missing or bad timestamp', request=req, content_type='text/plain')
  9.     if 'x-container-sync-to' in req.headers:
  10.         err, sync_to, realm, realm_key = validate_sync_to(
  11.             req.headers['x-container-sync-to'], self.allowed_sync_hosts,
  12.             self.realms_conf)
  13.         if err:
  14.             return HTTPBadRequest(err)
  15.         
  16.     # mount_check是是否进行mount检查;
  17.     # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
  18.     if self.mount_check and not check_mount(self.root, drive):
  19.         return HTTPInsufficientStorage(drive=drive, request=req)
  20.         
  21.     # 返回一个ContainerBroker实例,用于代理其数据库访问的操作;
  22.     broker = self._get_container_broker(drive, part, account, container)
  23.         
  24.     if broker.is_deleted():
  25.         return HTTPNotFound(request=req)
  26.         
  27.     # 把timestamp(时间戳)转换为标准格式;
  28.     timestamp = normalize_timestamp(req.headers['x-timestamp'])
  29.       
  30.     metadata = {}
  31.     metadata.update(
  32.         (key, (value, timestamp)) for key, value in req.headers.iteritems()
  33.         if key.lower() in self.save_headers or
  34.         is_sys_or_user_meta('container', key))
  35.         
  36.     # 然后从head中取出特定要求的metadata更新至数据库;
  37.     if metadata:
  38.         if 'X-Container-Sync-To' in metadata:
  39.             if 'X-Container-Sync-To' not in broker.metadata or \
  40.                     metadata['X-Container-Sync-To'][0] != \
  41.                     broker.metadata['X-Container-Sync-To'][0]:
  42.                 broker.set_x_container_sync_points(-1, -1)
  43.         broker.update_metadata(metadata)
  44.         
  45.     return HTTPNoContent(request=req)
复制代码


DELETE
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def DELETE

  1. def DELETE(self, req):
  2. """HTTP DELETE request handler."""
  3.     # 获取container所属account的信息,返回分区号、account的副本节点和container数目;
  4.     account_partition, accounts, container_count = self.account_info(self.account_name, req)
  5.         
  6.     if not accounts:
  7.         return HTTPNotFound(request=req)
  8.         
  9.     # 获取指定container的分区号和所有副本节点;
  10.     container_partition, containers = self.app.container_ring.get_nodes(self.account_name, self.container_name)
  11.         
  12.     headers = self._backend_requests(req, len(containers), account_partition, accounts)
  13.     clear_info_cache(self.app, req.environ, self.account_name, self.container_name)
  14.         
  15.     resp = self.make_requests(
  16.         req, self.app.container_ring, container_partition, 'DELETE',
  17.         req.swift_entity_path, headers)
  18.         
  19.     # Indicates no server had the container
  20.     if resp.status_int == HTTP_ACCEPTED:
  21.         return HTTPNotFound(request=req)
  22.     return resp
复制代码


/swift/container/server.py----class ContainerController(object)----def DELETE

  1. def DELETE(self, req):
  2.     """
  3.     输入的URL格式为host:port/device/partition/account/container/
  4.     如果没有object字段,说明是删除container,过程和Account的DELETE操作一样,
  5.     先进行一系列检查,然后根据db_file.pengding文件刷新数据库到最新状态并检查是否已经删除,
  6.     如果status字段不为DELETED,清空数据库中的metadata字段,
  7.     更新delete_timestamp然后置status字段为DELETED,
  8.     最后调用account_update通知其所属account更新状态;
  9.         
  10.     如果URL中包含object字段,则是为了在对其所包含的Object进行操作后同步更新container,
  11.     这里会调用ContainerBroker#delete_object,同样也是将删除信息序列化后写入db_file.pending文件,待下次对该container操作时更新进数据库;
  12.         
  13.     删除指定的container或者删除指定container中的指定object;
  14.     如果URL中没有object字段,说明是删除container;
  15.     如果URL中包含object字段,标志指定的object为deleted;
  16.     """
  17.     # 从原始请求中获取drive, part, account, container, obj等信息;
  18.     drive, part, account, container, obj = split_and_validate_path(req, 4, 5, True)
  19.         
  20.     if 'x-timestamp' not in req.headers or \
  21.             not check_float(req.headers['x-timestamp']):
  22.         return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain')
  23.         
  24.     # mount_check是是否进行mount检查;
  25.     # 如果进行mount检查,并且检查结果没有挂载,则引发http 507错误,提示磁盘没有足够存储空间;
  26.     if self.mount_check and not check_mount(self.root, drive):
  27.         return HTTPInsufficientStorage(drive=drive, request=req)
  28.         
  29.     # 返回一个ContainerBroker实例,用于代理其数据库访问的操作;
  30.     broker = self._get_container_broker(drive, part, account, container)
  31.         
  32.     # initialize:数据库初始化;
  33.     if account.startswith(self.auto_create_account_prefix) and obj and \
  34.             not os.path.exists(broker.db_file):
  35.         try:
  36.             broker.initialize(normalize_timestamp(req.headers.get('x-timestamp') or time.time()))
  37.         except DatabaseAlreadyExists:
  38.             pass
  39.     if not os.path.exists(broker.db_file):
  40.         return HTTPNotFound()
  41.         
  42.     # 如果存在obj,则执行标志object状态为deleted的操作;
  43.     # delete_object:标志object状态为deleted;
  44.     # 如果URL中包含object字段,则是为了在对其所包含的Object进行操作后同步更新container,
  45.     # 这里会调用ContainerBroker#delete_object,同样也是将删除信息序列化后写入db_file.pending文件,
  46.     # 待下次对该container操作时更新进数据库;
  47.     # 标志指定的object为deleted;
  48.     if obj:     # delete object
  49.         broker.delete_object(obj, req.headers.get('x-timestamp'))
  50.         return HTTPNoContent(request=req)
  51.         
  52.     # 如果没有object字段,说明是删除container,过程和Account的DELETE操作一样,
  53.     # 先进行一系列检查,然后根据db_file.pengding文件刷新数据库到最新状态并检查是否已经删除,
  54.     # 如果status字段不为DELETED,清空数据库中的metadata字段,
  55.     # 更新delete_timestamp然后置status字段为DELETED,
  56.     # 最后调用account_update通知其所属account更新状态;
  57.     else:
  58.         # delete container
  59.         # 检测container DB是否为空;
  60.         if not broker.empty():
  61.             return HTTPConflict(request=req)
  62.         existed = float(broker.get_info()['put_timestamp']) and not broker.is_deleted()
  63.         # 对数据库中的对象进行删除状态的标记工作,并不会执行文件的删除工作;
  64.         broker.delete_db(req.headers['X-Timestamp'])
  65.         if not broker.is_deleted():
  66.             return HTTPConflict(request=req)
  67.         # 根据最新的container信息更新account服务;  
  68.         # 用于在对container做删除/修改操作时通知其所属account做同步修改;
  69.         # 主要部分就是向account所在server_ip发送PUT请求,URL格式为:
  70.         # PUThttp://{account_ip}:{account_port}/{account_device}/{account_partition}/{account}/{container}
  71.         resp = self.account_update(req, account, container, broker)
  72.         if resp:
  73.             return resp
  74.             
  75.         if existed:
  76.             return HTTPNoContent(request=req)
  77.         return HTTPNotFound()
复制代码







博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条