问题导读
1. _get_account_broker功能是什么?
2./swift/proxy/controllers/account.py作用是什么?
概述:
请求信息到达swift-proxy服务之后,会确定获取具体的控制器(ObjectController、ContainerController、AccountController),接下来就是调用/swift/proxy/controllers/account.py或/swift/proxy/controllers/container.py或/swift/proxy/controllers/obj.py下面的PUT,POST,DELETE,GET,HEAD等方法;然后再在具体的方法中实现到具体存储服务(Object-server、Container-server、Account-server)的连接,继而调用其下具体的PUT,POST,DELETE,GET,HEAD等方法来进行请求req的实现;
这篇博客主要关注swift-proxy与swift-account服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;
源码解析部分(代码中较重要的部分已经进行了相关的注释):
GETorHEAD
/swift/proxy/controllers/account.py----class AccountController(Controller)----def GETorHEAD
def GETorHEAD(self, req):
    """
    Handle an HTTP GET or HEAD request for an account.

    :param req: the incoming client request
    :returns: a 400 response when the account name exceeds
              MAX_ACCOUNT_NAME_LENGTH; otherwise the response produced by
              ``GETorHEAD_base``, adjusted as follows: a 404 whose
              ``X-Account-Status`` header is ``deleted`` becomes 410, and
              any other 404 is replaced by a listing built without a
              broker when ``account_autocreate`` is enabled.
    """
    name_length = len(self.account_name)
    if name_length > MAX_ACCOUNT_NAME_LENGTH:
        too_long = HTTPBadRequest(request=req)
        too_long.body = 'Account name length of %d longer than %d' % (
            name_length, MAX_ACCOUNT_NAME_LENGTH)
        return too_long

    # get_nodes: look up the partition number and the replica nodes for
    # this account name; only the partition is needed here.
    ring = self.app.account_ring
    part, _replicas = ring.get_nodes(self.account_name)

    # GETorHEAD_base: common GET/HEAD handling; returns a swob.Response.
    resp = self.GETorHEAD_base(
        req, _('Account'), ring, part,
        req.swift_entity_path.rstrip('/'))

    if resp.status_int == HTTP_NOT_FOUND:
        account_status = resp.headers.get('X-Account-Status', '').lower()
        if account_status == 'deleted':
            resp.status = HTTP_GONE
        elif self.app.account_autocreate:
            resp = account_listing_response(
                self.account_name, req, get_listing_content_type(req))

    if not req.environ.get('swift_owner'):
        # Requests not flagged as swift_owner never see the owner headers.
        for header in self.app.swift_owner_headers:
            resp.headers.pop(header, None)
    else:
        self.add_acls_from_sys_metadata(resp)
    return resp
