分享

Swift源码分析----swift-container-updater

tntzbzc 发表于 2014-11-20 15:34:43 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 12007
导读
代码诠释每个人都有自己的理解,尽信书不如无书,更多的是参考和交流









概述部分:
容器数据更新守护进程;
遍历所有设备下所有容器下的DB文件,
如果DB文件指定的容器发生数据更新,
通过HTTP协议应用PUT方法实现报告container信息给相应的account服务,
在account中实现相应的更新信息操作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
  1. from swift.container.updater import ContainerUpdater
  2. from swift.common.utils import parse_options
  3. from swift.common.daemon import run_daemon
  4. if __name__ == '__main__':
  5.     conf_file, options = parse_options(once=True)
  6.     run_daemon(ContainerUpdater, conf_file, **options)
复制代码

  1. def run_once(self, *args, **kwargs):  
  2.      """
  3.      遍历所有设备下所有容器下的DB文件,如果DB文件指定的容器发生数据更新,
  4.      通过HTTP协议应用PUT方法实现报告container信息给相应的account服务,
  5.      在account中实现相应的更新信息操作;
  6.      """
  7.     patcher.monkey_patch(all=False, socket=True)
  8.     self.logger.info(_('Begin container update single threaded sweep'))
  9.     begin = time.time()
  10.     self.no_changes = 0
  11.     self.successes = 0
  12.     self.failures = 0
  13.         
  14.     # get_paths:获取所有设备下容器下的所有分区的具体路径;
  15.       # 遍历所有容器相关的所有分区的具体路径;
  16.     for path in self.get_paths():
  17.            # 遍历path下所有文件(树形遍历),查找DB文件,并调用方法process_container对文件进行处理;
  18.            # 如果DB文件指定的容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务;
  19.            # 方法process_container实现了对container进行处理,并更新它在对应account中的信息;
  20.         self.container_sweep(path)
  21.     elapsed = time.time() - begin
  22.     self.logger.info(_(
  23.         'Container update single threaded sweep completed: '
  24.         '%(elapsed).02fs, %(success)s successes, %(fail)s failures, '
  25.         '%(no_change)s with no changes'),
  26.         {'elapsed': elapsed, 'success': self.successes,
  27.          'fail': self.failures, 'no_change': self.no_changes})
  28.     dump_recon_cache({'container_updater_sweep': elapsed},
  29.                      self.rcache, self.logger)
复制代码



1.获取所有设备下容器(containers)下的所有分区的具体路径;
2.遍历所有分区的具体路径,调用方法container_sweep实现获取分区下的DB文件,如果DB文件指定的容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务,在account中实现相应的更新信息操作;

转到2,来看方法container_sweep的实现:
  1. def container_sweep(self, path):
  2.     """
  3.     遍历path下所有文件(树形遍历),查找DB文件,并调用方法process_container对文件进行处理;
  4.     如果DB文件指定的容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务;
  5.     方法process_container实现了对container进行处理,并更新它在对应account中的信息;
  6.     """
  7.     for root, dirs, files in os.walk(path):
  8.         for file in files:
  9.             if file.endswith('.db'):
  10.                 # process_container:对container进行处理,并更新它在对应account中的信息;
  11.                 # 如果数据库文件指定的容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务;
  12.                 self.process_container(os.path.join(root, file))
  13.                 time.sleep(self.slowdown)
复制代码



