分享

Swift源码分析----swift-account-reaper(2)

pig2 发表于 2014-11-21 16:07:10 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13672

问题导读

1.方法direct_delete_container的实现了什么功能?
2.方法direct_delete_object的实现了什么功能?







接上一篇
Swift源码分析----swift-account-reaper(1)

  1. def reap_container(self, account, account_partition, account_nodes, container):  
  2.      """         
  3.      实现收割container操作;
  4.      实现删除容器container下数据和容器container本身;
  5.      当执行删除一个单独的object出现异常的时候,进程将会继续执行删除container中其它object的操作,
  6.      删除失败的object将会在下一次这个方法被调用的时候继续尝试删除操作;
  7.      当获取要删除的object列表出现异常的时候,进程将会停止(但是也将会在下一次这个方法被调用的时候继续尝试这个操作);
  8.      如果所有object已经被删除,将会通过发送一个删除请求到所有的container节点,来实现删除container本身;
  9.      每个container服务的删除将会更新对应的account服务,且从account列表删除container;
  10.      """  
  11.     account_nodes = list(account_nodes)  
  12.          
  13.     # get_container_ring:获取swift.common.ring.Ring对象,名称为'container';  
  14.     # 为account/container/object获取分区和节点信息;  
  15.     # 返回元组(分区,节点信息列表);  
  16.     # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;  
  17.     part, nodes = self.get_container_ring().get_nodes(account, container)  
  18.     node = nodes[-1]  
  19.     # 绿色线程的连接池类;  
  20.     pool = GreenPool(size=self.object_concurrency)  
  21.     marker = ''  
  22.          
  23.     # 删除从container中获取的所有object;  
  24.     while True:  
  25.         objects = None  
  26.         try:  
  27.             # direct_get_container:发送调用'GET'方法的请求,实现从容器服务(器)直接获取容器中内容的列表;  
  28.             # objects获取所哟的object;  
  29.             objects = direct_get_container(  
  30.                         node, part, account, container,  
  31.                         marker=marker,  
  32.                         conn_timeout=self.conn_timeout,  
  33.                         response_timeout=self.node_timeout)[1]  
  34.             self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1  
  35.             self.logger.increment('return_codes.2')  
  36.         except ClientException as err:  
  37.             if self.logger.getEffectiveLevel() <= DEBUG:  
  38.                 self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)  
  39.             self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1  
  40.             self.logger.increment('return_codes.%d' % (err.http_status / 100,))  
  41.         if not objects:  
  42.             break  
  43.               
  44.         # 删除从container中获取的所有object;  
  45.         try:  
  46.             for obj in objects:  
  47.                 if isinstance(obj['name'], unicode):  
  48.                     obj['name'] = obj['name'].encode('utf8')  
  49.                      
  50.                 # 在绿色线程中执行方法reap_object;  
  51.                 # reap_object:通过一个发送到每个节点的删除object请求,执行删除给定的obj['name'];  
  52.                 # 执行删除请求,每个object服务将会更新删除相应的容器服务器;  
  53.                 # 并从container列表删除这个object;  
  54.                 pool.spawn(self.reap_object, account, container, part, nodes, obj['name'])  
  55.             pool.waitall()  
  56.         except (Exception, Timeout):  
  57.             self.logger.exception(_('Exception with objects for container '  
  58.                                      '%(container)s for account %(account)s'),  
  59.                                    {'container': container, 'account': account})  
  60.         marker = objects[-1]['name']  
  61.         if marker == '':  
  62.             break  
  63.     successes = 0  
  64.     failures = 0  
  65.          
  66.     # 在所有相关节点中,删除container的相关信息;  
  67.     for node in nodes:  
  68.         anode = account_nodes.pop()  
  69.         try:  
  70.             # direct_delete_container:发送调用'DELETE'方法的请求,实现从account服务(器)直接删除container;  
  71.             direct_delete_container(  
  72.                     node, part, account, container,  
  73.                     conn_timeout=self.conn_timeout,  
  74.                     response_timeout=self.node_timeout,  
  75.                     headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,  
  76.                              'X-Account-Partition': str(account_partition),  
  77.                              'X-Account-Device': anode['device'],  
  78.                              'X-Account-Override-Deleted': 'yes'})  
  79.   
  80.             successes += 1  
  81.             self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1  
  82.             self.logger.increment('return_codes.2')  
  83.         except ClientException as err:  
  84.             if self.logger.getEffectiveLevel() <= DEBUG:  
  85.                 self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)  
  86.             failures += 1  
  87.             self.logger.increment('containers_failures')  
  88.             self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1  
  89.             self.logger.increment('return_codes.%d' % (err.http_status / 100,))  
  90.     if successes > failures:  
  91.         self.stats_containers_deleted += 1  
  92.         self.logger.increment('containers_deleted')  
  93.     elif not successes:  
  94.         self.stats_containers_remaining += 1  
  95.         self.logger.increment('containers_remaining')  
  96.     else:  
  97.         self.stats_containers_possibly_remaining += 1  
  98.         self.logger.increment('containers_possibly_remaining')  
复制代码

1.获取指定account指定container的分区号和所有副本所在节点信息,从节点列表中获取一个节点(因为所有节点上的副本信息都是一致的);
2.调用方法direct_get_container,实现发送调用'GET'方法的请求,实现从容器服务(器)直接获取指定容器下的所有对象列表;
3.遍历所有的对象,针对每一个对象调用方法reap_object实现从对象服务器删除指定对象的数据信息;
4.遍历container的所有副本所在节点,针对每一个节点调用方法direct_delete_container实现从服务(器)直接删除container相关数据信息(比如元数据信息数据库信息等等);

