/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def GETorHEAD
def GETorHEAD(self, req):
    """
    Handle an HTTP GET or HEAD request for an object.

    The container's read ACL is attached to the request, the
    ``swift.authorize`` callback (when present in the WSGI environ) is
    consulted, and the request is then forwarded through
    ``GETorHEAD_base`` using the object ring.

    :param req: the incoming request
    :returns: the response produced by the authorize callback when it
              returns a truthy value, otherwise the backend response with
              a trailing ``swift_bytes=`` content-type parameter removed
    """
    # The container info carries the ACL used for authorization.
    info = self.container_info(self.account_name, self.container_name, req)
    req.acl = info['read_acl']
    if 'swift.authorize' in req.environ:
        denied = req.environ['swift.authorize'](req)
        if denied:
            return denied

    # Partition of the object within the object ring.
    part = self.app.object_ring.get_part(
        self.account_name, self.container_name, self.object_name)
    resp = self.GETorHEAD_base(
        req, _('Object'), self.app.object_ring, part,
        req.swift_entity_path)

    raw_type = resp.headers.get('content-type', '')
    if ';' in raw_type:
        # Strip swift_bytes from the content type when it is the last
        # parameter.
        base_type, last_param = raw_type.rsplit(';', 1)
        if last_param.lstrip().startswith('swift_bytes='):
            resp.content_type = base_type
    return resp
/swift/obj/server.py----class ObjectController(object)----def HEAD
def HEAD(self, request):
    """
    Return an object's metadata as response headers, without a body.

    :param request: the incoming request; its path is split into
                    device, partition, account, container and object
    :returns: HTTPInsufficientStorage when the device is unavailable,
              HTTPNotFound when the object does not exist or is
              quarantined, otherwise a conditional Response carrying the
              object's metadata
    """
    device, partition, account, container, obj = \
        split_and_validate_path(request, 5, 5, True)
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        meta = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request, conditional_response=True)

    resp = Response(request=request, conditional_response=True)
    resp.headers['Content-Type'] = meta.get(
        'Content-Type', 'application/octet-stream')
    # Copy user metadata and explicitly allowed headers only.
    for name, value in meta.iteritems():
        if not (is_user_meta('object', name) or
                name.lower() in self.allowed_headers):
            continue
        resp.headers[name] = value
    resp.etag = meta['ETag']
    timestamp = meta['X-Timestamp']
    resp.last_modified = math.ceil(float(timestamp))
    # Needed for container sync feature
    resp.headers['X-Timestamp'] = timestamp
    resp.content_length = int(meta['Content-Length'])
    try:
        resp.content_encoding = meta['Content-Encoding']
    except KeyError:
        # No stored Content-Encoding; leave the response without one.
        pass
    return resp
