问题导读
1.哪个函数实现运行数据同步进程的操作?
2.def collect_jobs(self)实现了什么功能?
概述部分:
实现运行数据同步进程的操作;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
- from swift.obj.replicator import ObjectReplicator
- from swift.common.utils import parse_options
- from swift.common.daemon import run_daemon
- from optparse import OptionParser
- if __name__ == '__main__':
- parser = OptionParser("%prog CONFIG [options]")
- parser.add_option('-d', '--devices',
- help='Replicate only given devices. '
- 'Comma-separated list')
- parser.add_option('-p', '--partitions',
- help='Replicate only given partitions. '
- 'Comma-separated list')
- conf_file, options = parse_options(parser=parser, once=True)
- run_daemon(ObjectReplicator, conf_file, **options)
复制代码
- def run_once(self, *args, **kwargs):
- """
- 实现运行数据同步进程的操作;
- 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
- 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
- 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
- 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
- """
- start = time.time()
- self.logger.info(_("Running object replicator in script mode."))
- override_devices = list_from_csv(kwargs.get('devices'))
- override_partitions = list_from_csv(kwargs.get('partitions'))
-
- # 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
- # 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
- # 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
- # 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
- self.replicate(override_devices=override_devices, override_partitions=override_partitions)
- total = (time.time() - start) / 60
- self.logger.info(_("Object replication complete (once). (%.02f minutes)"), total)
- if not (override_partitions or override_devices):
- dump_recon_cache({'object_replication_time': total,
- 'object_replication_last': time.time()},
- self.rcache, self.logger)
复制代码
复制代码
1.调用方法collect_jobs获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
2.遍历所有任务列表,针对rebalance操作后数据同步情况,应用方法update_deleted,实现同步本地分区数据到远程副本相应分区的对象数据操作;
3.针对其他某些对象数据损坏情况,应用方法update,实现同步本地分区数据到远程副本相应分区的对象数据操作;
转到1,来看方法collect_jobs的实现:
- def collect_jobs(self):
- """
- 获取节点分区下的要实现数据同步操作的任务列表;
- 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
-
- 注释引申----object相关数据存储结构:
- # 引申1----objects文件夹下的不同分区:
- # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
- # 示例:
- # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
- # 13069 133971 4799 58208 94238
-
- # 引申2:
- # 通过理解一致性hash算法可知,
- # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
- # 如果一个设备对应的分区数为n则,
- # objects下子文件夹数目会<=n,
- # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
-
- # 引申3----object存储结构:
- # os.listdir(obj_path)列出objects目录下的所有partition,
- # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
- # 例如:
- # ls /srv/node/device1/objects/
- # 130527 165683 17635 212065 214811 252830
- # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
- # 例如:
- # ls /srv/node/device1/objects/130527/
- # 201 hashes.pkl
- # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹,
- # 例如:
- # ls /srv/node/device1/objects/130527/201/
- # 7f77f99f076c5ddbb7c7922875ab6201
- # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
- # 例如:
- # ls /srv/node/device1/objects/130527/201/7f77f99f076c5ddbb7c7922875ab6201/
- # 1405479927.09551.data
- """
- jobs = []
- ips = whataremyips()
-
- # 获取环上所有的本地设备;
- for local_dev in [dev for dev in self.object_ring.devs
- if dev and dev['replication_ip'] in ips and
- dev['replication_port'] == self.port]:
- # /srv/node/local_dev['device']
- dev_path = join(self.devices_dir, local_dev['device'])
- # /srv/node/local_dev['device']/objects
- obj_path = join(dev_path, 'objects')
- # /srv/node/local_dev['device']/tmp
- tmp_path = join(dev_path, 'tmp')
-
- if self.mount_check and not ismount(dev_path):
- self.logger.warn(_('%s is not mounted'), local_dev['device'])
- continue
-
- unlink_older_than(tmp_path, time.time() - self.reclaim_age)
-
- # 如果目录obj_path不存在,则建立obj_path目录;
- if not os.path.exists(obj_path):
- try:
- mkdirs(obj_path)
- except Exception:
- self.logger.exception('ERROR creating %s' % obj_path)
- continue
-
- # 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
- # 示例:
- # root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
- # 13069 133971 4799 58208 94238
- # 注释引申:
- # 通过理解一致性hash算法可知,
- # 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
- # 如果一个设备对应的分区数为n则,
- # obj_path下子文件夹数目会<=n,
- # 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
- # 注释引申----object存储结构:
- # os.listdir(obj_path)列出objects目录下的所有partition,
- # 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
- # 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
- # 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
- # object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
- for partition in os.listdir(obj_path):
- try:
- # /srv/node/local_dev['device']/objects/partition
- job_path = join(obj_path, partition)
-
- # 如果/srv/node/local_dev['device']/objects/partition是文件而不是分区,则删除;
- if isfile(job_path):
- # Clean up any (probably zero-byte) files where a
- # partition should be.
- self.logger.warning('Removing partition directory '
- 'which was a file: %s', job_path)
- os.remove(job_path)
- continue
-
- # 获取一个分区所有副本(replica)相关的节点信息;
- part_nodes = self.object_ring.get_part_nodes(int(partition))
-
- # nodes为不是本机器nodes的其他replica-1个nodes;
- nodes = [node for node in part_nodes
- if node['id'] != local_dev['id']]
-
- jobs.append(
- dict(
- path=job_path,# /srv/node/local_dev['device']/objects/partition
- device=local_dev['device'], # 本地设备;
- nodes=nodes,
- delete=len(nodes) > len(part_nodes) - 1,
- partition=partition))
- except (ValueError, OSError):
- continue
-
- # 打乱jobs的顺序;
- random.shuffle(jobs)
- if self.handoffs_first:
- # Move the handoff parts to the front of the list
- jobs.sort(key=lambda job: not job['delete'])
- self.job_count = len(jobs)
- return jobs
复制代码
1.1.获取环上所有的本地设备;
1.2.遍历每个设备下objects下的每个分区(/srv/node/local_dev['device']/objects/partition);
1.3.针对每个分区,获取一个分区所有副本(replica)除了本机的节点;
1.4.每个分区的相关信息作为一个任务,返回包含所有任务的字典;
下一篇博客将继续swift-object-replicator的分析工作。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn |