分享

Swift源码分析----swift-proxy与swift-object

tntzbzc 发表于 2014-11-20 15:34:43 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 15815
导读
代码诠释每个人都有自己的理解,尽信书不如无书,更多的是参考和交流
1.更新object的元数据信息包含哪些流程?
2.新建(上传数据)/更新一个object对象有哪些流程?





概述:
这篇文章主要关注swift-proxy与swift-object服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;

源码解析部分(代码中较重要的部分已经进行了相关的注释):


GETorHEAD
/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def GETorHEAD

  1. def GETorHEAD(self, req):
  2.     """
  3.     处理HTTP协议GET或者HEAD请求;
  4.     """
  5.     # 获取指定object所属的container的信息;
  6.     container_info = self.container_info(self.account_name, self.container_name, req)
  7.         
  8.     req.acl = container_info['read_acl']
  9.     if 'swift.authorize' in req.environ:
  10.         aresp = req.environ['swift.authorize'](req)
  11.         if aresp:
  12.             return aresp
  13.     # 获取指定object所对应的分区号;
  14.     partition = self.app.object_ring.get_part(self.account_name, self.container_name, self.object_name)
  15.         
  16.     resp = self.GETorHEAD_base(
  17.             req, _('Object'), self.app.object_ring, partition,
  18.             req.swift_entity_path)
  19.     if ';' in resp.headers.get('content-type', ''):
  20.         # strip off swift_bytes from content-type
  21.         content_type, check_extra_meta = resp.headers['content-type'].rsplit(';', 1)
  22.         if check_extra_meta.lstrip().startswith('swift_bytes='):
  23.             resp.content_type = content_type
  24.     return resp
复制代码


/swift/obj/server.py----class ContainerController(object)----def HEAD

  1. def HEAD(self, request):
  2.     """
  3.     检索返回一个object的metadata,同GET请求的处理方法几乎一致,唯一不同的是不在body中返回file;
  4.     """
  5.     device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
  6.     try:
  7.         disk_file = self.get_diskfile(device, partition, account, container, obj)
  8.     except DiskFileDeviceUnavailable:
  9.         return HTTPInsufficientStorage(drive=device, request=request)
  10.     try:
  11.         metadata = disk_file.read_metadata()
  12.     except (DiskFileNotExist, DiskFileQuarantined):
  13.         return HTTPNotFound(request=request, conditional_response=True)
  14.     response = Response(request=request, conditional_response=True)
  15.     response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
  16.     for key, value in metadata.iteritems():
  17.         if is_user_meta('object', key) or key.lower() in self.allowed_headers:
  18.             response.headers[key] = value
  19.     response.etag = metadata['ETag']
  20.     ts = metadata['X-Timestamp']
  21.     response.last_modified = math.ceil(float(ts))
  22.     # Needed for container sync feature
  23.     response.headers['X-Timestamp'] = ts
  24.     response.content_length = int(metadata['Content-Length'])
  25.     try:
  26.         response.content_encoding = metadata['Content-Encoding']
  27.     except KeyError:
  28.         pass
  29.     return response
复制代码


/swift/obj/server.py----class ContainerController(object)----def GET

  1. def GET(self, request):
  2.     """
  3.     检索一个object对象,在response.heads中返回metadata,在response.body中返回objectdata,流程如下:
  4.     1.根据url中的信息新建DiskFile对象file,检查request.heads中的必要K-V,检查mount情况;
  5.     2.如果file#is_deleted或者file.metadata中'X-Delete-At'小于当前时间(表示已标记为准备删除)
  6.       或者通过file#get_data_file_size查看文件是否异常,如果已经删除或存在异常,返回404HTTPNotFound;
  7.     3.检查request.heads里的'If-match'和'If-none-match',前者检查file.metadata中的'ETag'是否与其一致确定所检索的文件,后者确定如果没有匹配的是否返回file的etag信息;
  8.     4.确定了需要操作的file,利用file的iterator,将其绑定response的构造函数参数app_iter,
  9.       并且将file.metadata写入response.heads中,并返回response;
  10.     """
  11.     device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
  12.         
  13.     keep_cache = self.keep_cache_private or (
  14.         'X-Auth-Token' not in request.headers and
  15.         'X-Storage-Token' not in request.headers)
  16.         
  17.     try:
  18.         disk_file = self.get_diskfile(device, partition, account, container, obj)
  19.     except DiskFileDeviceUnavailable:
  20.         return HTTPInsufficientStorage(drive=device, request=request)
  21.         
  22.     try:
  23.         with disk_file.open():
  24.             metadata = disk_file.get_metadata()
  25.             obj_size = int(metadata['Content-Length'])
  26.             file_x_ts = metadata['X-Timestamp']
  27.             file_x_ts_flt = float(file_x_ts)
  28.             file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)
  29.             if_unmodified_since = request.if_unmodified_since
  30.                
  31.             if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
  32.                 return HTTPPreconditionFailed(request=request)
  33.             if_modified_since = request.if_modified_since
  34.             if if_modified_since and file_x_ts_utc
  35. POST
复制代码


/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def POST


  1. def POST(self, req):
  2.     """
  3.     处理HTTP协议POST请求;
  4.     """
  5.     # 计算预计删除对象时间???
  6.     if 'x-delete-after' in req.headers:
  7.         try:
  8.             x_delete_after = int(req.headers['x-delete-after'])
  9.         except ValueError:
  10.             return HTTPBadRequest(request=req,
  11.                                   content_type='text/plain',
  12.                                   body='Non-integer X-Delete-After')
  13.         req.headers['x-delete-at'] = normalize_delete_at_timestamp(time.time() + x_delete_after)
  14.         
  15.     # 在object的实现方法中,系统默认以PUT方法来实现POST方法;
  16.     if self.app.object_post_as_copy:
  17.         req.method = 'PUT'
  18.         req.path_info = '/v1/%s/%s/%s' % (self.account_name, self.container_name, self.object_name)
  19.         req.headers['Content-Length'] = 0
  20.         req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name))
  21.         req.headers['X-Fresh-Metadata'] = 'true'
  22.         req.environ['swift_versioned_copy'] = True
  23.         if req.environ.get('QUERY_STRING'):
  24.             req.environ['QUERY_STRING'] += '&multipart-manifest=get'
  25.         else:
  26.             req.environ['QUERY_STRING'] = 'multipart-manifest=get'
  27.         resp = self.PUT(req)
  28.         if resp.status_int != HTTP_CREATED:
  29.             return resp
  30.         return HTTPAccepted(request=req)
  31.         
  32.     else:
  33.         error_response = check_metadata(req, 'object')
  34.         if error_response:
  35.             return error_response
  36.         container_info = self.container_info(
  37.             self.account_name, self.container_name, req)
  38.         container_partition = container_info['partition']
  39.         containers = container_info['nodes']
  40.         req.acl = container_info['write_acl']
  41.         if 'swift.authorize' in req.environ:
  42.             aresp = req.environ['swift.authorize'](req)
  43.             if aresp:
  44.                 return aresp
  45.         if not containers:
  46.             return HTTPNotFound(request=req)
  47.         if 'x-delete-at' in req.headers:
  48.             try:
  49.                 x_delete_at = normalize_delete_at_timestamp(int(req.headers['x-delete-at']))
  50.                 if int(x_delete_at)
复制代码


/swift/obj/server.py----class ContainerController(object)----def POST

  1. def POST(self, request):
  2.     """     
  3.     更新object的元数据信息,流程如下:
  4.     1.从requesturl中提取device,partition, account, container, obj;
  5.       检查requestheads中的'x-timestamp'是否存在,检查mount情况;
  6.     2.根据请求信息新建DiskFile对象file,检查是否存在;
  7.       (包括检查metadata中的'X-Delete-At',调用file#is_deleted()和检查file.data_size)
  8.     3.如果检查都通过,则根据request.heads中的元素更新metadata;
  9.     4.从request.heads中提取'X-Delete-At'并与file.metadata中的相同字段比较;
  10.       根据较新的值调用file#delete_at_update(),通知更新container的信息;
  11.     5.调用file#put()方法将metadata写入到.meta文件和data_file的扩展属性中;
  12.         
  13.     实现更新object的元数据信息;
  14.     并通知object的更新到container;
  15.     """
  16.     # 根据request.path获取device、partition、account、container、obj等参数;
  17.     device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
  18.     if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
  19.         return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')
  20.     new_delete_at = int(request.headers.get('X-Delete-At') or 0)
  21.     if new_delete_at and new_delete_at = request.headers['x-timestamp']:
  22.         return HTTPConflict(request=request)
  23.         
  24.     metadata = {'X-Timestamp': request.headers['x-timestamp']}
  25.     metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
  26.     for header_key in self.allowed_headers:
  27.        if header_key in request.headers:
  28.             header_caps = header_key.title()
  29.             metadata[header_caps] = request.headers[header_key]
  30.         
  31.     orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
  32.         
  33.     if orig_delete_at != new_delete_at:
  34.         if new_delete_at:
  35.             self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
  36.         if orig_delete_at:
  37.             self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
  38.     disk_file.write_metadata(metadata)
  39.     return HTTPAccepted(request=request)