/swift/obj/server.py----class ObjectController(object)----def GET
- def GET(self, request):
- """
- Retrieve an object: the metadata is returned in the response headers and the object data in the response body.
- NOTE(review): this excerpt is truncated after the If-Modified-Since check; the flow below is carried over
- from the original notes and steps 2-4 cannot be confirmed from the visible code:
- 1. Build a DiskFile object from the information in the URL, check the required keys in the request headers, and check the mount state;
- 2. If file#is_deleted, or 'X-Delete-At' in file.metadata is earlier than the current time (already marked for deletion),
- or file#get_data_file_size shows the file is abnormal, return 404 HTTPNotFound;
- 3. Check 'If-Match' and 'If-None-Match' in the request headers: the former compares against 'ETag' in file.metadata to confirm the file to retrieve, the latter decides whether the file's etag is returned when nothing matches;
- 4. Once the file is determined, bind the file's iterator to the response constructor argument app_iter,
- write file.metadata into the response headers, and return the response;
- """
- device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
- keep_cache = self.keep_cache_private or (
- 'X-Auth-Token' not in request.headers and
- 'X-Storage-Token' not in request.headers)
- try:
- disk_file = self.get_diskfile(device, partition, account, container, obj)
- except DiskFileDeviceUnavailable:
- return HTTPInsufficientStorage(drive=device, request=request)
- try:
- with disk_file.open():
- metadata = disk_file.get_metadata()
- obj_size = int(metadata['Content-Length'])
- file_x_ts = metadata['X-Timestamp']
- file_x_ts_flt = float(file_x_ts)
- file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)
- if_unmodified_since = request.if_unmodified_since
- if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
- return HTTPPreconditionFailed(request=request)
- if_modified_since = request.if_modified_since
- if if_modified_since and file_x_ts_utc
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def POST
- def POST(self, req):
- """
- Handle an HTTP POST request for an object.
- NOTE(review): this excerpt is truncated inside the X-Delete-At validation of the non-copy branch.
- """
- # Convert a relative X-Delete-After into an absolute X-Delete-At; a non-integer value is rejected with 400.
- if 'x-delete-after' in req.headers:
- try:
- x_delete_after = int(req.headers['x-delete-after'])
- except ValueError:
- return HTTPBadRequest(request=req,
- content_type='text/plain',
- body='Non-integer X-Delete-After')
- req.headers['x-delete-at'] = normalize_delete_at_timestamp(time.time() + x_delete_after)
- # When object_post_as_copy is set, the POST is carried out as a PUT that copies the object onto itself with fresh metadata.
- if self.app.object_post_as_copy:
- req.method = 'PUT'
- req.path_info = '/v1/%s/%s/%s' % (self.account_name, self.container_name, self.object_name)
- req.headers['Content-Length'] = 0
- req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name))
- req.headers['X-Fresh-Metadata'] = 'true'
- req.environ['swift_versioned_copy'] = True
- if req.environ.get('QUERY_STRING'):
- req.environ['QUERY_STRING'] += '&multipart-manifest=get'
- else:
- req.environ['QUERY_STRING'] = 'multipart-manifest=get'
- resp = self.PUT(req)
- if resp.status_int != HTTP_CREATED:
- return resp
- return HTTPAccepted(request=req)
- else:
- error_response = check_metadata(req, 'object')
- if error_response:
- return error_response
- container_info = self.container_info(
- self.account_name, self.container_name, req)
- container_partition = container_info['partition']
- containers = container_info['nodes']
- req.acl = container_info['write_acl']
- if 'swift.authorize' in req.environ:
- aresp = req.environ['swift.authorize'](req)
- if aresp:
- return aresp
- if not containers:
- return HTTPNotFound(request=req)
- if 'x-delete-at' in req.headers:
- try:
- x_delete_at = normalize_delete_at_timestamp(int(req.headers['x-delete-at']))
- if int(x_delete_at)
/swift/obj/server.py----class ObjectController(object)----def POST
def POST(self, request):
    """
    Update the metadata of an existing object.

    Flow, as implemented below:

    1. Split the request path into device, partition, account, container
       and object, and require a float ``x-timestamp`` header.
    2. Reject an ``X-Delete-At`` that is already in the past.
    3. Look up the DiskFile and read its current metadata; a missing or
       quarantined object yields 404.
    4. Refuse the update (409) when the stored ``X-Timestamp`` is not
       older than the request's.
    5. Build the new metadata from the request timestamp, the user
       metadata headers and the headers named in ``self.allowed_headers``.
    6. When the expiry time changed, call ``delete_at_update`` for the new
       and/or old value, then write the metadata.

    NOTE(review): in the excerpt this block was taken from, everything
    between the ``X-Delete-At`` comparison and the timestamp-conflict
    comparison had been lost (the text between ``<`` and ``>`` was
    stripped), leaving ``disk_file`` and ``orig_metadata`` unbound. Steps
    2-4 were restored following the pattern of the sibling HEAD/DELETE
    handlers; confirm the response bodies against upstream.

    :param request: the incoming request
    :returns: HTTPAccepted on success, otherwise one of the error
              responses described above
    """
    device, partition, account, container, obj = \
        split_and_validate_path(request, 5, 5, True)
    if 'x-timestamp' not in request.headers or \
            not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request,
                              content_type='text/plain')

    new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request,
                              content_type='text/plain')

    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request)

    # A request that is not newer than what is stored must not win.
    orig_timestamp = orig_metadata.get('X-Timestamp', '0')
    if orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)

    metadata = {'X-Timestamp': request.headers['x-timestamp']}
    metadata.update(val for val in request.headers.iteritems()
                    if is_user_meta('object', val[0]))
    for header_key in self.allowed_headers:
        if header_key in request.headers:
            header_caps = header_key.title()
            metadata[header_caps] = request.headers[header_key]

    # Keep the expiry bookkeeping in step with the new X-Delete-At.
    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account,
                                  container, obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account,
                                  container, obj, request, device)
    disk_file.write_metadata(metadata)
    return HTTPAccepted(request=request)
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def PUT
- def PUT(self, req):
- """
- Handle an HTTP PUT request for an object.
- NOTE(review): lines consisting of "......" mark code that was elided from this excerpt.
- """
- ......
- # Container info for the target container (partition, nodes, write ACL, sync key, versions location).
- container_info = self.container_info(self.account_name, self.container_name, req)
- container_partition = container_info['partition']
- containers = container_info['nodes']
- req.acl = container_info['write_acl']
- req.environ['swift_sync_key'] = container_info['sync_key']
- object_versions = container_info['versions']
- ......
- # Get the object's partition number and all of its replica nodes.
- partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
- ......
- pile = GreenPile(len(nodes))
- ......
- outgoing_headers = self._backend_requests(
- req, len(nodes), container_partition, containers,
- delete_at_container, delete_at_part, delete_at_nodes)
- # _connect_put_node opens the PUT connection to a node; one is spawned per set of outgoing headers.
- for nheaders in outgoing_headers:
- # RFC2616:8.2.3 disallows 100-continue without a body
- if (req.content_length > 0) or chunked:
- nheaders['Expect'] = '100-continue'
- pile.spawn(self._connect_put_node, node_iter, partition,
- req.swift_entity_path, nheaders,
- self.app.logger.thread_locals)
- # Keep only the connections that were actually established.
- conns = [conn for conn in pile if conn]
- ......
- bytes_transferred = 0
- try:
- with ContextPool(len(nodes)) as pool:
- for conn in conns:
- conn.failed = False
- conn.queue = Queue(self.app.put_queue_depth)
- pool.spawn(self._send_file, conn, req.path)
- while True:
- with ChunkReadTimeout(self.app.client_timeout):
- try:
- chunk = next(data_source)
- except StopIteration:
- if chunked:
- for conn in conns:
- conn.queue.put('0\r\n\r\n')
- break
- bytes_transferred += len(chunk)
- if bytes_transferred > MAX_FILE_SIZE:
- return HTTPRequestEntityTooLarge(request=req)
- for conn in list(conns):
- if not conn.failed:
- conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) if chunked else chunk)
- else:
- conns.remove(conn)
- ......
- for conn in conns:
- if conn.queue.unfinished_tasks:
- conn.queue.join()
- conns = [conn for conn in conns if not conn.failed]
- ......
- # Collect the responses from all remaining connections.
- statuses, reasons, bodies, etags = self._get_put_responses(req, conns, nodes)
- ......
- # best_response picks the response to return from the collected statuses (presumably by quorum; confirm against best_response).
- resp = self.best_response(req, statuses, reasons, bodies, _('Object PUT'), etag=etag)
- ......
- return resp
def _connect_put_node(self, nodes, part, path, headers,
                      logger_thread_locals):
    """
    Open a PUT connection to the first usable node in ``nodes``.

    Each node is tried in turn. A connection is returned as soon as a
    node answers the ``Expect`` handshake with 100 Continue, with a
    success status, or with 412 Precondition Failed while an
    ``If-None-Match`` header was sent. A 507 answer error-limits the node;
    any exception or timeout is reported through
    ``self.app.exception_occurred`` and the next node is tried.

    :param nodes: iterable of node dicts with 'ip', 'port' and 'device'
    :param part: partition passed to ``http_connect``
    :param path: request path passed to ``http_connect``
    :param headers: headers for the backend request
    :param logger_thread_locals: value assigned to
                                 ``self.app.logger.thread_locals``
    :returns: the connection, with ``resp`` and ``node`` attributes set,
              or None when no node accepted the request
    """
    self.app.logger.thread_locals = logger_thread_locals
    for node in nodes:
        try:
            start_time = time.time()
            with ConnectionTimeout(self.app.conn_timeout):
                conn = http_connect(node['ip'], node['port'],
                                    node['device'], part, 'PUT', path,
                                    headers)
            self.app.set_node_timing(node, time.time() - start_time)
            with Timeout(self.app.node_timeout):
                resp = conn.getexpect()

            if resp.status == HTTP_CONTINUE:
                # The node is waiting for the body; no final response yet.
                conn.resp = None
            elif is_success(resp.status) or (
                    # .get() so that a header mapping without this key
                    # cannot raise here and be misreported as a node
                    # failure by the broad except below.
                    headers.get('If-None-Match') is not None and
                    resp.status == HTTP_PRECONDITION_FAILED):
                conn.resp = resp
            else:
                if resp.status == HTTP_INSUFFICIENT_STORAGE:
                    self.app.error_limit(
                        node, _('ERROR Insufficient Storage'))
                continue
            conn.node = node
            return conn
        except (Exception, Timeout):
            self.app.exception_occurred(
                node, _('Object'),
                _('Expect: 100-continue on %s') % path)
    return None
