Swift源码分析----swift-object-replicator(1)
问题导读1.哪个函数实现运行数据同步进程的操作?
2.def collect_jobs(self)实现了什么功能?
static/image/hrline/4.gif
概述部分:
实现运行数据同步进程的操作;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.obj.replicator import ObjectReplicator
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
from optparse import OptionParser
if __name__ == '__main__':
parser = OptionParser("%prog CONFIG ")
parser.add_option('-d', '--devices',
help='Replicate only given devices. '
'Comma-separated list')
parser.add_option('-p', '--partitions',
help='Replicate only given partitions. '
'Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectReplicator, conf_file, **options)
def run_once(self, *args, **kwargs):
"""
实现运行数据同步进程的操作;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
"""
start = time.time()
self.logger.info(_("Running object replicator in script mode."))
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions'))
# 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
# 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
# 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
# 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
self.replicate(override_devices=override_devices, override_partitions=override_partitions)
total = (time.time() - start) / 60
self.logger.info(_("Object replication complete (once). (%.02f minutes)"), total)
if not (override_partitions or override_devices):
dump_recon_cache({'object_replication_time': total,
'object_replication_last': time.time()},
self.rcache, self.logger)
def replicate(self, override_devices=None, override_partitions=None):
"""
实现运行复制进程的操作;
注释引申----object相关数据存储结构:
# 引申1----objects文件夹下的不同分区:
# 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
# 示例:
# root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
# 1306913397147995820894238
# 引申2:
# 通过理解一致性hash算法可知,
# 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
# 如果一个设备对应的分区数为n则, objects下子文件夹数目会<=n,
# 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
# 引申3----object存储结构:
# os.listdir(obj_path)列出objects目录下的所有partition,
# 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
# 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
# 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
# object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
注:副本是针对分区来说的;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
针对rebalance操作后数据同步情况和某些对象数据损坏情况,
分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
"""
self.start = time.time()
self.suffix_count = 0
self.suffix_sync = 0
self.suffix_hash = 0
self.replication_count = 0
self.last_replication_count = -1
self.partition_times = []
if override_devices is None:
override_devices = []
if override_partitions is None:
override_partitions = []
# 复制操作执行过程中运行在后台的心跳进程;
stats = eventlet.spawn(self.heartbeat)
# 检测和处理死锁程序;
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep()# Give spawns a cycle
try:
# 建立一个多线程并发池;
self.run_pool = GreenPool(size=self.concurrency)
# 获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
jobs = self.collect_jobs()
for job in jobs:
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and job['partition'] not in override_partitions:
continue
# dev_path = /srv/node/job['device']
dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device'])
continue
if not self.check_ring():
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return
# 针对rebalance操作后数据同步情况和某些对象数据损坏情况,
# 分别实现同步本地分区数据到远程副本相应分区的对象数据操作;
# 主要用于rebalance操作后的数据同步和数据损坏后的数据修复工作;
if job['delete']:
# 同步本分区下数据到远程副本分区,并删除本分区下对象数据;
# 1 获取指定分区目录下各个对象的suff----suffixes;
# 2 遍历指定分区所有副本(除了本分区)节点,在每个副本节点上:
# 2.1 调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
# 注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
# 2.2 通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
# 3 当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
# 注:
# 这里没有进行本地分区数据和远程副本数据的比较验证工作,说明这个方法需要直接同步分区下所有数据到远程副本节点;
# 应该适用于如下情形:
# 假设本分区所在设备节点号为1,所有副本设备节点号为1/2/3/4,当执行rebalance操作后,所有设备节点号为2/3/4/5,在rebalance操作过程中,
# 1号设备上本分区则被标志为delete;此时,需要先同步1号设备本分区数据到2/3/4/5号设备上,然后删除1号设备本分区下的数据;
# 在这里虽然也执行了复制1号设备数据到2/3/4号设备,但是这里并没由进行冗余复制操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
self.run_pool.spawn(self.update_deleted, job)
else:
# 对于远程副本节点,循环执行,针对每一个节点实现以下操作:
# 1 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象哈希值;
# 2 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
# 3 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
self.run_pool.spawn(self.update, job)
with Timeout(self.lockup_timeout):
self.run_pool.waitall()
except (Exception, Timeout):
self.logger.exception(_("Exception in top-level replication loop"))
self.kill_coros()
finally:
stats.kill()
lockup_detector.kill()
self.stats_line()
1.调用方法collect_jobs获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
2.遍历所有任务列表,针对rebalance操作后数据同步情况,应用方法update_deleted,实现同步本地分区数据到远程副本相应分区的对象数据操作;
3.针对其他某些对象数据损坏情况,应用方法update,实现同步本地分区数据到远程副本相应分区的对象数据操作;
转到1,来看方法collect_jobs的实现:
def collect_jobs(self):
"""
获取节点分区下的要实现数据同步操作的任务列表;
获取对象环确定的本地设备下需要进行数据验证和数据同步(修复)操作的分区的任务列表;
注释引申----object相关数据存储结构:
# 引申1----objects文件夹下的不同分区:
# 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
# 示例:
# root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
# 1306913397147995820894238
# 引申2:
# 通过理解一致性hash算法可知,
# 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
# 如果一个设备对应的分区数为n则,
# objects下子文件夹数目会<=n,
# 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
# 引申3----object存储结构:
# os.listdir(obj_path)列出objects目录下的所有partition,
# 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
# 例如:
# ls /srv/node/device1/objects/
# 13052716568317635212065214811252830
# 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
# 例如:
# ls /srv/node/device1/objects/130527/
# 201hashes.pkl
# 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹,
# 例如:
# ls /srv/node/device1/objects/130527/201/
# 7f77f99f076c5ddbb7c7922875ab6201
# object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
# 例如:
# ls /srv/node/device1/objects/130527/201/7f77f99f076c5ddbb7c7922875ab6201/
# 1405479927.09551.data
"""
jobs = []
ips = whataremyips()
# 获取环上所有的本地设备;
for local_dev in [dev for dev in self.object_ring.devs
if dev and dev['replication_ip'] in ips and
dev['replication_port'] == self.port]:
# /srv/node/local_dev['device']
dev_path = join(self.devices_dir, local_dev['device'])
# /srv/node/local_dev['device']/objects
obj_path = join(dev_path, 'objects')
# /srv/node/local_dev['device']/tmp
tmp_path = join(dev_path, 'tmp')
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), local_dev['device'])
continue
unlink_older_than(tmp_path, time.time() - self.reclaim_age)
# 如果目录obj_path不存在,则建立obj_path目录;
if not os.path.exists(obj_path):
try:
mkdirs(obj_path)
except Exception:
self.logger.exception('ERROR creating %s' % obj_path)
continue
# 遍历obj_path = /srv/node/local_dev['device']/objects下的分区;
# 示例:
# root@kinglion-Lenovo-Product:/srv/1/node/sdb1/objects# ls
# 1306913397147995820894238
# 注释引申:
# 通过理解一致性hash算法可知,
# 加入虚拟节点后每一个设备会多个虚拟节点和其对应,
# 如果一个设备对应的分区数为n则,
# obj_path下子文件夹数目会<=n,
# 因为存入的所有文件并不一定都能映射到当前设备所对应的分区;
# 注释引申----object存储结构:
# os.listdir(obj_path)列出objects目录下的所有partition,
# 创建object是在objects文件夹下创建objects所映射的分区号的文件夹partition,
# 再在partition文件夹下创建以object的hash值后三位为名称的文件夹(表示不同的object),
# 然后再在后缀文件夹下创建以object的hash值为文件夹名的文件夹(表示同一个object的不同的文件),
# object会存储为以object上传时间戳为名.data为文件后缀的文件(表示object的数据文件);
for partition in os.listdir(obj_path):
try:
# /srv/node/local_dev['device']/objects/partition
job_path = join(obj_path, partition)
# 如果/srv/node/local_dev['device']/objects/partition是文件而不是分区,则删除;
if isfile(job_path):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning('Removing partition directory '
'which was a file: %s', job_path)
os.remove(job_path)
continue
# 获取一个分区所有副本(replica)相关的节点信息;
part_nodes = self.object_ring.get_part_nodes(int(partition))
# nodes为不是本机器nodes的其他replica-1个nodes;
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
jobs.append(
dict(
path=job_path,# /srv/node/local_dev['device']/objects/partition
device=local_dev['device'], # 本地设备;
nodes=nodes,
delete=len(nodes) > len(part_nodes) - 1,
partition=partition))
except (ValueError, OSError):
continue
# 打乱jobs的顺序;
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
jobs.sort(key=lambda job: not job['delete'])
self.job_count = len(jobs)
return jobs
1.1.获取环上所有的本地设备;
1.2.遍历每个设备下objects下的每个分区(/srv/node/local_dev['device']/objects/partition);
1.3.针对每个分区,获取一个分区所有副本(replica)除了本机的节点;
1.4.每个分区的相关信息作为一个任务,返回包含所有任务的字典;
下一篇博客将继续swift-object-replicator的分析工作。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn 你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan 你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?还把我的信息删除了,至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan
页:
[1]