tntzbzc 发表于 2014-11-20 15:34:55

Swift源码分析----swift-object-replicator(1)

问题导读
1.哪个函数实现运行数据同步进程的操作?
2.def collect_jobs(self)实现了什么功能?

static/image/hrline/4.gif


概述部分:
实现运行数据同步进程的操作;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.obj.replicator import ObjectReplicator
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
from optparse import OptionParser
if __name__ == '__main__':
    parser = OptionParser("%prog CONFIG ")
    parser.add_option('-d', '--devices',
                      help='Replicate only given devices. '
                           'Comma-separated list')
    parser.add_option('-p', '--partitions',
                      help='Replicate only given partitions. '
                           'Comma-separated list')
    conf_file, options = parse_options(parser=parser, once=True)
    run_daemon(ObjectReplicator, conf_file, **options)

def run_once(self, *args, **kwargs):
    """
    实现运行数据同步进程的操作;
    获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
    针对rebalance操作后数据同步情况和某些对象数据损坏情况,
    分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
    主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
    """
    start = time.time()
    self.logger.info(_("Running object replicator in script mode."))
    override_devices = list_from_csv(kwargs.get('devices'))
    override_partitions = list_from_csv(kwargs.get('partitions'))   
      
    # 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
    # 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
    # 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
    # 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
    self.replicate(override_devices=override_devices, override_partitions=override_partitions)
    total = (time.time() - start) / 60
    self.logger.info(_("Object replication complete (once). (%.02f minutes)"), total)
    if not (override_partitions or override_devices):
      dump_recon_cache({'object_replication_time': total,
                        'object_replication_last': time.time()},
                        self.rcache, self.logger)


def replicate(self, override_devices=None, override_partitions=None):
    """
    实现运行复制进程的操作;
         
    注释引申----object相关数据存储结构:
    # 引申1----objects文件夹下的不同分区:
    # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
    # 示例:
    # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
    # 1306913397147995820894238
         
    # 引申2:
    # 通过理解一致性hash算法可知,
    # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
    # 如果一个设备对应的分区数为n则, objects下子文件夹数目会<=n,
    # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
         
    # 引申3----object存储结构:
    # os.listdir(obj_path)列出objects目录下的所有partition,
    # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
    # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
    # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
    # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
    注:副本是针对分区来说的;
         
    获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
    针对rebalance操作后数据同步情况和某些对象数据损坏情况,
    分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
    主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
    """
    self.start = time.time()
    self.suffix_count = 0
    self.suffix_sync = 0
    self.suffix_hash = 0
    self.replication_count = 0
    self.last_replication_count = -1
    self.partition_times = []

    if override_devices is None:
      override_devices = []
    if override_partitions is None:
      override_partitions = []

    # 复制操作执行过程中运行在后台的心跳进程;
    stats = eventlet.spawn(self.heartbeat)
    # 检测和处理死锁程序;
    lockup_detector = eventlet.spawn(self.detect_lockups)
    eventlet.sleep()# Give spawns a cycle

    try:
      # 建立一个多线程并发池;
      self.run_pool = GreenPool(size=self.concurrency)
            
      # 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
      jobs = self.collect_jobs()
      for job in jobs:
            if override_devices and job['device'] not in override_devices:
                continue
            if override_partitions and job['partition'] not in override_partitions:
                continue
                  
            # dev_path = /srv/node/job['device']
            dev_path = join(self.devices_dir, job['device'])
            if self.mount_check and not ismount(dev_path):
                self.logger.warn(_('%s is not mounted'), job['device'])
                continue
                  
            if not self.check_ring():
                self.logger.info(_("Ring change detected. Aborting "
                                 "current replication pass."))
                return
                  
            # 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
            # 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
            # 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
            if job['delete']:
                # 同步本分区下数据到远程副本分区,并删除本分区下对象数据;
                # 1 获取指定分区目录下各个对象的suff----suffixes;
                # 2 遍历指定分区所有副本(除了本分区)节点,在每个副本节点上:
                #    2.1 调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
                #    注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
                #    2.2 通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
                # 3 当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
                # 注:
                # 这里没有进行本地分区数据和远程副本数据的比较验证工作,说明这个方法需要直接同步分区下所有数据到远程副本节点;
                # 应该适用于如下情形:
                # 假设本分区所在设备节点号为1,所有副本设备节点号为1/2/3/4,当执行rebalance操作后,所有设备节点号为2/3/4/5,在rebalance操作过程中,
                # 1号设备上本分区则被标志为delete;此时,需要先同步1号设备本分区数据到2/3/4/5号设备上,然后删除1号设备本分区下的数据;
                # 在这里虽然也执行了复制1号设备数据到2/3/4号设备,但是这里并没由进行冗余复制操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
                self.run_pool.spawn(self.update_deleted, job)
            else:
                # 对于远程副本节点,循环执行,针对每一个节点实现以下操作:
                # 1 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象哈希值;
                # 2 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
                # 3 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
                self.run_pool.spawn(self.update, job)
      with Timeout(self.lockup_timeout):
            self.run_pool.waitall()
    except (Exception, Timeout):
      self.logger.exception(_("Exception in top-level replication loop"))
      self.kill_coros()
    finally:
      stats.kill()
      lockup_detector.kill()
      self.stats_line()