复制代码
1.为account获取分区号和所有副本节点信息;
2.HTTP GET或HEAD的基本处理,实现发送请求到远程account服务(account-server)上并调用具体方法来处理请求信息;
注:这里的具体实现就不详细解析了;
/swift/account/server.py----class AccountController(object)----def HEAD
def HEAD(self, req):
    """
    Handle an HTTP HEAD request for an account.

    The account's summary information is returned as key/value pairs in
    the response headers; the response carries no listing body.

    :param req: the incoming request; its path is split into drive,
                partition and account
    :returns: HTTPInsufficientStorage when the mount check fails, the
              result of ``_deleted_response`` when the account is
              deleted, otherwise an HTTPNoContent carrying the headers
    """
    drive, part, account = split_and_validate_path(req, 3)
    content_type = get_listing_content_type(req)

    # mount_check controls whether the drive is verified as mounted;
    # an unmounted drive is answered with HTTP 507.
    if self.mount_check:
        if not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)

    # _get_account_broker is an internal helper returning an
    # AccountBroker instance that proxies the sqlite database operations.
    broker = self._get_account_broker(
        drive, part, account, pending_timeout=0.1, stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # get_info: global account data as a dict (account, created_at,
    # put_timestamp, delete_timestamp, container_count, object_count,
    # bytes_used, hash, id, ...).
    info = broker.get_info()
    headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    # Copy every metadata entry whose value is not the empty string.
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value != '':
            headers[key] = value
    headers['Content-Type'] = content_type

    return HTTPNoContent(request=req, headers=headers, charset='utf-8')
复制代码
注:HEAD请求返回account的基本信息(元数据信息),并以key-value的形式保存在HTTPHEAD中返回;
/swift/account/server.py----class AccountController(object)----def GET
def GET(self, req):
    """
    Handle an HTTP GET request for an account.

    Like HEAD, the account's summary information is returned in the
    response headers; in addition the list of containers under the
    account is returned in the response body.

    :param req: the incoming request; ``prefix``, ``delimiter``,
                ``limit``, ``marker`` and ``end_marker`` are read from
                its parameters
    :returns: HTTPPreconditionFailed for a bad delimiter or an
              over-large limit, HTTPInsufficientStorage when the mount
              check fails, the result of ``_deleted_response`` for a
              deleted account, otherwise the listing response
    """
    drive, part, account = split_and_validate_path(req, 3)
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    if delimiter:
        # delimiters can be made more flexible later
        if len(delimiter) > 1 or ord(delimiter) > 254:
            return HTTPPreconditionFailed(body='Bad delimiter')

    limit = ACCOUNT_LISTING_LIMIT
    requested_limit = get_param(req, 'limit')
    if requested_limit and requested_limit.isdigit():
        limit = int(requested_limit)
        if limit > ACCOUNT_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' % ACCOUNT_LISTING_LIMIT)

    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    content_type = get_listing_content_type(req)

    # mount_check controls whether the drive is verified as mounted;
    # an unmounted drive is answered with HTTP 507.
    if self.mount_check:
        if not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)

    # _get_account_broker is an internal helper returning an
    # AccountBroker instance that proxies the sqlite database operations.
    broker = self._get_account_broker(
        drive, part, account, pending_timeout=0.1, stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # Build the container listing (with per-container details).
    return account_listing_response(
        account, req, content_type, broker,
        limit, marker, end_marker, prefix, delimiter)
复制代码
来看方法account_listing_response的具体实现:
def account_listing_response(account, req, response_content_type, broker=None,
                             limit='', marker='', end_marker='', prefix='',
                             delimiter=''):
    """
    Build the container-listing response for an account.

    :param account: account name (used as the XML root attribute)
    :param req: the request the response is built for
    :param response_content_type: 'application/json', a type ending in
                                  '/xml', or anything else for plain text
    :param broker: account broker; a FakeAccountBroker is used when None
    :param limit: passed through to ``broker.list_containers_iter``
    :param marker: passed through to ``broker.list_containers_iter``
    :param end_marker: passed through to ``broker.list_containers_iter``
    :param prefix: passed through to ``broker.list_containers_iter``
    :param delimiter: passed through to ``broker.list_containers_iter``
    :returns: HTTPNoContent for an empty plain-text listing, otherwise
              HTTPOk with the serialized listing as body
    """
    if broker is None:
        broker = FakeAccountBroker()

    info = broker.get_info()
    resp_headers = {
        'X-Account-Container-Count': info['container_count'],
        'X-Account-Object-Count': info['object_count'],
        'X-Account-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    # Copy every metadata entry whose value is not the empty string.
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value != '':
            resp_headers[key] = value

    # Each row is (name, object_count, bytes_used, is_subdir).
    containers = broker.list_containers_iter(limit, marker, end_marker,
                                             prefix, delimiter)

    if response_content_type == 'application/json':
        entries = [
            {'subdir': name} if is_subdir else
            {'name': name, 'count': object_count, 'bytes': bytes_used}
            for (name, object_count, bytes_used, is_subdir) in containers]
        body = json.dumps(entries)
    elif response_content_type.endswith('/xml'):
        container_template = ('<container><name>%s</name><count>%s</count>'
                              '<bytes>%s</bytes></container>')
        xml_lines = ['<?xml version="1.0" encoding="UTF-8"?>',
                     '<account name=%s>' % saxutils.quoteattr(account)]
        for (name, object_count, bytes_used, is_subdir) in containers:
            if is_subdir:
                xml_lines.append(
                    '<subdir name=%s />' % saxutils.quoteattr(name))
            else:
                xml_lines.append(container_template % (
                    saxutils.escape(name), object_count, bytes_used))
        xml_lines.append('</account>')
        body = '\n'.join(xml_lines)
    elif containers:
        # Plain text: one container name per line, newline-terminated.
        body = '\n'.join(row[0] for row in containers) + '\n'
    else:
        # Plain text with nothing to list: 204 without a body.
        empty = HTTPNoContent(request=req, headers=resp_headers)
        empty.content_type = response_content_type
        empty.charset = 'utf-8'
        return empty

    ret = HTTPOk(body=body, request=req, headers=resp_headers)
    ret.content_type = response_content_type
    ret.charset = 'utf-8'
    return ret
复制代码
下面将继续swift-proxy与swift-account的分析工作。
PUT
/swift/proxy/controllers/account.py----class AccountController(Controller)----def PUT
def PUT(self, req):
    """
    HTTP PUT request handler for an account.

    :param req: the incoming client request
    :returns: 405 when account management is disabled, the error from
              ``check_metadata`` if any, 400 when the account name is
              too long, otherwise the response from ``make_requests``
    """
    if not self.app.allow_account_management:
        allowed = ', '.join(self.allowed_methods)
        return HTTPMethodNotAllowed(request=req, headers={'Allow': allowed})

    metadata_error = check_metadata(req, 'account')
    if metadata_error:
        return metadata_error

    name_length = len(self.account_name)
    if name_length > MAX_ACCOUNT_NAME_LENGTH:
        too_long = HTTPBadRequest(request=req)
        too_long.body = 'Account name length of %d longer than %d' % (
            name_length, MAX_ACCOUNT_NAME_LENGTH)
        return too_long

    # Partition number and replica nodes for this account.
    ring = self.app.account_ring
    account_partition, accounts = ring.get_nodes(self.account_name)
    # Backend request headers derived from the original request.
    headers = self.generate_request_headers(req, transfer=True)
    clear_info_cache(self.app, req.environ, self.account_name)

    # One copy of the headers per replica node.
    resp = self.make_requests(
        req, ring, account_partition, 'PUT',
        req.swift_entity_path, [headers] * len(accounts))

    self.add_acls_from_sys_metadata(resp)
    return resp
复制代码
这里代码比较好理解,这里来看以下方法make_requests的实现,这个方法是比较重要的:
def make_requests(self, req, ring, part, method, path, headers,
                  query_string=''):
    """
    Send one HTTP request to several nodes and aggregate the responses.

    The collected responses are handed to ``best_response``, which picks
    the response to return.

    Example call:
        resp = self.make_requests(
            req, self.app.account_ring, account_partition, 'PUT',
            req.path_info, [headers] * len(accounts))

    :param req: the original client request
    :param ring: ring used to look up the nodes of the partition
    :param part: partition number
    :param method: HTTP method to send to the backends
    :param path: request path to send to the backends
    :param headers: list of header dicts; one request is spawned per entry
    :param query_string: optional query string for the backend requests
    :returns: the response chosen by ``best_response``
    """
    # get_part_nodes: node information for all replicas of the partition.
    start_nodes = ring.get_part_nodes(part)
    expected = len(start_nodes)
    nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
    pile = GreenAsyncPile(expected)

    for head in headers:
        # _make_request performs the request against one remote node.
        pile.spawn(self._make_request, nodes, part, method, path,
                   head, query_string, self.app.logger.thread_locals)

    response = []
    statuses = []
    for resp in pile:
        if resp:
            response.append(resp)
            statuses.append(resp[0])
            if self.have_quorum(statuses, expected):
                break

    # give any pending requests *some* chance to finish
    pile.waitall(self.app.post_quorum_timeout)

    # Pad with 503 placeholders so there is one entry per replica.
    missing = expected - len(response)
    response.extend([(HTTP_SERVICE_UNAVAILABLE, '', '', '')] * missing)
    statuses, reasons, resp_headers, bodies = zip(*response)

    return self.best_response(req, statuses, reasons, bodies,
                              '%s %s' % (self.server_type, req.method),
                              headers=resp_headers)
复制代码
具体的解释代码注释中已经标注了,也是比较好理解的,再来看一下这里方法best_response的具体实现:
def best_response(self, req, statuses, reasons, bodies, server_type,
                  etag=None, headers=None):
    """
    Choose the response to return from a list of backend responses.

    Statuses are grouped by hundreds class, trying the classes that start
    at HTTP_OK, HTTP_MULTIPLE_CHOICES and HTTP_BAD_REQUEST in that order.
    The first class holding at least ``quorum_size(len(statuses))``
    entries wins, and the highest status in that class is returned with
    its matching reason, body and headers.

    :param req: the original client request
    :param statuses: sequence of backend status codes
    :param reasons: sequence of reason phrases, parallel to ``statuses``
    :param bodies: sequence of response bodies, parallel to ``statuses``
    :param server_type: label used in the error log message
    :param etag: optional etag to set on the response (quotes stripped)
    :param headers: optional sequence of header collections, parallel to
                    ``statuses``
    :returns: the chosen response, or a 503 response when no class of
              statuses reaches quorum
    """
    # Response: the WSGI response object class.
    resp = Response(request=req)
    if statuses:
        quorum = quorum_size(len(statuses))
        for hundred in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST):
            # Statuses that fall into this hundreds class.
            hstatuses = [s for s in statuses
                         if hundred <= s < hundred + 100]
            if len(hstatuses) >= quorum:
                status = max(hstatuses)
                status_index = statuses.index(status)
                resp.status = '%s %s' % (status, reasons[status_index])
                resp.body = bodies[status_index]
                if headers:
                    update_headers(resp, headers[status_index])
                if etag:
                    resp.headers['etag'] = etag.strip('"')
                return resp
    self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
                          {'type': server_type, 'statuses': statuses})
    # 503 is "Service Unavailable"; "Internal Server Error" belongs to 500.
    resp.status = '503 Service Unavailable'
    return resp