复制代码




下面将继续swift-proxy与swift-object的分析工作。





PUT
/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def PUT

  1. def PUT(self, req):
  2.     """
  3.     处理HTTP协议PUT请求;
  4.     """
  5.     ......
  6.     # 容器全局数据;
  7.     container_info = self.container_info(self.account_name, self.container_name, req)
  8.     container_partition = container_info['partition']
  9.     containers = container_info['nodes']
  10.     req.acl = container_info['write_acl']
  11.     req.environ['swift_sync_key'] = container_info['sync_key']
  12.     object_versions = container_info['versions']
  13.      ......
  14.     # 获取对象的分区号和所有副本节点;
  15.     partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
  16.      ......
  17.         
  18.     pile = GreenPile(len(nodes))
  19.      ......
  20.     outgoing_headers = self._backend_requests(
  21.         req, len(nodes), container_partition, containers,
  22.         delete_at_container, delete_at_part, delete_at_nodes)
  23.     # _connect_put_node:实现了PUT方法到各个节点的连接和推送;
  24.     for nheaders in outgoing_headers:
  25.         # RFC2616:8.2.3 disallows 100-continue without a body
  26.         if (req.content_length > 0) or chunked:
  27.             nheaders['Expect'] = '100-continue'
  28.         pile.spawn(self._connect_put_node, node_iter, partition,
  29.                    req.swift_entity_path, nheaders,
  30.                    self.app.logger.thread_locals)
  31.     # 获取到各个节点的所有的连接;
  32.     conns = [conn for conn in pile if conn]
  33.      ......
  34.      
  35.     bytes_transferred = 0
  36.     try:
  37.         with ContextPool(len(nodes)) as pool:
  38.             for conn in conns:
  39.                 conn.failed = False
  40.                 conn.queue = Queue(self.app.put_queue_depth)
  41.                 pool.spawn(self._send_file, conn, req.path)
  42.             while True:
  43.                  with ChunkReadTimeout(self.app.client_timeout):
  44.                     try:
  45.                         chunk = next(data_source)
  46.                     except StopIteration:
  47.                         if chunked:
  48.                             for conn in conns:
  49.                                 conn.queue.put('0\r\n\r\n')
  50.                         break
  51.                 bytes_transferred += len(chunk)
  52.                 if bytes_transferred > MAX_FILE_SIZE:
  53.                     return HTTPRequestEntityTooLarge(request=req)
  54.                 for conn in list(conns):
  55.                     if not conn.failed:
  56.                         conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) if chunked else chunk)
  57.                     else:
  58.                         conns.remove(conn)
  59.                 ......
  60.             for conn in conns:
  61.                 if conn.queue.unfinished_tasks:
  62.                     conn.queue.join()
  63.         conns = [conn for conn in conns if not conn.failed]
  64.     ......
  65.         
  66.     # 获取所有连接的响应信息;
  67.     statuses, reasons, bodies, etags = self._get_put_responses(req, conns, nodes)
  68.     ......
  69.     # 根据投票机制,根据现实所有响应信息,实现返回通过投票机制的响应信息;
  70.     resp = self.best_response(req, statuses, reasons, bodies, _('Object PUT'), etag=etag)
  71.      ......
  72.       
  73.     return resp
复制代码