/swift/obj/server.py----class ObjectController(object)----def PUT
def PUT(self, request):
    """
    Create or overwrite an object from the request body.

    Flow, as implemented below:

    1. Split the request path and require a float ``x-timestamp`` header.
    2. Run ``check_object_creation`` and reject an ``X-Delete-At`` that is
       already in the past.
    3. Determine the expected body size, look up the DiskFile and read
       any existing metadata; refuse (409) when the stored object is not
       older than the request.
    4. Stream the body in ``self.network_chunk_size`` reads into the
       DiskFile writer while updating an MD5 digest, enforcing the
       per-chunk client timeout and the overall ``self.max_upload_time``.
    5. Verify the received size and, when supplied, the client's ETag.
    6. Build the metadata and commit it with ``writer.put``.
    7. Update expiry bookkeeping when ``X-Delete-At`` changed and notify
       the container through ``container_update``.

    NOTE(review): in the excerpt this block was taken from, the code
    between the ``X-Delete-At`` comparison and the timestamp-conflict
    comparison had been lost (text between ``<`` and ``>`` was stripped),
    leaving ``fsize``, ``disk_file`` and ``orig_metadata`` unbound. That
    part was restored; ``request.message_length()`` and the 400 response
    bodies should be confirmed against upstream.

    :param request: the incoming request
    :returns: HTTPCreated on success, otherwise an error response
    """
    device, partition, account, container, obj = \
        split_and_validate_path(request, 5, 5, True)
    if 'x-timestamp' not in request.headers or \
            not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request,
                              content_type='text/plain')
    # Make sure everything required to create the object is in place.
    error_response = check_object_creation(request, obj)
    if error_response:
        return error_response

    new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request,
                              content_type='text/plain')
    try:
        # presumably None for chunked uploads; verify against swob
        fsize = request.message_length()
    except ValueError as err:
        return HTTPBadRequest(body=str(err), request=request,
                              content_type='text/plain')
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        orig_metadata = {}
    # A request that is not newer than what is stored must not win.
    orig_timestamp = orig_metadata.get('X-Timestamp')
    if orig_timestamp and orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)

    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    upload_expiration = time.time() + self.max_upload_time
    etag = md5()
    elapsed_time = 0
    try:
        with disk_file.create(size=fsize) as writer:
            upload_size = 0

            def timeout_reader():
                # Each read from the client is bounded by client_timeout.
                with ChunkReadTimeout(self.client_timeout):
                    return request.environ['wsgi.input'].read(
                        self.network_chunk_size)

            try:
                for chunk in iter(timeout_reader, ''):
                    start_time = time.time()
                    if start_time > upload_expiration:
                        self.logger.increment('PUT.timeouts')
                        return HTTPRequestTimeout(request=request)
                    etag.update(chunk)
                    upload_size = writer.write(chunk)
                    elapsed_time += time.time() - start_time
            except ChunkReadTimeout:
                return HTTPRequestTimeout(request=request)
            if upload_size:
                self.logger.transfer_rate(
                    'PUT.' + device + '.timing', elapsed_time, upload_size)

            # A size mismatch means the client went away mid-upload.
            if fsize is not None and fsize != upload_size:
                return HTTPClientDisconnect(request=request)
            etag = etag.hexdigest()
            if 'etag' in request.headers and \
                    request.headers['etag'].lower() != etag:
                return HTTPUnprocessableEntity(request=request)

            metadata = {
                'X-Timestamp': request.headers['x-timestamp'],
                'Content-Type': request.headers['content-type'],
                'ETag': etag,
                'Content-Length': str(upload_size),
            }
            metadata.update(val for val in request.headers.iteritems()
                            if is_user_meta('object', val[0]))
            # The header value is a string; iterating it directly would
            # walk single characters, so split it into header names.
            # presumably whitespace-separated; verify against the sender
            replication_headers = request.headers.get(
                'X-Backend-Replication-Headers')
            headers_to_copy = (replication_headers.split()
                               if replication_headers
                               else self.allowed_headers)
            for header_key in headers_to_copy:
                if header_key in request.headers:
                    header_caps = header_key.title()
                    metadata[header_caps] = request.headers[header_key]
            # Commit the data together with its metadata.
            writer.put(metadata)
    except DiskFileNoSpace:
        return HTTPInsufficientStorage(drive=device, request=request)

    # Keep the expiry bookkeeping in step with the new X-Delete-At.
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account,
                                  container, obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account,
                                  container, obj, request, device)
    # Tell the container about the new object.
    self.container_update(
        'PUT', account, container, obj, request,
        HeaderKeyDict({
            'x-size': metadata['Content-Length'],
            'x-content-type': metadata['Content-Type'],
            'x-timestamp': metadata['X-Timestamp'],
            'x-etag': metadata['ETag']}),
        device)
    return HTTPCreated(request=request, etag=etag)
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def DELETE
def DELETE(self, req):
    """
    Handle an HTTP DELETE request for an object.

    When the container has a versions location and an older version of
    the object is listed there, that version is first copied back over
    the current object, and the DELETE is redirected at the older copy.
    Otherwise the DELETE is sent for the object itself.

    :param req: the incoming request
    :returns: an error/authorization response, or the response produced
              by ``make_requests`` for the backend DELETEs
    """
    info = self.container_info(self.account_name, self.container_name, req)
    container_partition = info['partition']
    containers = info['nodes']
    req.acl = info['write_acl']
    req.environ['swift_sync_key'] = info['sync_key']
    object_versions = info['versions']

    if object_versions:
        # this is a version manifest and needs to be handled differently
        object_versions = unquote(object_versions)
        lcontainer = object_versions.split('/')[0]
        lprefix = '%03x' % len(self.object_name) + self.object_name + '/'
        last_item = None
        try:
            # Walk the listing to its end; the final entry is what counts.
            for last_item in self._listing_iter(lcontainer, lprefix,
                                                req.environ):
                pass
        except ListingIterNotFound:
            # no worries, last_item is None
            pass
        except ListingIterNotAuthorized as err:
            return err.aresp
        except ListingIterError:
            return HTTPServerError(request=req)

        if last_item:
            # there are older versions so copy the previous version to the
            # current object and delete the previous version
            orig_container = self.container_name
            orig_obj = self.object_name
            self.container_name = lcontainer
            self.object_name = last_item['name'].encode('utf-8')
            copy_path = ('/v1/' + self.account_name + '/' +
                         self.container_name + '/' + self.object_name)
            # The copy destination is the original container/object.
            creq = Request.blank(
                copy_path,
                headers={'X-Newest': 'True',
                         'Destination': orig_container + '/' + orig_obj},
                environ={'REQUEST_METHOD': 'COPY',
                         'swift_versioned_copy': True})
            copy_status = self.COPY(creq).status_int
            if is_client_error(copy_status):
                # some user error, maybe permissions
                return HTTPPreconditionFailed(request=req)
            if not is_success(copy_status):
                # could not copy the data, bail
                return HTTPServiceUnavailable(request=req)

            # reset these because the COPY changed them
            self.container_name = lcontainer
            self.object_name = last_item['name'].encode('utf-8')
            new_del_req = Request.blank(copy_path, environ=req.environ)
            info = self.container_info(
                self.account_name, self.container_name, req)
            container_partition = info['partition']
            containers = info['nodes']
            new_del_req.acl = info['write_acl']
            new_del_req.path_info = copy_path
            req = new_del_req
            # remove 'X-If-Delete-At', since it is not for the older copy
            if 'X-If-Delete-At' in req.headers:
                del req.headers['X-If-Delete-At']

    if 'swift.authorize' in req.environ:
        denied = req.environ['swift.authorize'](req)
        if denied:
            return denied
    if not containers:
        return HTTPNotFound(request=req)

    # Partition and replica nodes of the object to delete.
    partition, nodes = self.app.object_ring.get_nodes(
        self.account_name, self.container_name, self.object_name)

    # Used by container sync feature
    if 'x-timestamp' not in req.headers:
        req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    else:
        try:
            req.headers['X-Timestamp'] = \
                normalize_timestamp(req.headers['x-timestamp'])
        except ValueError:
            return HTTPBadRequest(
                request=req, content_type='text/plain',
                body='X-Timestamp should be a UNIX timestamp float value; '
                     'was %r' % req.headers['x-timestamp'])

    backend_headers = self._backend_requests(
        req, len(nodes), container_partition, containers)
    # Send the DELETE to the object nodes and return the combined response.
    return self.make_requests(req, self.app.object_ring, partition,
                              'DELETE', req.swift_entity_path,
                              backend_headers)