复制代码
/swift/account/server.py----class AccountController(object)----def PUT
def PUT(self, req):
    """
    Handle an HTTP PUT request on the account server.

    Two kinds of request are handled:
    - when the URL contains <container>, the container's record in the
      account database is created/updated;
    - when the URL has no <container>, the account itself is created or
      its metadata is updated.

    :param req: the incoming request; its path is split into drive,
                partition, account and an optional container
    :returns: HTTPInsufficientStorage when the mount check fails.
              Container PUT: HTTPNotFound for a deleted account,
              HTTPNoContent when x-delete-timestamp is greater than
              x-put-timestamp, else HTTPCreated.
              Account PUT: the result of ``_deleted_response`` (403) when
              the status is deleted, HTTPConflict when still deleted
              after the timestamp update, HTTPCreated when the account
              was created, else HTTPAccepted.
    """
    # Split and validate the request path: drive, part, account and an
    # optional container.
    drive, part, account, container = split_and_validate_path(req, 3, 4)

    # mount_check controls whether the drive is verified as mounted;
    # an unmounted drive is answered with HTTP 507.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    if container:   # put account container
        pending_timeout = None
        if 'x-trans-id' in req.headers:
            pending_timeout = 3

        # _get_account_broker is an internal helper returning an
        # AccountBroker instance that proxies the sqlite database
        # operations.
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=pending_timeout)

        # Auto-create accounts get their database initialized on demand.
        if account.startswith(self.auto_create_account_prefix) and \
                not os.path.exists(broker.db_file):
            # normalize_timestamp: convert the timestamp to the standard
            # format; initialize: create the database.
            try:
                broker.initialize(normalize_timestamp(
                    req.headers.get('x-timestamp') or time.time()))
            except DatabaseAlreadyExists:
                # Already initialized, which is the state we want.
                pass

        if req.headers.get('x-account-override-deleted', 'no').lower() != \
                'yes' and broker.is_deleted():
            return HTTPNotFound(request=req)

        # Record the container in the database with the given attributes.
        broker.put_container(container, req.headers['x-put-timestamp'],
                             req.headers['x-delete-timestamp'],
                             req.headers['x-object-count'],
                             req.headers['x-bytes-used'])
        if req.headers['x-delete-timestamp'] > \
                req.headers['x-put-timestamp']:
            return HTTPNoContent(request=req)
        else:
            return HTTPCreated(request=req)

    else:   # put account
        broker = self._get_account_broker(drive, part, account)
        # normalize_timestamp: convert the timestamp to the standard
        # format.
        timestamp = normalize_timestamp(req.headers['x-timestamp'])

        # Initialize the database when it does not exist yet.
        if not os.path.exists(broker.db_file):
            try:
                broker.initialize(timestamp)
                created = True
            except DatabaseAlreadyExists:
                created = False
        # is_status_deleted: true when the status flag is DELETED.
        elif broker.is_status_deleted():
            return self._deleted_response(broker, req, HTTPForbidden,
                                          body='Recently deleted')
        else:
            # A PUT on a deleted account counts as a creation.
            created = broker.is_deleted()
            # Advance put_timestamp if it is older than this timestamp.
            broker.update_put_timestamp(timestamp)
            if broker.is_deleted():
                return HTTPConflict(request=req)

        # Collect the request headers that is_sys_or_user_meta accepts
        # as account metadata, each paired with the request timestamp.
        metadata = {}
        metadata.update((key, (value, timestamp))
                        for key, value in req.headers.iteritems()
                        if is_sys_or_user_meta('account', key))

        # Persist the metadata (AccountBroker#update_metadata).
        if metadata:
            broker.update_metadata(metadata)

        # 201 Created when the account was created, else 202 Accepted.
        if created:
            return HTTPCreated(request=req)
        else:
            return HTTPAccepted(request=req)
