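Last edited by xioaxu790 on 2014-11-20 21:10
Guiding questions
1. How is every object in a container audited recursively?
2. How are all the nodes holding replicas of an account traversed?
3. How is audit_container called recursively to audit an account?
Continued from the previous post: Swift Source Code Analysis ---- swift-account-audit (1)
Moving on to 3.2, let's look at the implementation of audit_container: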

    def audit_container(self, account, name, recurse=False):
        """
        Audit the given container; if recurse is true, also audit every
        object in it.
        """
        # If another green thread is already auditing this container, wait
        # for it and reuse its cached listing.
        if (account, name) in self.in_progress:
            self.in_progress[(account, name)].wait()
        if (account, name) in self.list_cache:
            return self.list_cache[(account, name)]
        self.in_progress[(account, name)] = Event()
        print 'Auditing container "%s"' % name

        # Build the full path of the container under the given account.
        path = '/%s/%s' % (account, name)

        # Get the container listing of the given account.
        account_listing = self.audit_account(account)

        consistent = True
        if name not in account_listing:
            consistent = False
            print " Container %s not in account listing!" % path

        # Look up the partition and the nodes holding every replica of this
        # container. There can be several nodes, because each partition has
        # several replicas that may live on different nodes.
        # Returns a tuple (partition, list of node dicts); each node dict
        # contains at least id, weight, zone, ip, port, device and meta.
        part, nodes = self.container_ring.get_nodes(account, name.encode('utf-8'))
        rec_d = {}
        responses = {}
        for node in nodes:
            marker = ''
            results = True
            while results:
                try:
                    conn = http_connect(node['ip'], node['port'],
                                        node['device'], part, 'GET',
                                        path.encode('utf-8'), {},
                                        'format=json&marker=%s' %
                                        quote(marker.encode('utf-8')))
                    # Read the response from the server.
                    resp = conn.getresponse()
                    if resp.status // 100 != 2:
                        self.container_not_found += 1
                        consistent = False
                        print(' Bad status GETting container "%s" on %s/%s' %
                              (path, node['ip'], node['device']))
                        break

                    if node['id'] not in responses:
                        responses[node['id']] = dict(resp.getheaders())
                    results = simplejson.loads(resp.read())
                except Exception:
                    self.container_exceptions += 1
                    consistent = False
                    print ' Exception GETting container "%s" on %s/%s' % \
                        (path, node['ip'], node['device'])
                    break
                if results:
                    # The next page starts after the last name on this page.
                    marker = results[-1]['name']
                    for obj in results:
                        obj_name = obj['name']
                        if obj_name not in rec_d:
                            rec_d[obj_name] = obj
                        # Replicas disagree on this object's version:
                        # count it and keep the newer entry.
                        if (obj['last_modified'] !=
                                rec_d[obj_name]['last_modified']):
                            self.container_obj_mismatch += 1
                            consistent = False
                            print(" Different versions of %s/%s "
                                  "in container dbs." % (name, obj['name']))
                            if (obj['last_modified'] >
                                    rec_d[obj_name]['last_modified']):
                                rec_d[obj_name] = obj
        # Every replica should report the same object count.
        obj_counts = [int(header['x-container-object-count'])
                      for header in responses.values()]
        if not obj_counts:
            consistent = False
            print " Failed to fetch container %s at all!" % path
        else:
            if len(set(obj_counts)) != 1:
                self.container_count_mismatch += 1
                consistent = False
                print " Container databases don't agree on number of objects."
                print " Max: %s, Min: %s" % (max(obj_counts), min(obj_counts))
        self.containers_checked += 1
        self.list_cache[(account, name)] = rec_d
        self.in_progress[(account, name)].send(True)
        del self.in_progress[(account, name)]

        # Recursively audit every object in the container.
        if recurse:
            for obj in rec_d.keys():
                self.pool.spawn_n(self.audit_object, account, name, obj)
        if not consistent and self.error_file:
            print >>open(self.error_file, 'a'), path
        return rec_d

3.2.1 Build the full path of the container under the given account.
3.2.2 Call audit_account to get the container listing of the given account, and check that the current container appears in it.
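audit_container calls audit_account every time it runs. With recurse set, many container audits for the same account can run concurrently, yet the account is listed only once. The in_progress/list_cache pair handles this: the first caller registers an Event, and later callers wait on it and then read list_cache. The sketch below reproduces that pattern with the threading module. ListingCache and its fetch callback are hypothetical. Green threads only switch at blocking calls, so the original appears to be able to check and register without a lock. OS threads can be preempted in between, so this version takes one.

```python
import threading


class ListingCache:
    """Hypothetical sketch of the in_progress / list_cache pattern."""

    def __init__(self, fetch):
        self.fetch = fetch        # callback: key -> listing (made-up)
        self.in_progress = {}     # key -> Event of the fetch currently running
        self.list_cache = {}      # key -> finished listing
        self.lock = threading.Lock()

    def get(self, key):
        while True:
            with self.lock:
                if key in self.list_cache:
                    return self.list_cache[key]
                event = self.in_progress.get(key)
                owner = event is None
                if owner:
                    # First caller: claim the key before releasing the lock.
                    event = self.in_progress[key] = threading.Event()
            if owner:
                break
            # Another caller is listing this key: wait, then re-check the cache.
            event.wait()
        try:
            listing = self.fetch(key)
            with self.lock:
                self.list_cache[key] = listing
        finally:
            with self.lock:
                del self.in_progress[key]
            # Wake all waiters, even if fetch() raised.
            event.set()
        return listing


listings = ListingCache(lambda account: {'photos', 'docs'})  # made-up fetch
print(sorted(listings.get('AUTH_test')))
```

The try/finally is an addition of this sketch. It releases waiters even when fetch() fails, so they retry instead of blocking.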
3.2.3 Look up the partition number and the nodes holding all replicas of the container name.
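Step 3.2.3 relies on the ring's get_nodes(). A toy ring makes the idea concrete. This is a minimal sketch under assumptions, not Swift's own swift.common.ring.Ring. The path is hashed with MD5, the top part_power bits of the hash pick the partition, and a replica-to-partition-to-device table yields one device per replica. The class ToyRing, its round-robin table, and the device dicts are made up for illustration. The real ring also mixes a cluster-wide hash prefix/suffix into the path, and it loads its table from a ring file built by swift-ring-builder.

```python
import hashlib
import struct


class ToyRing:
    """Hypothetical, simplified ring; not swift.common.ring.Ring."""

    def __init__(self, devs, part_power=4, replicas=3):
        self.devs = devs
        # Keep only the top `part_power` bits of a 32-bit value.
        self.part_shift = 32 - part_power
        parts = 2 ** part_power
        # replica2part2dev[r][p] is the index in devs of replica r of partition p.
        # Round-robin keeps a partition's replicas on distinct devices
        # as long as len(devs) >= replicas.
        self.replica2part2dev = [
            [(p + r) % len(devs) for p in range(parts)]
            for r in range(replicas)
        ]

    def get_nodes(self, account, container=None):
        # Hash "/account" or "/account/container" (no prefix/suffix here).
        path = '/' + '/'.join(p for p in (account, container) if p)
        digest = hashlib.md5(path.encode('utf-8')).digest()
        part = struct.unpack_from('>I', digest)[0] >> self.part_shift
        nodes = [self.devs[table[part]] for table in self.replica2part2dev]
        return part, nodes


devs = [{'id': i, 'ip': '10.0.0.%d' % i, 'port': 6001, 'device': 'sdb1',
         'zone': i, 'weight': 100.0} for i in range(4)]
ring = ToyRing(devs)
part, nodes = ring.get_nodes('AUTH_test', 'photos')
print(part, [n['ip'] for n in nodes])
```

With four devices and three replicas, every lookup in this toy returns three distinct devices, and the same path always maps to the same partition.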
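3.2.4 Iterate over the nodes holding the container's replicas and, for each node, do the following:
Send an HTTP GET request to the node to fetch its container listing, and use the response status to decide whether the container replica exists on that node. The listing is read page by page, with the last name seen passed as marker. Entries from all replicas are merged by name, and an object whose last_modified differs between replicas is counted as a version mismatch (the newer entry is kept). After all nodes have been queried, the X-Container-Object-Count headers of the replicas are compared.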
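The per-node loop in 3.2.4 comes down to paging through each replica's listing and merging the entries by object name. The sketch below replays only that merge, on in-memory data. It assumes each listing is a list of dicts with name and last_modified keys. fetch_page() is a hypothetical stand-in for the GET ?format=json&marker=... request, and its page size of 2 is arbitrary, chosen so that paging actually happens. Like the original, it compares last_modified as strings.

```python
def fetch_page(listing, marker='', limit=2):
    """Hypothetical stand-in for GET /account/container?format=json&marker=...

    Returns up to `limit` entries whose name sorts after `marker`.
    """
    later = sorted((o for o in listing if o['name'] > marker),
                   key=lambda o: o['name'])
    return later[:limit]


def merge_replica_listings(replicas):
    """Merge container listings from several replicas.

    Returns (merged, mismatches): merged maps each object name to the newest
    entry seen; mismatches counts entries whose last_modified differs from the
    entry already recorded (like container_obj_mismatch above).
    """
    merged = {}
    mismatches = 0
    for listing in replicas:
        marker = ''
        while True:
            results = fetch_page(listing, marker)
            if not results:
                break
            # The next page starts after the last name on this page.
            marker = results[-1]['name']
            for obj in results:
                seen = merged.setdefault(obj['name'], obj)
                if obj['last_modified'] != seen['last_modified']:
                    mismatches += 1
                    # ISO 8601 strings of the same format sort chronologically.
                    if obj['last_modified'] > seen['last_modified']:
                        merged[obj['name']] = obj
    return merged, mismatches


r1 = [{'name': n, 'last_modified': '2014-11-01T00:00:00'} for n in 'abc']
r2 = [{'name': n, 'last_modified': '2014-11-01T00:00:00'} for n in 'ac']
r2.append({'name': 'b', 'last_modified': '2014-11-02T00:00:00'})
merged, mismatches = merge_replica_listings([r1, r2])
print(sorted(merged), mismatches)
```

Here replica r2 holds a newer b, so the merge reports one mismatch and keeps r2's entry.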
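3.2.5 If recurse is true, call audit_object for every object in the container (each call is spawned on self.pool), so that each object is audited in turn.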
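In 3.2.5, as in the recursive branch of audit_account, work is handed to self.pool with spawn_n, and nothing waits for the individual results. The sketch below imitates that fan-out using concurrent.futures.ThreadPoolExecutor in place of eventlet's GreenPool. MiniAuditor, its in-memory tree, and its spawn_n()/waitall() helpers are hypothetical. Waiting uses a pending counter rather than pool.shutdown(). Container tasks may still be submitting object tasks while the caller waits, and an executor that is shutting down rejects new submissions.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniAuditor:
    """Hypothetical miniature of the account -> container -> object fan-out."""

    def __init__(self, tree, workers=4):
        self.tree = tree                  # {container: [object, ...]}, made-up data
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.cond = threading.Condition()
        self.pending = 0                  # tasks submitted but not yet finished
        self.objects_checked = []

    def spawn_n(self, func, *args):
        """Fire-and-forget submit, loosely like GreenPool.spawn_n."""
        with self.cond:
            self.pending += 1

        def run():
            try:
                func(*args)
            finally:
                with self.cond:
                    self.pending -= 1
                    self.cond.notify_all()

        self.pool.submit(run)

    def waitall(self):
        """Block until every spawned task, including nested ones, has finished."""
        with self.cond:
            self.cond.wait_for(lambda: self.pending == 0)

    def audit_object(self, account, container, obj):
        # A real audit would check each object replica; here we only record it.
        with self.cond:
            self.objects_checked.append('/%s/%s/%s' % (account, container, obj))

    def audit_container(self, account, name, recurse=False):
        listing = self.tree[name]
        if recurse:
            for obj in listing:
                self.spawn_n(self.audit_object, account, name, obj)
        return listing

    def audit_account(self, account, recurse=False):
        containers = set(self.tree)
        if recurse:
            for container in containers:
                self.spawn_n(self.audit_container, account, container, True)
        return containers


auditor = MiniAuditor({'photos': ['a.jpg', 'b.jpg'], 'docs': ['x.txt']})
auditor.audit_account('AUTH_test', recurse=True)
auditor.waitall()
print(sorted(auditor.objects_checked))
auditor.pool.shutdown()
```

A parent task increments pending for its children before its own finally block decrements the counter. As a result, the count cannot reach zero while nested work is still being scheduled.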
Moving on to 3.3, let's look at the implementation of audit_account:

    def audit_account(self, account, recurse=False):
        """
        Audit the given account; if recurse is true, also audit every
        container in it and, in turn, every object in those containers.
        """
        # Same wait-then-reuse-cache logic as in audit_container.
        if account in self.in_progress:
            self.in_progress[account].wait()
        if account in self.list_cache:
            return self.list_cache[account]
        self.in_progress[account] = Event()
        print 'Auditing account "%s"' % account
        consistent = True
        path = '/%s' % account

        # Look up the partition and the nodes holding every replica of this
        # account (several nodes, since each partition has several replicas
        # that may live on different nodes).
        # Returns a tuple (partition, list of node dicts); each node dict
        # contains at least id, weight, zone, ip, port, device and meta.
        part, nodes = self.account_ring.get_nodes(account)

        responses = {}
        for node in nodes:
            marker = ''
            results = True
            while results:
                node_id = node['id']
                try:
                    # Open an HTTPConnection to the node
                    # and read the server's response.
                    conn = http_connect(node['ip'], node['port'],
                                        node['device'], part, 'GET', path, {},
                                        'format=json&marker=%s' %
                                        quote(marker.encode('utf-8')))
                    resp = conn.getresponse()
                    if resp.status // 100 != 2:
                        self.account_not_found += 1
                        consistent = False
                        print(" Bad status GETting account '%s' "
                              " from %s:%s" %
                              (account, node['ip'], node['device']))
                        break
                    results = simplejson.loads(resp.read())
                except Exception:
                    self.account_exceptions += 1
                    consistent = False
                    print(" Exception GETting account '%s' on %s:%s" %
                          (account, node['ip'], node['device']))
                    break

                # Keep each node's headers plus its accumulated listing.
                if node_id not in responses:
                    responses[node_id] = [dict(resp.getheaders()), []]
                responses[node_id][1].extend(results)
                if results:
                    marker = results[-1]['name']

        # Replicas should agree on the container and object counts.
        headers = [resp[0] for resp in responses.values()]
        cont_counts = [int(header['x-account-container-count'])
                       for header in headers]
        if len(set(cont_counts)) != 1:
            self.account_container_mismatch += 1
            consistent = False
            print(" Account databases for '%s' don't agree on"
                  " number of containers." % account)
            if cont_counts:
                print " Max: %s, Min: %s" % (max(cont_counts),
                                             min(cont_counts))
        obj_counts = [int(header['x-account-object-count'])
                      for header in headers]
        if len(set(obj_counts)) != 1:
            self.account_object_mismatch += 1
            consistent = False
            print(" Account databases for '%s' don't agree on"
                  " number of objects." % account)
            if obj_counts:
                print " Max: %s, Min: %s" % (max(obj_counts),
                                             min(obj_counts))
        # Union of the container names reported by all replicas.
        containers = set()
        for resp in responses.values():
            containers.update(container['name'] for container in resp[1])
        self.list_cache[account] = containers
        self.in_progress[account].send(True)
        del self.in_progress[account]
        self.accounts_checked += 1
        if recurse:
            for container in containers:
                self.pool.spawn_n(self.audit_container, account,
                                  container, True)
        if not consistent and self.error_file:
            print >>open(self.error_file, 'a'), path
        return containers

3.3.1 Build the full path of the given account.
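3.3.2 Look up the partition number and the nodes holding all replicas of the given account.
3.3.3 Iterate over the nodes holding the account's replicas and, for each node, do the following:
Send an HTTP GET request to the node to fetch its account listing, and use the response status to decide whether the account replica exists on that node. The listing is paged with marker and accumulated per node. Afterwards, the X-Account-Container-Count and X-Account-Object-Count headers are compared across replicas, and the container names from all replicas are merged into one set.
3.3.4 If recurse is true, call audit_container (with recurse=True) for every container in the account, which in turn audits every object in those containers.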
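The per-node step of the account audit ends with a consistency check across replicas. The function below isolates that check: it compares the X-Account-Container-Count and X-Account-Object-Count headers and takes the union of the container names. check_account_replicas() and its {node_id: (headers, listing)} input are hypothetical, modeled on the [headers, listing] pairs that audit_account stores in responses. Header names are lowercase because the original code indexes them that way.

```python
def check_account_replicas(responses):
    """Compare the replicas of one account.

    responses: {node_id: (headers, listing)}, a hypothetical shape modeled on
    the [headers, listing] pairs audit_account keeps per node.
    Returns (containers, problems), where each problem is (header, max, min).
    """
    headers = [h for h, _ in responses.values()]
    problems = []
    for name in ('x-account-container-count', 'x-account-object-count'):
        counts = [int(h[name]) for h in headers]
        # An empty list (no replica answered) also fails the check,
        # just as len(set(...)) != 1 does in the original.
        if len(set(counts)) != 1:
            problems.append((name,
                             max(counts) if counts else None,
                             min(counts) if counts else None))
    # Union of the container names reported by any replica.
    containers = set()
    for _, listing in responses.values():
        containers.update(c['name'] for c in listing)
    return containers, problems


responses = {
    1: ({'x-account-container-count': '2', 'x-account-object-count': '5'},
        [{'name': 'docs'}, {'name': 'photos'}]),
    2: ({'x-account-container-count': '2', 'x-account-object-count': '4'},
        [{'name': 'docs'}, {'name': 'photos'}]),
}
containers, problems = check_account_replicas(responses)
print(sorted(containers), problems)
```

In the sample, the two replicas agree on the container count but not on the object count, so only the object-count check is reported. Taking the union means a container listed by even one replica still gets audited.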
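This completes the analysis of the script's account/container/object audit flow. The actual audit process involves many other details, which are not covered here.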