/swift/obj/server.py----class ObjectController(object)----def DELETE
def DELETE(self, request):
    """
    Delete an object and notify its container.

    The response class is chosen from the state of the stored object:
    404 when it is expired, already deleted, missing or quarantined; 204
    when its timestamp is older than the request's; 409 otherwise. The
    delete itself and the container update only happen when the stored
    timestamp is older than the request timestamp.

    :param request: the incoming request
    :returns: an instance of the chosen response class, or an error
              response for a bad request / failed precondition
    """
    device, partition, account, container, obj = \
        split_and_validate_path(request, 5, 5, True)
    timestamp_ok = ('x-timestamp' in request.headers and
                    check_float(request.headers['x-timestamp']))
    if not timestamp_ok:
        return HTTPBadRequest(body='Missing timestamp', request=request,
                              content_type='text/plain')
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)

    # Read the metadata of the object that is about to be deleted.
    try:
        orig_metadata = disk_file.read_metadata()
    except DiskFileExpired as err:
        orig_timestamp, orig_metadata = err.timestamp, err.metadata
        response_class = HTTPNotFound
    except DiskFileDeleted as err:
        orig_timestamp, orig_metadata = err.timestamp, {}
        response_class = HTTPNotFound
    except (DiskFileNotExist, DiskFileQuarantined):
        orig_timestamp, orig_metadata = 0, {}
        response_class = HTTPNotFound
    else:
        orig_timestamp = orig_metadata.get('X-Timestamp', 0)
        stored_is_older = orig_timestamp < request.headers['x-timestamp']
        response_class = HTTPNoContent if stored_is_older else HTTPConflict

    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    try:
        req_if_delete_at = int(request.headers['x-if-delete-at'])
    except KeyError:
        # Header absent: the delete is unconditional.
        pass
    except ValueError:
        return HTTPBadRequest(request=request,
                              body='Bad X-If-Delete-At header value')
    else:
        if orig_delete_at != req_if_delete_at:
            return HTTPPreconditionFailed(
                request=request,
                body='X-If-Delete-At and X-Delete-At do not match')

    # Drop the expiry bookkeeping entry for the object, if it had one.
    if orig_delete_at:
        self.delete_at_update('DELETE', orig_delete_at, account,
                              container, obj, request, device)
    req_timestamp = request.headers['X-Timestamp']
    if orig_timestamp < req_timestamp:
        disk_file.delete(req_timestamp)
        # Tell the container that the object is gone.
        self.container_update(
            'DELETE', account, container, obj, request,
            HeaderKeyDict({'x-timestamp': req_timestamp}),
            device)
    return response_class(request=request)