复制代码
PUT请求会处理两种类型的HTTP请求:
当url中有<container>时,会新建/更新container的metadata信息;
当url中没有<container>时,会更新account的metadata信息;
POST
/swift/proxy/controllers/account.py----class AccountController(Controller)----def POST
def POST(self, req):
    """
    HTTP POST request handler for an account.

    :param req: the incoming client request
    :returns: 400 when the account name is too long, the error from
              ``check_metadata`` if any, otherwise the response from
              ``make_requests`` (retried once after auto-creating the
              account when the first attempt returned 404 and
              ``account_autocreate`` is enabled)
    """
    name_length = len(self.account_name)
    if name_length > MAX_ACCOUNT_NAME_LENGTH:
        too_long = HTTPBadRequest(request=req)
        too_long.body = 'Account name length of %d longer than %d' % (
            name_length, MAX_ACCOUNT_NAME_LENGTH)
        return too_long

    metadata_error = check_metadata(req, 'account')
    if metadata_error:
        return metadata_error

    # Partition number and replica nodes for this account.
    account_partition, accounts = \
        self.app.account_ring.get_nodes(self.account_name)

    # Backend request headers derived from the original request.
    headers = self.generate_request_headers(req, transfer=True)

    clear_info_cache(self.app, req.environ, self.account_name)

    def post_to_backends():
        # One copy of the headers per replica node.
        return self.make_requests(
            req, self.app.account_ring, account_partition, 'POST',
            req.swift_entity_path, [headers] * len(accounts))

    resp = post_to_backends()

    # If the account was not found, create it first and then repeat the
    # POST against it.
    if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
        self.autocreate_account(req.environ, self.account_name)
        resp = post_to_backends()

    self.add_acls_from_sys_metadata(resp)
    return resp
