分享

Swift源码分析----swift-object-replicator(1)

tntzbzc 发表于 2014-11-20 15:34:55 回帖奖励 阅读模式 关闭右栏 2 14475
问题导读
1.哪个函数实现运行数据同步进程的操作?
2.def collect_jobs(self)实现了什么功能?




概述部分:
实现运行数据同步进程的操作;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
  1. from swift.obj.replicator import ObjectReplicator
  2. from swift.common.utils import parse_options
  3. from swift.common.daemon import run_daemon
  4. from optparse import OptionParser
  5. if __name__ == '__main__':
  6.     parser = OptionParser("%prog CONFIG [options]")
  7.     parser.add_option('-d', '--devices',
  8.                       help='Replicate only given devices. '
  9.                            'Comma-separated list')
  10.     parser.add_option('-p', '--partitions',
  11.                       help='Replicate only given partitions. '
  12.                            'Comma-separated list')
  13.     conf_file, options = parse_options(parser=parser, once=True)
  14.     run_daemon(ObjectReplicator, conf_file, **options)
复制代码

  1. def run_once(self, *args, **kwargs):
  2.     """
  3.     实现运行数据同步进程的操作;
  4.     获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
  5.     针对rebalance操作后数据同步情况和某些对象数据损坏情况,
  6.     分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
  7.     主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
  8.     """
  9.     start = time.time()
  10.     self.logger.info(_("Running object replicator in script mode."))
  11.     override_devices = list_from_csv(kwargs.get('devices'))
  12.     override_partitions = list_from_csv(kwargs.get('partitions'))   
  13.         
  14.     # 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
  15.     # 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
  16.     # 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
  17.     # 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
  18.     self.replicate(override_devices=override_devices, override_partitions=override_partitions)
  19.     total = (time.time() - start) / 60
  20.     self.logger.info(_("Object replication complete (once). (%.02f minutes)"), total)
  21.     if not (override_partitions or override_devices):
  22.         dump_recon_cache({'object_replication_time': total,
  23.                           'object_replication_last': time.time()},
  24.                           self.rcache, self.logger)