注:上述方法为裁剪后剩余关键部分的代码,各部分具体实现已经在代码注释中标注出来;
来看方法_connect_put_node的实现:

  1. def _connect_put_node(self, nodes, part, path, headers, logger_thread_locals):
  2.     """
  3.     实现PUT方法的推送;
  4.     """
  5.     self.app.logger.thread_locals = logger_thread_locals
  6.     for node in nodes:
  7.         try:
  8.             start_time = time.time()
  9.                
  10.             with ConnectionTimeout(self.app.conn_timeout):
  11.                 conn = http_connect(node['ip'], node['port'], node['device'], part, 'PUT', path, headers)
  12.                
  13.             self.app.set_node_timing(node, time.time() - start_time)
  14.             with Timeout(self.app.node_timeout):
  15.                 resp = conn.getexpect()
  16.             if resp.status == HTTP_CONTINUE:
  17.                 conn.resp = None
  18.                 conn.node = node
  19.                 return conn
  20.             elif is_success(resp.status):
  21.                 conn.resp = resp
  22.                 conn.node = node
  23.                 return conn
  24.             elif headers['If-None-Match'] is not None and resp.status == HTTP_PRECONDITION_FAILED:
  25.                 conn.resp = resp
  26.                 conn.node = node
  27.                 return conn
  28.             elif resp.status == HTTP_INSUFFICIENT_STORAGE:
  29.                 self.app.error_limit(node, _('ERROR Insufficient Storage'))
  30.         except (Exception, Timeout):
  31.             self.app.exception_occurred(node, _('Object'), _('Expect: 100-continue on %s') % path)
复制代码


/swift/obj/server.py----class ContainerController(object)----def PUT

  1. def PUT(self, request):
  2.     """
  3.     新建(上传数据)/更新一个object对象;,流程如下:
  4.     1.通过req的头信息明确content-length长度fsize;
  5.     2.获取对象文件管理类DiskFile的实例化对象;
  6.     3.获取指定对象的元数据信息;
  7.     4.为指定对象文件预分配磁盘空间(大小为fsize);
  8.     5.按照network_chunk_size(65536比特)接收来自network的chunk,并且检查上传文件的大小;
  9.     6.根据request.heads中的值新建/更新指定对象的metadata;
  10.     7.通过file#put方法把更新后的元数据写入磁盘(包括用temp文件改名.data文件和写入metadata);
  11.     8.通过file#unlinkold方法实现删除较早版本object文件;
  12.     9.调用container_update通知container更新信息;
  13.     """
  14.     # 根据request.path获取device、partition、account、container、obj等参数;
  15.     device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
  16.     if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
  17.          return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')
  18.         
  19.     # 检测确认要建立对象的一切都已经准备好;
  20.     error_response = check_object_creation(request, obj)
  21.         
  22.     if error_response:
  23.         return error_response
  24.         
  25.     new_delete_at = int(request.headers.get('X-Delete-At') or 0)
  26.     if new_delete_at and new_delete_at = request.headers['x-timestamp']:
  27.         return HTTPConflict(request=request)
  28.         
  29.     orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
  30.     upload_expiration = time.time() + self.max_upload_time
  31.     etag = md5()
  32.     elapsed_time = 0
  33.         
  34.     try:
  35.         # create:为文件预分配磁盘空间(大小为size);
  36.         # 按照network_chunk_size接收来自network的chunk,并且检查上传文件的大小;
  37.         # 根据request.heads中的值新建/更新file.metadata;
  38.         # 通过file#put方法把更新后的元数据写入磁盘(包括用temp文件改名.data文件和写入metadata);
  39.             
  40.         # 为文件预分配磁盘空间(大小为size);
  41.         with disk_file.create(size=fsize) as writer:
  42.             upload_size = 0
  43.             def timeout_reader():
  44.                 with ChunkReadTimeout(self.client_timeout):
  45.                     return request.environ['wsgi.input'].read(self.network_chunk_size)
  46.             # 按照network_chunk_size接收来自network的chunk;
  47.             try:
  48.                 for chunk in iter(lambda: timeout_reader(), ''):
  49.                     start_time = time.time()
  50.                     if start_time > upload_expiration:
  51.                         self.logger.increment('PUT.timeouts')
  52.                         return HTTPRequestTimeout(request=request)
  53.                     etag.update(chunk)
  54.                     upload_size = writer.write(chunk)
  55.                     elapsed_time += time.time() - start_time
  56.             except ChunkReadTimeout:
  57.                 return HTTPRequestTimeout(request=request)
  58.             if upload_size:
  59.                 self.logger.transfer_rate('PUT.' + device + '.timing', elapsed_time, upload_size)
  60.                
  61.             # 并且检查上传文件的大小;
  62.             # 如果接收到的文件大小和request.head中声明的一致,并且etag也与heads中的'etag'一致时,说明文件接收成功;
  63.             if fsize is not None and fsize != upload_size:
  64.                 return HTTPClientDisconnect(request=request)
  65.             etag = etag.hexdigest()
  66.             if 'etag' in request.headers and request.headers['etag'].lower() != etag:
  67.                 return HTTPUnprocessableEntity(request=request)
  68.                
  69.             # 根据request.heads中的值新建/更新file.metadata;
  70.             metadata = {
  71.                     'X-Timestamp': request.headers['x-timestamp'],
  72.                     'Content-Type': request.headers['content-type'],
  73.                     'ETag': etag,
  74.                     'Content-Length': str(upload_size),
  75.                 }
  76.             metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
  77.             for header_key in (
  78.                     request.headers.get('X-Backend-Replication-Headers') or
  79.                     self.allowed_headers):
  80.                 if header_key in request.headers:
  81.                     header_caps = header_key.title()
  82.                     metadata[header_caps] = request.headers[header_key]
  83.                
  84.             # 通过file#put方法把更新后的元数据写入磁盘(包括用temp文件改名.data文件和写入metadata);
  85.             writer.put(metadata)
  86.     except DiskFileNoSpace:
  87.         return HTTPInsufficientStorage(drive=device, request=request)
  88.     # 通过file#unlinkold删除较早版本object文件;
  89.     if orig_delete_at != new_delete_at:
  90.         if new_delete_at:
  91.             self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
  92.         if orig_delete_at:
  93.             self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
  94.     # 调用container_update通知container更新信息;
  95.     self.container_update(
  96.             'PUT', account, container, obj, request,
  97.             HeaderKeyDict({
  98.                 'x-size': metadata['Content-Length'],
  99.                 'x-content-type': metadata['Content-Type'],
  100.                 'x-timestamp': metadata['X-Timestamp'],
  101.                 'x-etag': metadata['ETag']}),
  102.             device)
  103.         
  104.     return HTTPCreated(request=request, etag=etag)