复制代码
/swift/account/server.py----class AccountController(object)----def POST
def POST(self, req):
    """
    Handle an HTTP POST request on the account server.

    Updates the account's metadata: headers accepted by
    ``is_sys_or_user_meta`` are written to the account database through
    ``AccountBroker#update_metadata``.

    :param req: the incoming request; its path is split into drive,
                partition and account
    :returns: HTTPBadRequest for a missing or non-float x-timestamp,
              HTTPInsufficientStorage when the mount check fails, the
              result of ``_deleted_response`` for a deleted account,
              otherwise HTTPNoContent
    """
    drive, part, account = split_and_validate_path(req, 3)

    timestamp_ok = ('x-timestamp' in req.headers and
                    check_float(req.headers['x-timestamp']))
    if not timestamp_ok:
        return HTTPBadRequest(body='Missing or bad timestamp',
                              request=req,
                              content_type='text/plain')

    if self.mount_check:
        if not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)

    # _get_account_broker is an internal helper returning an
    # AccountBroker instance that proxies the sqlite database operations.
    broker = self._get_account_broker(drive, part, account)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # Convert the timestamp to the standard format.
    timestamp = normalize_timestamp(req.headers['x-timestamp'])

    # Pick the metadata headers out of the request, each paired with the
    # request timestamp.
    metadata = dict(
        (key, (value, timestamp))
        for key, value in req.headers.iteritems()
        if is_sys_or_user_meta('account', key))

    # Write the selected metadata to the database, if there is any.
    if metadata:
        broker.update_metadata(metadata)

    return HTTPNoContent(request=req)
