pig2 发表于 2014-11-21 16:07:10

Swift源码分析----swift-account-reaper(2)


问题导读

1.方法direct_delete_container的实现了什么功能?
2.方法direct_delete_object的实现了什么功能?


static/image/hrline/4.gif




接上一篇
Swift源码分析----swift-account-reaper(1)

def reap_container(self, account, account_partition, account_nodes, container):
   """         
   实现收割container操作;
   实现删除容器container下数据和容器container本身;
   当执行删除一个单独的object出现异常的时候,进程将会继续执行删除container中其它object的操作,
   删除失败的object将会在下一次这个方法被调用的时候继续尝试删除操作;
   当获取要删除的object列表出现异常的时候,进程将会停止(但是也将会在下一次这个方法被调用的时候继续尝试这个操作);
   如果所有object已经被删除,将会通过发送一个删除请求到所有的container节点,来实现删除container本身;
   每个container服务的删除将会更新对应的account服务,且从account列表删除container;
   """
    account_nodes = list(account_nodes)
         
    # get_container_ring:获取swift.common.ring.Ring对象,名称为'container';
    # 为account/container/object获取分区和节点信息;
    # 返回元组(分区,节点信息列表);
    # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
    part, nodes = self.get_container_ring().get_nodes(account, container)
    node = nodes[-1]
    # 绿色线程的连接池类;
    pool = GreenPool(size=self.object_concurrency)
    marker = ''
         
    # 删除从container中获取的所有object;
    while True:
      objects = None
      try:
            # direct_get_container:发送调用'GET'方法的请求,实现从容器服务(器)直接获取容器中内容的列表;
            # objects获取所哟的object;
            objects = direct_get_container(
                        node, part, account, container,
                        marker=marker,
                        conn_timeout=self.conn_timeout,
                        response_timeout=self.node_timeout)
            self.stats_return_codes = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
      except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            self.stats_return_codes = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
      if not objects:
            break
            
      # 删除从container中获取的所有object;
      try:
            for obj in objects:
                if isinstance(obj['name'], unicode):
                  obj['name'] = obj['name'].encode('utf8')
                     
                # 在绿色线程中执行方法reap_object;
                # reap_object:通过一个发送到每个节点的删除object请求,执行删除给定的obj['name'];
                # 执行删除请求,每个object服务将会更新删除相应的容器服务器;
                # 并从container列表删除这个object;
                pool.spawn(self.reap_object, account, container, part, nodes, obj['name'])
            pool.waitall()
      except (Exception, Timeout):
            self.logger.exception(_('Exception with objects for container '
                                     '%(container)s for account %(account)s'),
                                 {'container': container, 'account': account})
      marker = objects[-1]['name']
      if marker == '':
            break
    successes = 0
    failures = 0
         
    # 在所有相关节点中,删除container的相关信息;
    for node in nodes:
      anode = account_nodes.pop()
      try:
            # direct_delete_container:发送调用'DELETE'方法的请求,实现从account服务(器)直接删除container;
            direct_delete_container(
                  node, part, account, container,
                  conn_timeout=self.conn_timeout,
                  response_timeout=self.node_timeout,
                  headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
                           'X-Account-Partition': str(account_partition),
                           'X-Account-Device': anode['device'],
                           'X-Account-Override-Deleted': 'yes'})

            successes += 1
            self.stats_return_codes = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
      except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            failures += 1
            self.logger.increment('containers_failures')
            self.stats_return_codes = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
    if successes > failures:
      self.stats_containers_deleted += 1
      self.logger.increment('containers_deleted')
    elif not successes:
      self.stats_containers_remaining += 1
      self.logger.increment('containers_remaining')
    else:
      self.stats_containers_possibly_remaining += 1
      self.logger.increment('containers_possibly_remaining')
1.获取指定account指定container的分区号和所有副本所在节点信息,从节点列表中获取一个节点(因为所有节点上的副本信息都是一致的);
2.调用方法direct_get_container,实现发送调用'GET'方法的请求,实现从容器服务(器)直接获取指定容器下的所有对象列表;
3.遍历所有的对象,针对每一个对象调用方法reap_object实现从对象服务器删除指定对象的数据信息;
4.遍历container的所有副本所在节点,针对每一个节点调用方法direct_delete_container实现从服务(器)直接删除container相关数据信息(比如元数据信息数据库信息等等);