复制代码


DELETE
/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def DELETE

  1. def DELETE(self, req):
  2.     """
  3.     处理HTTP协议DELETE请求;
  4.     """      
  5.     container_info = self.container_info(self.account_name, self.container_name, req)
  6.         
  7.     container_partition = container_info['partition']
  8.     containers = container_info['nodes']
  9.     req.acl = container_info['write_acl']
  10.     req.environ['swift_sync_key'] = container_info['sync_key']
  11.     object_versions = container_info['versions']
  12.         
  13.     if object_versions:
  14.         # this is a version manifest and needs to be handled differently
  15.         object_versions = unquote(object_versions)
  16.         lcontainer = object_versions.split('/')[0]
  17.         prefix_len = '%03x' % len(self.object_name)
  18.         lprefix = prefix_len + self.object_name + '/'
  19.         last_item = None
  20.         try:
  21.             for last_item in self._listing_iter(lcontainer, lprefix, req.environ):
  22.                 pass
  23.         except ListingIterNotFound:
  24.             # no worries, last_item is None
  25.             pass
  26.         except ListingIterNotAuthorized as err:
  27.             return err.aresp
  28.         except ListingIterError:
  29.             return HTTPServerError(request=req)
  30.             
  31.         if last_item:
  32.             # there are older versions so copy the previous version to the
  33.             # current object and delete the previous version
  34.             orig_container = self.container_name
  35.             orig_obj = self.object_name
  36.             self.container_name = lcontainer
  37.             self.object_name = last_item['name'].encode('utf-8')
  38.             copy_path = '/v1/' + self.account_name + '/' + self.container_name + '/' + self.object_name
  39.                
  40.             # 拷贝的目标文件:/self.container_name/self.object_name
  41.             copy_headers = {'X-Newest': 'True', 'Destination': orig_container + '/' + orig_obj}
  42.             copy_environ = {'REQUEST_METHOD': 'COPY', 'swift_versioned_copy': True}
  43.             # 根据给定的参数建立一个新的请求对象;
  44.             creq = Request.blank(copy_path, headers=copy_headers, environ=copy_environ)
  45.             copy_resp = self.COPY(creq)
  46.                
  47.             if is_client_error(copy_resp.status_int):
  48.                 # some user error, maybe permissions
  49.                 return HTTPPreconditionFailed(request=req)
  50.             elif not is_success(copy_resp.status_int):
  51.                 # could not copy the data, bail
  52.                 return HTTPServiceUnavailable(request=req)
  53.                
  54.             # reset these because the COPY changed them
  55.             self.container_name = lcontainer
  56.             self.object_name = last_item['name'].encode('utf-8')
  57.             new_del_req = Request.blank(copy_path, environ=req.environ)
  58.             container_info = self.container_info(self.account_name, self.container_name, req)
  59.             container_partition = container_info['partition']
  60.             containers = container_info['nodes']
  61.             new_del_req.acl = container_info['write_acl']
  62.             new_del_req.path_info = copy_path
  63.             req = new_del_req
  64.             # remove 'X-If-Delete-At', since it is not for the older copy
  65.             if 'X-If-Delete-At' in req.headers:
  66.                 del req.headers['X-If-Delete-At']
  67.     if 'swift.authorize' in req.environ:
  68.         aresp = req.environ['swift.authorize'](req)
  69.         if aresp:
  70.             return aresp
  71.         
  72.     if not containers:
  73.         return HTTPNotFound(request=req)
  74.         
  75.     # 获取指定对象的分区号和所有副本节点;
  76.     partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
  77.     # Used by container sync feature
  78.     if 'x-timestamp' in req.headers:
  79.         try:
  80.             req.headers['X-Timestamp'] = normalize_timestamp(req.headers['x-timestamp'])
  81.         except ValueError:
  82.             return HTTPBadRequest(request=req, content_type='text/plain',
  83.                     body='X-Timestamp should be a UNIX timestamp float value; '
  84.                     'was %r' % req.headers['x-timestamp'])
  85.     else:
  86.         req.headers['X-Timestamp'] = normalize_timestamp(time.time())
  87.     headers = self._backend_requests(req, len(nodes), container_partition, containers)
  88.         
  89.     # 发送一个HTTP请求到多个节点,并汇聚所有返回的响应信息;
  90.     # 根据投票机制,根据现实所有响应信息,返回通过投票机制的响应信息(因为是获取多个节点的响应信息);
  91.     resp = self.make_requests(req, self.app.object_ring,
  92.                               partition, 'DELETE', req.swift_entity_path,
  93.                               headers)
  94.         
  95.     return resp