- class DiskFile(object)----def delete
def delete(self, timestamp):
    """
    Record a deletion of this object at ``timestamp``.

    A new file with the ``.ts`` extension is written through the regular
    create/put path; its only metadata is the normalized timestamp.

    :param timestamp: timestamp of the deletion; normalized before use
    """
    stamp = normalize_timestamp(timestamp)
    with self.create() as tombstone:
        tombstone._extension = '.ts'
        tombstone.put({'X-Timestamp': stamp})
def container_update(self, op, account, container, obj, request,
                     headers_out, objdevice):
    """
    Send a container update for an object change to every container
    host/device named in the request headers.

    The targets come from the comma-separated ``X-Container-Host`` and
    ``X-Container-Device`` headers plus ``X-Container-Partition``. Nothing
    is sent when the host and device counts differ (an error is logged)
    or when no partition was supplied.

    :param op: operation to report (passed through to ``async_update``)
    :param account: account name
    :param container: container name
    :param obj: object name
    :param request: the original request; its headers are read
    :param headers_out: headers for the update; 'x-trans-id' and
                        'referer' are added in place
    :param objdevice: device the object lives on
    """
    headers_in = request.headers
    raw_hosts = headers_in.get('X-Container-Host', '')
    raw_devices = headers_in.get('X-Container-Device', '')
    conthosts = [host.strip() for host in raw_hosts.split(',')]
    contdevices = [dev.strip() for dev in raw_devices.split(',')]
    contpartition = headers_in.get('X-Container-Partition', '')

    if len(conthosts) != len(contdevices):
        # This shouldn't happen unless there's a bug in the proxy,
        # but if there is, we want to know about it.
        self.logger.error(_('ERROR Container update failed: different '
                            'numbers of hosts and devices in request: '
                            '"%s" vs "%s"') %
                          (raw_hosts, raw_devices))
        return

    updates = zip(conthosts, contdevices) if contpartition else []

    headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
    headers_out['referer'] = request.as_referer()
    # One update per (host, device) pair.
    for conthost, contdevice in updates:
        self.async_update(op, account, container, obj, conthost,
                          contpartition, contdevice, headers_out,
                          objdevice)