查找给定分区目录下所有的以.db为后缀的文件,并调用方法process_container实现对container进行处理,并更新它在对应account中的信息;如果数据库文件指定的容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务;
来看方法process_container的实现:
  1. def process_container(self, dbfile):
  2.     """
  3.     对container进行处理,并更新它在对应account中的信息;
  4.     如果容器发生数据更新,通过HTTP协议应用PUT方法实现报告container信息给相应的account服务;
  5.     """
  6.     start_time = time.time()
  7.         
  8.     # 容器数据库的封装类实现;
  9.     broker = ContainerBroker(dbfile, logger=self.logger)
  10.     # get_info:获取container的全局数据;
  11.     # 包括account/container/created_at/put_timestamp/delete_timestamp/object_count/bytes_used/reported_put_timestamp/reported_delete_timestamp/reported_object_count/reported_bytes_used/hash/id等信息;
  12.     info = broker.get_info()
  13.     # Don't send updates if the container was auto-created since it
  14.     # definitely doesn't have up to date statistics.
  15.     if float(info['put_timestamp'])  time.time():
  16.         return
  17.         
  18.     # 如果满足下述条件,说明容器发生数据更新;
  19.     # 通过HTTP协议应用PUT方法实现报告container信息给account服务;
  20.     if info['put_timestamp'] > info['reported_put_timestamp'] or info['delete_timestamp'] > info['reported_delete_timestamp'] \
  21.        or info['object_count'] != info['reported_object_count'] or info['bytes_used'] != info['reported_bytes_used']:
  22.         container = '/%s/%s' % (info['account'], info['container'])
  23.             
  24.         # 获取容器对应的account的相关分区和节点信息;
  25.         # 返回元组(分区,节点信息列表);
  26.         # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
  27.         part, nodes = self.get_account_ring().get_nodes(info['account'])
  28.             
  29.         # 在绿色线程中执行方法container_report,实现报告container信息给account服务;
  30.         # container_report:通过HTTP协议应用PUT方法实现报告container信息给account服务;
  31.         events = [spawn(self.container_report, node, part, container,
  32.                         info['put_timestamp'], info['delete_timestamp'],
  33.                         info['object_count'], info['bytes_used'])
  34.                   for node in nodes]
  35.             
  36.         successes = 0
  37.         for event in events:
  38.             if is_success(event.wait()):
  39.                 successes += 1
  40.         if successes >= quorum_size(len(events)):
  41.             self.logger.increment('successes')
  42.             self.successes += 1
  43.             self.logger.debug(_('Update report sent for %(container)s %(dbfile)s'),
  44.                                {'container': container, 'dbfile': dbfile})
  45.             # reported:更新数据库中container_stat表的reported_put_timestamp、reported_delete_timestamp、reported_object_count和reported_bytes_used;
  46.             broker.reported(info['put_timestamp'],
  47.                             info['delete_timestamp'], info['object_count'],
  48.                             info['bytes_used'])
  49.         else:
  50.             self.logger.increment('failures')
  51.             self.failures += 1
  52.             self.logger.debug(_('Update report failed for %(container)s %(dbfile)s'), {'container': container, 'dbfile': dbfile})
  53.             self.account_suppressions[info['account']] = until = \
  54.                 time.time() + self.account_suppression_time
  55.             if self.new_account_suppressions:
  56.                 print >>self.new_account_suppressions, info['account'], until
  57.         # Only track timing data for attempted updates:
  58.         self.logger.timing_since('timing', start_time)
  59.     else:
  60.         self.logger.increment('no_changes')
  61.         self.no_changes += 1
复制代码




2.1.根据指定的数据库文件(后缀为.db的文件),实现获取container的全局数据信息;
2.2.根据container的全局数据信息中若干属性的比较,判断该容器是否发生了数据的更新;
2.3 如果容器发生了数据更新,调用方法get_nodes获取容器所属account的分区号和副本节点;
2.4.针对容器所属account的每一个副本节点,调用方法container_report实现通过HTTP协议应用PUT方法实现报告container信息给account服务;
注:这里就是处理一个分区上所有的容器,首先验证容器是否发生数据更新,如果容器发生数据更新,则先获取该容器所属账户的分区,和所有账户的副本节点,再遍历每个副本节点,实现把容器数据更新的信息报告给账户的每一个副本节点;

转到2.4,来看方法container_report的实现:
  1. def container_report(self, node, part, container, put_timestamp, delete_timestamp, count, bytes):
  2.     """
  3.     通过HTTP协议应用PUT方法实现报告container信息给account服务;
  4.     """
  5.     with ConnectionTimeout(self.conn_timeout):
  6.         try:
  7.             headers = {
  8.                 'X-Put-Timestamp': put_timestamp,
  9.                 'X-Delete-Timestamp': delete_timestamp,
  10.                 'X-Object-Count': count,
  11.                 'X-Bytes-Used': bytes,
  12.                 'X-Account-Override-Deleted': 'yes',
  13.                 'user-agent': self.user_agent}
  14.                
  15.             # 通过HTTP协议应用PUT方法实现报告container信息给account服务;
  16.             # 建立一个HTTPConnection类的对象;
  17.             # 发出的HTTP请求的方法'PUT';
  18.             # 返回HTTPConnection连接对象;
  19.             conn = http_connect(node['ip'], node['port'], node['device'], part, 'PUT', container, headers=headers)
  20.         except (Exception, Timeout):
  21.             self.logger.exception(_(
  22.                 'ERROR account update failed with '
  23.                 '%(ip)s:%(port)s/%(device)s (will retry later): '), node)
  24.             return HTTP_INTERNAL_SERVER_ERROR
  25.     with Timeout(self.node_timeout):
  26.         try:
  27.             # 获取来自服务器的响应;
  28.             resp = conn.getresponse()
  29.             resp.read()
  30.             return resp.status
  31.         except (Exception, Timeout):
  32.             if self.logger.getEffectiveLevel()
复制代码





感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条