复制代码



/swift/obj/server.py----class ContainerController(object)----def DELETE


  1. def DELETE(self, request):  
  2.     """
  3.     实现删除指定对象文件,并删除较早版本的object文件;
  4.     当object更新后,也要更新container,当object删除后进行更新所属container;
  5.     """  
  6.     device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)  
  7.          
  8.     if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):  
  9.         return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')  
  10.          
  11.     try:  
  12.         disk_file = self.get_diskfile(device, partition, account, container, obj)  
  13.     except DiskFileDeviceUnavailable:  
  14.         return HTTPInsufficientStorage(drive=device, request=request)  
  15.          
  16.     # 获取要删除对象的元数据;  
  17.     try:  
  18.         orig_metadata = disk_file.read_metadata()  
  19.   
  20.     except DiskFileExpired as e:  
  21.         orig_timestamp = e.timestamp  
  22.         orig_metadata = e.metadata  
  23.         response_class = HTTPNotFound  
  24.     except DiskFileDeleted as e:  
  25.         orig_timestamp = e.timestamp  
  26.         orig_metadata = {}  
  27.         response_class = HTTPNotFound  
  28.     except (DiskFileNotExist, DiskFileQuarantined):  
  29.         orig_timestamp = 0  
  30.         orig_metadata = {}  
  31.         response_class = HTTPNotFound  
  32.   
  33.          
  34.     else:  
  35.         orig_timestamp = orig_metadata.get('X-Timestamp', 0)  
  36.         if orig_timestamp < request.headers['x-timestamp']:  
  37.             response_class = HTTPNoContent  
  38.         else:  
  39.             response_class = HTTPConflict  
  40.          
  41.     orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)  
  42.     try:  
  43.         req_if_delete_at_val = request.headers['x-if-delete-at']  
  44.         req_if_delete_at = int(req_if_delete_at_val)  
  45.     except KeyError:  
  46.         pass  
  47.     except ValueError:  
  48.         return HTTPBadRequest(request=request, body='Bad X-If-Delete-At header value')  
  49.     else:  
  50.         if orig_delete_at != req_if_delete_at:  
  51.             return HTTPPreconditionFailed(request=request, body='X-If-Delete-At and X-Delete-At do not match')  
  52.          
  53.     # 当更新object的时候,更新到期的对象所属container;  
  54.     # 经过分析代码,在方法delete_at_update中没有执行任何操作;  
  55.     if orig_delete_at:  
  56.         self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)  
  57.     req_timestamp = request.headers['X-Timestamp']  
  58.          
  59.     if orig_timestamp < req_timestamp:  
  60.         # 实现删除比给定时间戳旧的任何对象文件;  
  61.         disk_file.delete(req_timestamp)  
  62.         # 当object更新后,也要更新container,用于当object更新时更新所属container;  
  63.         self.container_update(  
  64.             'DELETE', account, container, obj, request,  
  65.             HeaderKeyDict({'x-timestamp': req_timestamp}),  
  66.             device)  
  67.     return response_class(request=request)