复制代码


  1. def replicate(self, override_devices=None, override_partitions=None):  
  2.     """
  3.     实现运行复制进程的操作;
  4.          
  5.     注释引申----object相关数据存储结构:
  6.     # 引申1----objects文件夹下的不同分区:
  7.     # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
  8.     # 示例:
  9.     # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
  10.     # 13069  133971  4799  58208  94238
  11.          
  12.     # 引申2:
  13.     # 通过理解一致性hash算法可知,
  14.     # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
  15.     # 如果一个设备对应的分区数为n则, objects下子文件夹数目会<=n,
  16.     # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
  17.          
  18.     # 引申3----object存储结构:
  19.     # os.listdir(obj_path)列出objects目录下的所有partition,
  20.     # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
  21.     # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
  22.     # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
  23.     # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
  24.     注:副本是针对分区来说的;
  25.          
  26.     获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
  27.     针对rebalance操作后数据同步情况和某些对象数据损坏情况,
  28.     分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
  29.     主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
  30.     """  
  31.     self.start = time.time()  
  32.     self.suffix_count = 0  
  33.     self.suffix_sync = 0  
  34.     self.suffix_hash = 0  
  35.     self.replication_count = 0  
  36.     self.last_replication_count = -1  
  37.     self.partition_times = []  
  38.   
  39.     if override_devices is None:  
  40.         override_devices = []  
  41.     if override_partitions is None:  
  42.         override_partitions = []  
  43.   
  44.     # 复制操作执行过程中运行在后台的心跳进程;  
  45.     stats = eventlet.spawn(self.heartbeat)  
  46.     # 检测和处理死锁程序;  
  47.     lockup_detector = eventlet.spawn(self.detect_lockups)  
  48.     eventlet.sleep()  # Give spawns a cycle  
  49.   
  50.     try:  
  51.         # 建立一个多线程并发池;  
  52.         self.run_pool = GreenPool(size=self.concurrency)  
  53.               
  54.         # 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;  
  55.         jobs = self.collect_jobs()  
  56.         for job in jobs:  
  57.             if override_devices and job['device'] not in override_devices:  
  58.                 continue  
  59.             if override_partitions and job['partition'] not in override_partitions:  
  60.                 continue  
  61.                   
  62.             # dev_path = /srv/node/job['device']  
  63.             dev_path = join(self.devices_dir, job['device'])  
  64.             if self.mount_check and not ismount(dev_path):  
  65.                 self.logger.warn(_('%s is not mounted'), job['device'])  
  66.                 continue  
  67.                   
  68.             if not self.check_ring():  
  69.                 self.logger.info(_("Ring change detected. Aborting "  
  70.                                    "current replication pass."))  
  71.                 return  
  72.                   
  73.             # 针对rebalance操作后数据同步情况和某些对象数据损坏情况,  
  74.             # 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;  
  75.             # 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;  
  76.             if job['delete']:  
  77.                 # 同步本分区下数据到远程副本分区,并删除本分区下对象数据;  
  78.                 # 1 获取指定分区目录下各个对象的suff----suffixes;  
  79.                 # 2 遍历指定分区所有副本(除了本分区)节点,在每个副本节点上:  
  80.                 #    2.1 调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;  
  81.                 #    注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;  
  82.                 #    2.2 通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;  
  83.                 # 3 当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;  
  84.                 # 注:  
  85.                 # 这里没有进行本地分区数据和远程副本数据的比较验证工作,说明这个方法需要直接同步分区下所有数据到远程副本节点;  
  86.                 # 应该适用于如下情形:  
  87.                 # 假设本分区所在设备节点号为1,所有副本设备节点号为1/2/3/4,当执行rebalance操作后,所有设备节点号为2/3/4/5,在rebalance操作过程中,  
  88.                 # 1号设备上本分区则被标志为delete;此时,需要先同步1号设备本分区数据到2/3/4/5号设备上,然后删除1号设备本分区下的数据;  
  89.                 # 在这里虽然也执行了复制1号设备数据到2/3/4号设备,但是这里并没由进行冗余复制操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;  
  90.                 self.run_pool.spawn(self.update_deleted, job)  
  91.             else:  
  92.                 # 对于远程副本节点,循环执行,针对每一个节点实现以下操作:  
  93.                 # 1 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象哈希值;  
  94.                 # 2 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;  
  95.                 # 3 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;  
  96.                 self.run_pool.spawn(self.update, job)  
  97.         with Timeout(self.lockup_timeout):  
  98.             self.run_pool.waitall()  
  99.     except (Exception, Timeout):  
  100.         self.logger.exception(_("Exception in top-level replication loop"))  
  101.         self.kill_coros()  
  102.     finally:  
  103.         stats.kill()  
  104.         lockup_detector.kill()  
  105.         self.stats_line()  
复制代码