转到3,来看方法reap_object的实现:
def reap_object(self, account, container, container_partition, container_nodes, obj):
   """         
   实现收割object操作;
   通过一个发送到每个节点的删除object请求,执行删除给定的object;
   执行删除请求,每个object服务将会更新删除相应的容器服务器;
   并从container列表删除这个object;
   """
    # container副本相关节点;
    container_nodes = list(container_nodes)
         
    # 为account/container/object获取分区和节点信息;
    # 返回元组(分区,节点信息列表);
    # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
    # get_object_ring:获取swift.common.ring.Ring对象,名称为'object';         
    part, nodes = self.get_object_ring().get_nodes(account, container, obj)
    successes = 0
    failures = 0
    for node in nodes:
      cnode = container_nodes.pop()
      try:
            # 建立一个HTTPConnection类的对象;
            # 发出的HTTP请求的方法'DELETE'到服务器;
            # 直接从对象服务(器)删除对象;
            # 获取来自服务器的响应;
            direct_delete_object(
                  node, part, account, container, obj,
                  conn_timeout=self.conn_timeout,
                  response_timeout=self.node_timeout,
                  headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
                           'X-Container-Partition': str(container_partition),
                           'X-Container-Device': cnode['device']})

            successes += 1
            self.stats_return_codes = self.stats_return_codes.get(2, 0) + 1
            self.logger.increment('return_codes.2')
      except ClientException as err:
            if self.logger.getEffectiveLevel() <= DEBUG:
                self.logger.exception(_('Exception with %(ip)s:%(port)s/%(device)s'), node)
            failures += 1
            self.logger.increment('objects_failures')
            self.stats_return_codes = self.stats_return_codes.get(err.http_status / 100, 0) + 1
            self.logger.increment('return_codes.%d' % (err.http_status / 100,))
      if successes > failures:
            self.stats_objects_deleted += 1
            self.logger.increment('objects_deleted')
      elif not successes:
            self.stats_objects_remaining += 1
            self.logger.increment('objects_remaining')
      else:
            self.stats_objects_possibly_remaining += 1
            self.logger.increment('objects_possibly_remaining')



3.1.获取指定account指定container指定object的分区号和所有副本所在节点信息;
3.2.遍历所有副本所在节点,针对每一个节点调用方法direct_delete_object实现发送调用'DELETE'方法的请求,实现从对象服务(器)直接删除指定对象数据;

转到3.2,来看方法direct_delete_object的实现:
def direct_delete_object(node, part, account, container, obj, conn_timeout=5, response_timeout=15, headers=None):
   """
   直接从对象服务(器)删除对象;
   建立一个HTTPConnection类的对象;
   发出的HTTP请求的方法'DELETE'到服务器;
   直接从对象服务(器)删除对象;
   获取来自服务器的响应;
   """
    if headers is None:
      headers = {}

    path = '/%s/%s/%s' % (account, container, obj)
    with Timeout(conn_timeout):
      # 建立一个HTTPConnection类的对象;
      # 发出的HTTP请求的方法'DELETE';
      # 返回HTTPConnection连接对象;
      conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'DELETE', path, headers=gen_headers(headers, True))
    with Timeout(response_timeout):
      # getresponse:获取来自服务器的响应;
      resp = conn.getresponse()
      resp.read()
    if not is_success(resp.status):
      raise ClientException(
            'Object server %s:%s direct DELETE %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)),
             resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)



转到4,来看方法direct_delete_container的实现:
def direct_delete_container(node, part, account, container, conn_timeout=5, response_timeout=15, headers=None):
   """
   发送调用'DELETE'方法的请求,实现从容器服务(器)直接删除container相关数据;
   """
    if headers is None:
      headers = {}

    path = '/%s/%s' % (account, container)
    with Timeout(conn_timeout):
      conn = http_connect(node['ip'], node['port'], node['device'], part,
                            'DELETE', path, headers=gen_headers(headers, True))
    with Timeout(response_timeout):
      resp = conn.getresponse()
      resp.read()
    if not is_success(resp.status):
      raise ClientException(
            'Container server %s:%s direct DELETE %s gave status %s' %
            (node['ip'], node['port'],
             repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
            http_host=node['ip'], http_port=node['port'],
            http_device=node['device'], http_status=resp.status,
            http_reason=resp.reason)









页: [1]
查看完整版本: Swift源码分析----swift-account-reaper(2)