复制代码
DELETE
/swift/proxy/controllers/account.py----class AccountController(Controller)----def DELETE
def DELETE(self, req):
    """
    HTTP DELETE request handler for an account.

    :param req: the incoming client request
    :returns: 400 when the request carries a query string, 405 when
              account management is disabled, otherwise the response
              from ``make_requests``
    """
    if req.query_string:
        return HTTPBadRequest(request=req)
    if not self.app.allow_account_management:
        return HTTPMethodNotAllowed(
            request=req,
            headers={'Allow': ', '.join(self.allowed_methods)})

    # Partition number and replica nodes for this account.
    account_partition, accounts = \
        self.app.account_ring.get_nodes(self.account_name)

    # Backend request headers derived from the original request.
    headers = self.generate_request_headers(req)

    clear_info_cache(self.app, req.environ, self.account_name)

    # One copy of the headers per replica node.
    resp = self.make_requests(
        req, self.app.account_ring, account_partition, 'DELETE',
        req.swift_entity_path, [headers] * len(accounts))
    # Hand the aggregated backend response back to the caller; without
    # this the handler would return None.
    return resp
复制代码
/swift/account/server.py----class AccountController(object)----def DELETE
def DELETE(self, req):
    """
    Handle an HTTP DELETE request on the account server.

    The deletion is logical: the account is marked as deleted in its
    database; the account and its resources are not physically removed
    here.

    :param req: the incoming request; its path is split into drive,
                partition and account
    :returns: HTTPInsufficientStorage when the mount check fails,
              HTTPBadRequest for a missing or non-float x-timestamp,
              otherwise the result of ``_deleted_response`` (built with
              HTTPNotFound if the account was already deleted, else with
              HTTPNoContent)
    """
    # Split and validate the request path: drive, part, account.
    drive, part, account = split_and_validate_path(req, 3)

    # mount_check controls whether the drive is verified as mounted;
    # an unmounted drive is answered with HTTP 507.
    # root is the directory holding the devices:
    #   self.root = conf.get('devices', '/srv/node')
    if self.mount_check:
        if not check_mount(self.root, drive):
            return HTTPInsufficientStorage(drive=drive, request=req)

    # The request must carry a float-parsable 'x-timestamp' header.
    timestamp_ok = ('x-timestamp' in req.headers and
                    check_float(req.headers['x-timestamp']))
    if not timestamp_ok:
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')

    # _get_account_broker is an internal helper returning an
    # AccountBroker instance that proxies the sqlite database operations.
    broker = self._get_account_broker(drive, part, account)

    # An account that is already deleted is reported as not found.
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)

    # Mark the database as deleted; no files are removed here.
    broker.delete_db(req.headers['x-timestamp'])
    return self._deleted_response(broker, req, HTTPNoContent)
复制代码
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
|