转到3,来看方法reap_object的实现:
  1. def reap_object(self, account, container, container_partition, container_nodes, obj):  
  2.      """         
  3.      实现收割object操作;
  4.      通过一个发送到每个节点的删除object请求,执行删除给定的object;
  5.      执行删除请求,每个object服务将会更新删除相应的容器服务器;
  6.      并从container列表删除这个object;
  7.      """  
  8.     # container副本相关节点;  
  9.     container_nodes = list(container_nodes)  
  10.          
  11.     # 为account/container/object获取分区和节点信息;  
  12.     # 返回元组(分区,节点信息列表);  
  13.     # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;  
  14.     # get_object_ring:获取swift.common.ring.Ring对象,名称为'object';         
  15.     part, nodes = self.get_object_ring().get_nodes(account, container, obj)  
  16.     successes = 0  
  17.     failures = 0  
  18.     for node in nodes:  
  19.         cnode = container_nodes.pop()  
  20.         try:  
  21.             # 建立一个HTTPConnection类的对象;  
  22.             # 发出的HTTP请求的方法'DELETE'到服务器;  
  23.             # 直接从对象服务(器)删除对象;  
  24.             # 获取来自服务器的响应;  
  25.             direct_delete_object(  
  26.                     node, part, account, container, obj,  
  27.                     conn_timeout=self.conn_timeout,  
  28.                     response_timeout=self.node_timeout,  
  29.                     headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,  
  30.                              'X-Container-Partition': str(container_partition),  
  31.                              'X-Container-Device': cnode['device']})  
  32.   
  33.             successes += 1  
  34.             self.stats_return_codes[2] = self.stats_return_codes.get(2, 0) + 1  
  35.             self.logger.increment('return_codes.2')  
  36.         except ClientException as err:  
  37.             if self.logger.getEffectiveLevel() <= DEBUG:  
  38.                 self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)  
  39.             failures += 1  
  40.             self.logger.increment('objects_failures')  
  41.             self.stats_return_codes[err.http_status / 100] = self.stats_return_codes.get(err.http_status / 100, 0) + 1  
  42.             self.logger.increment('return_codes.%d' % (err.http_status / 100,))  
  43.         if successes > failures:  
  44.             self.stats_objects_deleted += 1  
  45.             self.logger.increment('objects_deleted')  
  46.         elif not successes:  
  47.             self.stats_objects_remaining += 1  
  48.             self.logger.increment('objects_remaining')  
  49.         else:  
  50.             self.stats_objects_possibly_remaining += 1  
  51.             self.logger.increment('objects_possibly_remaining')  
复制代码




3.1.获取指定account指定container指定object的分区号和所有副本所在节点信息;
3.2.遍历所有副本所在节点,针对每一个节点调用方法direct_delete_object实现发送调用'DELETE'方法的请求,实现从对象服务(器)直接删除指定对象数据;

转到3.2,来看方法direct_delete_object的实现:
  1. def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers=None):  
  2.      """
  3.      直接从对象服务(器)删除对象;
  4.      建立一个HTTPConnection类的对象;
  5.      发出的HTTP请求的方法'DELETE'到服务器;
  6.      直接从对象服务(器)删除对象;
  7.      获取来自服务器的响应;
  8.      """  
  9.     if headers is None:  
  10.         headers = {}  
  11.   
  12.     path = '/%s/%s/%s' % (account, container, obj)  
  13.     with Timeout(conn_timeout):  
  14.         # 建立一个HTTPConnection类的对象;  
  15.         # 发出的HTTP请求的方法'DELETE';  
  16.         # 返回HTTPConnection连接对象;  
  17.         conn = http_connect(node['ip'], node['port'], node['device'], part,  
  18.                             'DELETE', path, headers=gen_headers(headers, True))  
  19.     with Timeout(response_timeout):  
  20.         # getresponse:获取来自服务器的响应;  
  21.         resp = conn.getresponse()  
  22.         resp.read()  
  23.     if not is_success(resp.status):  
  24.         raise ClientException(  
  25.             'Object server %s:%s direct DELETE %s gave status %s' %  
  26.             (node['ip'], node['port'],  
  27.              repr('/%s/%s%s' % (node['device'], part, path)),  
  28.              resp.status),  
  29.             http_host=node['ip'], http_port=node['port'],  
  30.             http_device=node['device'], http_status=resp.status,  
  31.             http_reason=resp.reason)  
复制代码




转到4,来看方法direct_delete_container的实现:
  1. def direct_delete_container(node, part, account, container, conn_timeout=5, response_timeout=15, headers=None):  
  2.      """
  3.      发送调用'DELETE'方法的请求,实现从容器服务(器)直接删除container相关数据;
  4.      """  
  5.     if headers is None:  
  6.         headers = {}  
  7.   
  8.     path = '/%s/%s' % (account, container)  
  9.     with Timeout(conn_timeout):  
  10.         conn = http_connect(node['ip'], node['port'], node['device'], part,  
  11.                             'DELETE', path, headers=gen_headers(headers, True))  
  12.     with Timeout(response_timeout):  
  13.         resp = conn.getresponse()  
  14.         resp.read()  
  15.     if not is_success(resp.status):  
  16.         raise ClientException(  
  17.             'Container server %s:%s direct DELETE %s gave status %s' %  
  18.             (node['ip'], node['port'],  
  19.              repr('/%s/%s%s' % (node['device'], part, path)), resp.status),  
  20.             http_host=node['ip'], http_port=node['port'],  
  21.             http_device=node['device'], http_status=resp.status,  
  22.             http_reason=resp.reason)  
复制代码










没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条