复制代码

语句disk_file.delete(req_timestamp)实现了删除指定对象,并实现了删除比给定时间戳旧的任何对象文件;
来看方法disk_file.delete()的实现:

  1. class DiskFile(object)----def delete
  2. def delete(self, timestamp):
  3.     """
  4.     实现删除比给定时间戳旧的任何对象文件,并实现更新时间戳;
  5.     """
  6.     timestamp = normalize_timestamp(timestamp)
  7.     with self.create() as deleter:
  8.         deleter._extension = '.ts'
  9.         deleter.put({'X-Timestamp': timestamp})
复制代码




注:具体实现通过分析方法create()和方法put()就可得知和理解,这里不多说了。
再来看方法container_update的实现:
  1. def container_update(self, op, account, container, obj, request, headers_out, objdevice):
  2.     """
  3.     当object更新后,也要更新container;      
  4.     通过头文件获取所有要实现更新container所属的device和host;
  5.     通过循环遍历实现发送HTTP请求至所属container,更新container的数据;
  6.     """
  7.     headers_in = request.headers
  8.     # 从原始请求的头部信息中获取container的相应的host信息;
  9.     conthosts = [h.strip() for h in
  10.                  headers_in.get('X-Container-Host', '').split(',')]
  11.     # 从原始请求的头部信息中获取container的相应的device信息;
  12.     contdevices = [d.strip() for d in
  13.                    headers_in.get('X-Container-Device', '').split(',')]
  14.     # 从原始请求的头部信息中获取container的相应的partition信息;
  15.     contpartition = headers_in.get('X-Container-Partition', '')
  16.     # 如果要改变信息的container所对应的host数目和device数目不同,引发错误声明并返回;
  17.     if len(conthosts) != len(contdevices):
  18.         # This shouldn't happen unless there's a bug in the proxy,
  19.         # but if there is, we want to know about it.
  20.         self.logger.error(_('ERROR Container update failed: different  '
  21.                             'numbers of hosts and devices in request: '
  22.                             '"%s" vs "%s"') %
  23.                            (headers_in.get('X-Container-Host', ''),
  24.                             headers_in.get('X-Container-Device', '')))
  25.         return
  26.     if contpartition:
  27.         updates = zip(conthosts, contdevices)
  28.     else:
  29.         updates = []
  30.     headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
  31.     headers_out['referer'] = request.as_referer()
  32.         
  33.     # 遍历所有要改变信息container中相对应的host和device,发送更新container信息的请求要求到相应的目标之上;
  34.     # 调用方法async_update来具体实现发送HTTP请求至所属container,更新container的数据;
  35.     for conthost, contdevice in updates:
  36.         self.async_update(op, account, container, obj, conthost,
  37.                           contpartition, contdevice, headers_out,
  38.                           objdevice)