def async_update(self, op, account, container, obj, host, partition,
                 contdevice, headers_out, objdevice):
    """
    Send a container update now, or save it for later when that fails.

    When ``host``, ``partition`` and ``contdevice`` are all set, the
    update is sent over HTTP and the method returns on a success status.
    In every other case (missing target, non-success status, exception or
    timeout) the update is handed to
    ``self._diskfile_mgr.pickle_async_update`` so it can be retried later.

    :param op: HTTP method for the update
    :param account: account name
    :param container: container name
    :param obj: object name
    :param host: container server as "ip:port"
    :param partition: container partition
    :param contdevice: container device
    :param headers_out: headers for the update; 'user-agent' is set in
                        place and 'x-timestamp' is read
    :param objdevice: device the object lives on
    """
    headers_out['user-agent'] = 'obj-server %s' % os.getpid()
    full_path = '/%s/%s/%s' % (account, container, obj)
    if all([host, partition, contdevice]):
        # Parse the host before the try block: rpartition never raises, so
        # ip and port are always bound when the except handler logs them.
        # A host without ':' yields an empty ip and fails in http_connect,
        # which still ends with the update being saved.
        ip, _sep, port = host.rpartition(':')
        try:
            with ConnectionTimeout(self.conn_timeout):
                conn = http_connect(ip, port, contdevice, partition, op,
                                    full_path, headers_out)
            with Timeout(self.node_timeout):
                response = conn.getresponse()
                response.read()
                if is_success(response.status):
                    return
                else:
                    self.logger.error(_(
                        'ERROR Container update failed '
                        '(saving for async update later): %(status)d '
                        'response from %(ip)s:%(port)s/%(dev)s'),
                        {'status': response.status, 'ip': ip,
                         'port': port, 'dev': contdevice})
        except (Exception, Timeout):
            self.logger.exception(_(
                'ERROR container update failed with '
                '%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
                {'ip': ip, 'port': port, 'dev': contdevice})
    # The update was not delivered; persist it for a later retry.
    data = {'op': op, 'account': account, 'container': container,
            'obj': obj, 'headers': headers_out}
    timestamp = headers_out['x-timestamp']
    self._diskfile_mgr.pickle_async_update(objdevice, account, container,
                                           obj, data, timestamp)
def pickle_async_update(self, device, account, container, obj, data,
                        timestamp):
    """
    Persist a pending container update on the object's device.

    The pickle is written to
    ``<device path>/<ASYNCDIR>/<last 3 hash chars>/<hash>-<timestamp>``,
    using ``<device path>/tmp`` as the temporary directory, and the
    'async_pendings' counter is incremented.

    :param device: device name; also the key into ``self.threadpools``
    :param account: account name
    :param container: container name
    :param obj: object name
    :param data: the update to pickle
    :param timestamp: timestamp of the update; normalized for the file name
    """
    device_path = self.construct_dev_path(device)
    async_dir = os.path.join(device_path, ASYNCDIR)
    # Hash of the object's account/container/object path.
    ohash = hash_path(account, container, obj)
    run_in_thread = self.threadpools[device].run_in_thread
    dest = os.path.join(
        async_dir, ohash[-3:],
        ohash + '-' + normalize_timestamp(timestamp))
    tmp_dir = os.path.join(device_path, 'tmp')
    # write_pickle runs in the device's thread pool.
    run_in_thread(write_pickle, data, dest, tmp_dir)
    self.logger.increment('async_pendings')
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
    """
    Pickle ``obj`` to ``dest`` by way of a temporary file.

    The pickle is written to a temporary file, flushed and fsynced, and
    then moved to ``dest`` with ``renamer``. If any step fails, the
    temporary file is removed so that failed writes do not pile up in the
    temporary directory; the original exception still propagates.

    :param obj: object to pickle
    :param dest: final path of the pickle file
    :param tmp: directory for the temporary file; defaults to the
                directory of ``dest``
    :param pickle_protocol: protocol passed to ``pickle.dump``
    """
    if tmp is None:
        tmp = os.path.dirname(dest)
    fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
    moved = False
    try:
        with os.fdopen(fd, 'wb') as fo:
            pickle.dump(obj, fo, pickle_protocol)
            fo.flush()
            os.fsync(fd)
        renamer(tmppath, dest)
        moved = True
    finally:
        # try/finally with a flag rather than except/raise, so the
        # original exception is what propagates on Python 2 as well.
        if not moved:
            try:
                os.unlink(tmppath)
            except OSError:
                # Already gone or not removable; nothing more to do.
                pass