1.调用方法collect_jobs获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
2.遍历所有任务列表,针对rebalance操作后数据同步情况,应用方法update_deleted,实现同步本地分区数据到远程副本相应分区的对象数据操作;
3.针对其他某些对象数据损坏情况,应用方法update,实现同步本地分区数据到远程副本相应分区的对象数据操作;
转到1,来看方法collect_jobs的实现:


  1. def collect_jobs(self):  
  2.     """
  3.     获取节点分区下的要实现数据同步操作的任务列表;
  4.     获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
  5.          
  6.     注释引申----object相关数据存储结构:
  7.     # 引申1----objects文件夹下的不同分区:
  8.     # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
  9.     # 示例:
  10.     # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
  11.     # 13069  133971  4799  58208  94238
  12.          
  13.     # 引申2:
  14.     # 通过理解一致性hash算法可知,
  15.     # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
  16.     # 如果一个设备对应的分区数为n则,
  17.     # objects下子文件夹数目会<=n,
  18.     # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
  19.          
  20.     # 引申3----object存储结构:
  21.     # os.listdir(obj_path)列出objects目录下的所有partition,
  22.     # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
  23.     #     例如:
  24.     #      ls /srv/node/device1/objects/
  25.     #      130527  165683  17635  212065  214811  252830
  26.     # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
  27.     #      例如:
  28.     #      ls /srv/node/device1/objects/130527/
  29.     #      201  hashes.pkl
  30.     # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹,
  31.     #      例如:
  32.     #      ls /srv/node/device1/objects/130527/201/
  33.     #      7f77f99f076c5ddbb7c7922875ab6201
  34.     # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
  35.     #      例如:
  36.     #      ls /srv/node/device1/objects/130527/201/7f77f99f076c5ddbb7c7922875ab6201/
  37.     #      1405479927.09551.data
  38.     """  
  39.     jobs = []  
  40.     ips = whataremyips()  
  41.          
  42.     # 获取环上所有的本地设备;  
  43.     for local_dev in [dev for dev in self.object_ring.devs  
  44.                       if dev and dev['replication_ip'] in ips and  
  45.                       dev['replication_port'] == self.port]:  
  46.         # /srv/node/local_dev['device']  
  47.         dev_path = join(self.devices_dir, local_dev['device'])  
  48.         # /srv/node/local_dev['device']/objects  
  49.         obj_path = join(dev_path, 'objects')  
  50.         # /srv/node/local_dev['device']/tmp  
  51.         tmp_path = join(dev_path, 'tmp')  
  52.               
  53.         if self.mount_check and not ismount(dev_path):  
  54.             self.logger.warn(_('%s is not mounted'), local_dev['device'])  
  55.             continue  
  56.               
  57.         unlink_older_than(tmp_path, time.time() - self.reclaim_age)  
  58.               
  59.         # 如果目录obj_path不存在,则建立obj_path目录;  
  60.         if not os.path.exists(obj_path):  
  61.             try:  
  62.                 mkdirs(obj_path)  
  63.             except Exception:  
  64.                 self.logger.exception('ERROR creating %s' % obj_path)  
  65.             continue  
  66.               
  67.         # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;  
  68.         # 示例:  
  69.         # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
  70.         # 13069  133971  4799  58208  94238  
  71.         # 注释引申:  
  72.         # 通过理解一致性hash算法可知,  
  73.         # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,  
  74.         # 如果一个设备对应的分区数为n则,  
  75.         # obj_path下子文件夹数目会<=n,  
  76.         # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;  
  77.         # 注释引申----object存储结构:  
  78.         # os.listdir(obj_path)列出objects目录下的所有partition,  
  79.         # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,  
  80.         # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),  
  81.         # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),  
  82.         # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);  
  83.         for partition in os.listdir(obj_path):  
  84.             try:  
  85.                 # /srv/node/local_dev['device']/objects/partition  
  86.                 job_path = join(obj_path, partition)  
  87.                      
  88.                 # 如果/srv/node/local_dev['device']/objects/partition是文件而不是分区,则删除;  
  89.                 if isfile(job_path):  
  90.                     # Clean up any (probably zero-byte) files where a  
  91.                     # partition should be.  
  92.                     self.logger.warning('Removing partition directory '  
  93.                                         'which was a file: %s', job_path)  
  94.                     os.remove(job_path)  
  95.                     continue  
  96.                      
  97.                 # 获取一个分区所有副本(replica)相关的节点信息;  
  98.                 part_nodes = self.object_ring.get_part_nodes(int(partition))  
  99.                      
  100.                 # nodes为不是本机器nodes的其他replica-1个nodes;  
  101.                 nodes = [node for node in part_nodes  
  102.                          if node['id'] != local_dev['id']]  
  103.                      
  104.                 jobs.append(  
  105.                     dict(  
  106.                          path=job_path,# /srv/node/local_dev['device']/objects/partition  
  107.                          device=local_dev['device'], # 本地设备;  
  108.                          nodes=nodes,  
  109.                          delete=len(nodes) > len(part_nodes) - 1,  
  110.                          partition=partition))  
  111.             except (ValueError, OSError):  
  112.                 continue  
  113.          
  114.     # 打乱jobs的顺序;  
  115.     random.shuffle(jobs)  
  116.     if self.handoffs_first:  
  117.         # Move the handoff parts to the front of the list  
  118.         jobs.sort(key=lambda job: not job['delete'])  
  119.     self.job_count = len(jobs)  
  120.     return jobs  
复制代码


1.1.获取环上所有的本地设备;
1.2.遍历每个设备下objects下的每个分区(/srv/node/local_dev['device']/objects/partition);
1.3.针对每个分区,获取一个分区所有副本(replica)除了本机的节点;
1.4.每个分区的相关信息作为一个任务,返回包含所有任务的字典;
下一篇博客将继续swift-object-replicator的分析工作。



博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

已有(0)人评论

跳转到指定楼层
溜溜小哥 发表于 2015-5-22 17:16:10
你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan

溜溜小哥 发表于 2015-5-22 17:21:43
你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?还把我的信息删除了,至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条