复制代码
  1. def async_update(self, op, account, container, obj, host, partition, contdevice, headers_out, objdevice):
  2.     """
  3.     发送或者保存一个异步更新;
  4.     用于当object发生变化时,发送HTTP请求至所属container,更新container的数据;
  5.     如果请求失败,则将更新序列化写入async_dir的dest文件中,具体路径如下:
  6.     ASYNCDIR='async_pending'
  7.     async_dir=self.devices/objdevice/
  8.     hash_path=hash(account,container, obj)
  9.     dest=/[-3:]/-
  10.     """
  11.     headers_out['user-agent'] = 'obj-server %s' % os.getpid()
  12.     # 完整路径;
  13.     full_path = '/%s/%s/%s' % (account, container, obj)
  14.     if all([host, partition, contdevice]):
  15.         try:
  16.             # 在conn_timeout时间内执行with之下的连接操作,否则引发异常;
  17.             with ConnectionTimeout(self.conn_timeout):
  18.                 # 从host中解析出来ip和port的值;
  19.                 ip, port = host.rsplit(':', 1)
  20.                 # 建立一个HTTPConnection类的对象;
  21.                 # 返回HTTPConnection连接对象;
  22.                 conn = http_connect(ip, port, contdevice, partition, op, full_path, headers_out)
  23.             # 在node_timeout时间内执行with之下的操作,否则引发异常;
  24.             with Timeout(self.node_timeout):
  25.                 # 获取来自所属container服务器的响应;
  26.                 response = conn.getresponse()
  27.                 response.read()
  28.                 # 根据连接状态判断连接是否成功,成功则直接返回;
  29.                 if is_success(response.status):
  30.                     return
  31.                 else:
  32.                     self.logger.error(_(
  33.                         'ERROR Container update failed '
  34.                         '(saving for async update later): %(status)d '
  35.                         'response from %(ip)s:%(port)s/%(dev)s'),
  36.                         {'status': response.status, 'ip': ip, 'port': port,
  37.                          'dev': contdevice})
  38.         except (Exception, Timeout):
  39.             self.logger.exception(_(
  40.                 'ERROR container update failed with '
  41.                 '%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
  42.                 {'ip': ip, 'port': port, 'dev': contdevice})
  43.     data = {'op': op, 'account': account, 'container': container, 'obj': obj, 'headers': headers_out}
  44.     timestamp = headers_out['x-timestamp']
  45.     self._diskfile_mgr.pickle_async_update(objdevice, account, container, obj, data, timestamp)
复制代码

  1. def pickle_async_update(self, device, account, container, obj, data, timestamp):
  2.     device_path = self.construct_dev_path(device)
  3.     # 如果请求失败,则将更新序列化写入async_dir的dest文件中,具体路径如下:
  4.     # ASYNCDIR='async_pending'
  5.     # async_dir=self.devices/objdevice/
  6.     # hash_path=hash(account,container, obj)
  7.     # dest=/[-3:]/-
  8.     async_dir = os.path.join(device_path, ASYNCDIR)
  9.     # hash_path:根据情况获取account/container/object的哈希值,这里当然是获取object的哈希值啦;
  10.     ohash = hash_path(account, container, obj)
  11.     # 确保pickle文件写入到磁盘;
  12.     # 先写道临时位置,确保它同步到磁盘,然后移动到磁盘上最终的位置;
  13.     self.threadpools[device].run_in_thread(
  14.         write_pickle,
  15.         data,
  16.         os.path.join(async_dir, ohash[-3:], ohash + '-' + normalize_timestamp(timestamp)),
  17.         os.path.join(device_path, 'tmp'))
  18.     self.logger.increment('async_pendings')
复制代码
  1. def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
  2. """
  3. 确保pickle文件写入到磁盘;
  4. 先写道临时位置,确保它同步到磁盘,然后移动到磁盘上最终的位置;
  5. """
  6. if tmp is None:
  7. tmp = os.path.dirname(dest)
  8. fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
  9. with os.fdopen(fd, 'wb') as fo:
  10. pickle.dump(obj, fo, pickle_protocol)
  11. fo.flush()
  12. os.fsync(fd)
  13. renamer(tmppath, dest)
复制代码









博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条