1.调用方法collect_jobs获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
2.遍历所有任务列表,针对rebalance操作后数据同步情况,应用方法update_deleted,实现同步本地分区数据到远程副本相应分区的对象数据操作;
3.针对其他某些对象数据损坏情况,应用方法update,实现同步本地分区数据到远程副本相应分区的对象数据操作;
转到1,来看方法collect_jobs的实现:


def collect_jobs(self):
    """
    获取节点分区下的要实现数据同步操作的任务列表;
    获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
         
    注释引申----object相关数据存储结构:
    # 引申1----objects文件夹下的不同分区:
    # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
    # 示例:
    # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
    # 1306913397147995820894238
         
    # 引申2:
    # 通过理解一致性hash算法可知,
    # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
    # 如果一个设备对应的分区数为n则,
    # objects下子文件夹数目会<=n,
    # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
         
    # 引申3----object存储结构:
    # os.listdir(obj_path)列出objects目录下的所有partition,
    # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
    #   例如:
    #      ls /srv/node/device1/objects/
    #      13052716568317635212065214811252830
    # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
    #      例如:
    #      ls /srv/node/device1/objects/130527/
    #      201hashes.pkl
    # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹,
    #      例如:
    #      ls /srv/node/device1/objects/130527/201/
    #      7f77f99f076c5ddbb7c7922875ab6201
    # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
    #      例如:
    #      ls /srv/node/device1/objects/130527/201/7f77f99f076c5ddbb7c7922875ab6201/
    #      1405479927.09551.data
    """
    jobs = []
    ips = whataremyips()
         
    # 获取环上所有的本地设备;
    for local_dev in [dev for dev in self.object_ring.devs
                      if dev and dev['replication_ip'] in ips and
                      dev['replication_port'] == self.port]:
      # /srv/node/local_dev['device']
      dev_path = join(self.devices_dir, local_dev['device'])
      # /srv/node/local_dev['device']/objects
      obj_path = join(dev_path, 'objects')
      # /srv/node/local_dev['device']/tmp
      tmp_path = join(dev_path, 'tmp')
            
      if self.mount_check and not ismount(dev_path):
            self.logger.warn(_('%s is not mounted'), local_dev['device'])
            continue
            
      unlink_older_than(tmp_path, time.time() - self.reclaim_age)
            
      # 如果目录obj_path不存在,则建立obj_path目录;
      if not os.path.exists(obj_path):
            try:
                mkdirs(obj_path)
            except Exception:
                self.logger.exception('ERROR creating %s' % obj_path)
            continue
            
      # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
      # 示例:
      # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls   
      # 1306913397147995820894238
      # 注释引申:
      # 通过理解一致性hash算法可知,
      # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
      # 如果一个设备对应的分区数为n则,
      # obj_path下子文件夹数目会<=n,
      # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
      # 注释引申----object存储结构:
      # os.listdir(obj_path)列出objects目录下的所有partition,
      # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
      # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
      # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
      # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
      for partition in os.listdir(obj_path):
            try:
                # /srv/node/local_dev['device']/objects/partition
                job_path = join(obj_path, partition)
                     
                # 如果/srv/node/local_dev['device']/objects/partition是文件而不是分区,则删除;
                if isfile(job_path):
                  # Clean up any (probably zero-byte) files where a
                  # partition should be.
                  self.logger.warning('Removing partition directory '
                                        'which was a file: %s', job_path)
                  os.remove(job_path)
                  continue
                     
                # 获取一个分区所有副本(replica)相关的节点信息;
                part_nodes = self.object_ring.get_part_nodes(int(partition))
                     
                # nodes为不是本机器nodes的其他replica-1个nodes;
                nodes = [node for node in part_nodes
                         if node['id'] != local_dev['id']]
                     
                jobs.append(
                  dict(
                         path=job_path,# /srv/node/local_dev['device']/objects/partition
                         device=local_dev['device'], # 本地设备;
                         nodes=nodes,
                         delete=len(nodes) > len(part_nodes) - 1,
                         partition=partition))
            except (ValueError, OSError):
                continue
         
    # 打乱jobs的顺序;
    random.shuffle(jobs)
    if self.handoffs_first:
      # Move the handoff parts to the front of the list
      jobs.sort(key=lambda job: not job['delete'])
    self.job_count = len(jobs)
    return jobs

1.1.获取环上所有的本地设备;
1.2.遍历每个设备下objects下的每个分区(/srv/node/local_dev['device']/objects/partition);
1.3.针对每个分区,获取一个分区所有副本(replica)除了本机的节点;
1.4.每个分区的相关信息作为一个任务,返回包含所有任务的字典;
下一篇博客将继续swift-object-replicator的分析工作。



博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

溜溜小哥 发表于 2015-5-22 17:16:10

你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan

溜溜小哥 发表于 2015-5-22 17:21:43

你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?还把我的信息删除了,至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan
页: [1]
查看完整版本: Swift源码分析----swift-object-replicator(1)