问题导读
1.def update_deleted(self, job)实现了什么功能?
2.在def update(self, job)方法中,对于远程副本节点,循环执行,针对每一个节点实现了什么操作?
转到2,来看方法update_deleted的实现:
- if job['delete']:
- self.run_pool.spawn(self.update_deleted, job)
复制代码
- def update_deleted(self, job):
- """
- 同步本分区下数据到远程副本分区,并删除本分区下对象数据;
- 1 获取指定分区目录下各个对象的suff----suffixes;
- 2 遍历指定分区所有副本(除了本分区)节点,在每个副本节点上:
- 2.1 调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
- 注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
- 2.2 通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
- 3 当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
- 注:
- 这里没有进行本地分区数据和远程副本数据的比较验证工作,说明这个方法需要直接同步分区下所有数据到远程副本节点;
- 应该适用于如下情形:
- 假设本分区所在设备节点号为1,所有副本设备节点号为1/2/3/4,当执行rebalance操作后,所有设备节点号为2/3/4/5,在rebalance操作过程中,
- 1号设备上本分区则被标志为delete;此时,需要先同步1号设备本分区数据到2/3/4/5号设备上,然后删除1号设备本分区下的数据;
- 在这里虽然也执行了复制1号设备数据到2/3/4号设备,但是这里并没由进行冗余复制操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
- """
- def tpool_get_suffixes(path):
- """
- 获取指定分区目录下各个对象的suff;
- path = job['path'] = /srv/node/local_dev['device']/objects/partition
- """
- return [suff for suff in os.listdir(path)
- if len(suff) == 3 and isdir(join(path, suff))]
-
- self.replication_count += 1
- self.logger.increment('partition.delete.count.%s' % (job['device'],))
- begin = time.time()
-
- try:
- responses = []
-
- # 获取指定分区目录下各个对象的suff;
- # job['path'] = /srv/node/local_dev['device']/objects/partition
- suffixes = tpool.execute(tpool_get_suffixes, job['path'])
-
- if suffixes:
- # job['nodes']:同一个分区相关副本除了本节点的其他的节点(所以可能有多个);
- for node in job['nodes']:
-
- # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
- # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
- # 注:
- # 这里没有进行本地和远程数据的比较和验证操作,而是直接把本地数据拷贝到远程地址;
- # 这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
- success = self.sync(node, job, suffixes)
- if success:
- with Timeout(self.http_timeout):
- # REPLICARE方法,对应sever里面的RELICATE方法;
- # REPLICATE方法就是获取指定分区下的哈希值文件(可能有多个,因为分区下可能映射了多个对象),用于判断对象数据是否发生改变;
- # 并获取方法执行的响应信息,即远程节点上副本的哈希值;
- conn = http_connect(
- node['replication_ip'],
- node['replication_port'],
- node['device'], job['partition'], 'REPLICATE',
- '/' + '-'.join(suffixes), headers=self.headers)
- conn.getresponse().read()
- responses.append(success)
- if self.handoff_delete:
- # delete handoff if we have had handoff_delete successes
- delete_handoff = len([resp for resp in responses if resp]) >= self.handoff_delete
- else:
- # delete handoff if all syncs were successful
- delete_handoff = len(responses) == len(job['nodes']) and all(responses)
-
- # suffixes为空或请求的三个已经都响应成功后删除本地partion下的文件;
- if not suffixes or delete_handoff:
- self.logger.info(_("Removing partition: %s"), job['path'])
- tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
- except (Exception, Timeout):
- self.logger.exception(_("Error syncing handoff partition"))
- finally:
- self.partition_times.append(time.time() - begin)
- self.logger.timing_since('partition.delete.timing', begin)
复制代码
2.1.获取指定分区目录下各个对象的suff----suffixes;
2.2.遍历指定分区所有副本(除了本节点)节点,在每个副本节点上:
2.2.1.调用方法sync,实现通过rsync命令行实现同步本地分区下suffixes确定的若干对象数据到远程节点相应的分区下;
注意:这里并没由冗余复制数据的操作,因为命令rsync可以自动跳过完全相同的文件只更新不同的文件,大大的减低了网络传输负载;
2.2.2.通过REPLICATE方法获取远程节点相应的分区下对象相应的哈希值;
2.3.当本地分区到每个副本节点分区下的数据同步全部完成之后,则删除本分区下的数据;
转到2.2,来看方法sync的实现:
- def sync(self, node, job, suffixes): # Just exists for doc anchor point
- """
- 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
- 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
- """
- # def rsync;
- # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
- # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
- return self.sync_method(node, job, suffixes)
复制代码
注:这里调用的方法是rsync;
- def rsync(self, node, job, suffixes):
- """
- 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
- 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
- """
- if not os.path.exists(job['path']):
- return False
-
- # Rsync(remote synchronize)是一个远程数据同步工具,
- # 可通过LAN/WAN快速同步多台主机间的文件。
- # Rsync使用所谓的“Rsync算法”来使本地和远程两个主机之间的文件达到同步,
- # 这个算法只传送两个文件的不同部分,而不是每次都整份传送,因此速度相当快;
- args = [
- 'rsync',
- '--recursive',
- '--whole-file',
- '--human-readable',
- '--xattrs',
- '--itemize-changes',
- '--ignore-existing',
- '--timeout=%s' % self.rsync_io_timeout,
- '--contimeout=%s' % self.rsync_io_timeout,
- '--bwlimit=%s' % self.rsync_bwlimit,
- ]
- # 获取远程节点的IP;
- node_ip = rsync_ip(node['replication_ip'])
-
- # rsync_module = node_ip::object
- if self.vm_test_mode:
- rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
- else:
- rsync_module = '%s::object' % node_ip
-
- had_any = False
-
- # 遍历suffixes,分别生成suffix的具体路径,并加载到命令行变量args中;
- # 如果不存在suffixes,则说明前面获取损坏的对象数据的操作是错误的,则直接返回;
- # 这里也可以看到,命令rsync可以实现同时同步多个数据对象;
- for suffix in suffixes:
- # job['path'] = /srv/node/local_dev['device']/objects/partition
- # spath = /srv/node/local_dev['device']/objects/partition/suffix
- spath = join(job['path'], suffix)
- if os.path.exists(spath):
- args.append(spath)
- had_any = True
- if not had_any:
- return False
-
- # 添加远程数据路径到命令行变量args中;
- # rsync_module = node_ip::object;
- args.append(join(rsync_module, node['device'], 'objects', job['partition']))
-
- # 实现同步本地分区下若干数据到远程节点相应的分区下;
- return self._rsync(args) == 0
复制代码
2.2.1.遍历suffixes,分别生成suffix的具体路径(/srv/node/local_dev['device']/objects/partition/suffix),并加载到命令行变量args中;
2.2.2.添加远程数据路径到命令行变量args中;
2.2.3.调用方法_rsync实现同步本地分区下若干数据到远程节点相应的分区下;
转到2.2.3,来看方法_rsync的实现:
- def _rsync(self, args):
- """
- 实现同步本地分区下若干数据到远程节点相应的分区下
- """
- start_time = time.time()
- ret_val = None
- try:
- with Timeout(self.rsync_timeout):
- # 此处即为同步操作了,推送模式;
- proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
- results = proc.stdout.read()
- ret_val = proc.wait()
- except Timeout:
- self.logger.error(_("Killing long-running rsync: %s"), str(args))
- proc.kill()
- return 1 # failure response code
- total_time = time.time() - start_time
- for result in results.split('\n'):
- if result == '':
- continue
- if result.startswith('cd+'):
- continue
- if not ret_val:
- self.logger.info(result)
- else:
- self.logger.error(result)
- if ret_val:
- error_line = _('Bad rsync return code: %(ret)d <- %(args)s') % {'args': str(args), 'ret': ret_val}
- if self.rsync_error_log_line_length:
- error_line = error_line[:self.rsync_error_log_line_length]
- self.logger.error(error_line)
- elif results:
- self.logger.info(_("Successful rsync of %(src)s at %(dst)s (%(time).03f)"), {'src': args[-2], 'dst': args[-1], 'time': total_time})
- else:
- self.logger.debug(_("Successful rsync of %(src)s at %(dst)s (%(time).03f)"), {'src': args[-2], 'dst': args[-1], 'time': total_time})
- return ret_val
复制代码
转到3,来看方法update的实现:
- def update(self, job):
- """
- 实现复制一个分区的高级方法;
- 对于远程副本节点,循环执行,针对每一个节点实现以下操作:
- 1 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
- 2 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
- 3 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- """
- self.replication_count += 1
- self.logger.increment('partition.update.count.%s' % (job['device'],))
- begin = time.time()
-
- try:
- # 方法get_hashes从具体的分区(具体的object)的哈希值文件hashes.pkl获取hashes值并更新,获取本地的hashes;
- # job[path]为job_path = join(obj_path, partition) = /srv/node/local_dev['device']/objects/partition;
- # local_hash为hashes.pkl中的反序列化回来的内容;
- # hashed为改变的数目;
- hashed, local_hash = tpool_reraise(
- get_hashes,
- job['path'], # job['path'] = /srv/node/local_dev['device']/objects/partition
- do_listdir=(self.replication_count % 10) == 0,
- reclaim_age=self.reclaim_age)
-
- self.suffix_hash += hashed
- self.logger.update_stats('suffix.hashes', hashed)
- # 其他副本对应的节点数目;
- # 此时attempts_left 为2 若果replica为3;
- attempts_left = len(job['nodes'])
-
-
- # 此时的nodes为除去本节点外的所有节点;
- # 因为job['nodes]不包含本地节点,
- # get_more_nodes(int(job['partition']))能获得除去本partion所对应节点外的其他所有节点;
- nodes = itertools.chain(
- job['nodes'],
- # get_more_nodes:这个方法实现了获取其他副本的节点;
- # 这个方法说明了三副本带来的高可用性;
- # 如果replicator进程检测到对远程node执行同步操作失败;
- # 那么它就会通过ring类提供的get_more_nodes接口来获得其他副本存放的node进行同步;
- self.object_ring.get_more_nodes(int(job['partition'])))
-
- # 其他副本对应的节点数目;
- # 此时attempts_left 为2 若果replica为3;
- # 对于远程副本节点,循环执行,针对每一个节点实现以下操作:
- # 通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
- # 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
- # 针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- while attempts_left > 0:
- # If this throws StopIterator it will be caught way below
- node = next(nodes)
- attempts_left -= 1
- try:
- with Timeout(self.http_timeout):
- # REPLICARE方法,对应sever里面的RELICATE方法;
- # REPLICATE方法就是获取指定分区下的哈希值文件(可能有多个,因为分区下可能映射了多个对象),用于判断对象数据是否发生改变;
- #并获取方法执行的响应信息,即远程节点上副本的哈希值;
- resp = http_connect(
- node['replication_ip'], node['replication_port'],
- node['device'], job['partition'], 'REPLICATE',
- '', headers=self.headers).getresponse()
-
- if resp.status == HTTP_INSUFFICIENT_STORAGE:
- self.logger.error(_('%(ip)s/%(device)s responded as unmounted'), node)
- attempts_left += 1
- continue
-
- if resp.status != HTTP_OK:
- self.logger.error(_("Invalid response %(resp)s from %(ip)s"), {'resp': resp.status, 'ip': node['replication_ip']})
- continue
-
- # 获取远程节点上分区的哈希值;
- remote_hash = pickle.loads(resp.read())
- del resp
-
- # 找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的;
- # 如果分区下某些对象数据发生改变,其对应的哈希值文件也会发生改变;
- # 如果有不同,说明分区下的某些对象文件数据发生了变化;
- # 示例:
- # 假如 local_hash 为 123 321 122 remote_hash 123 321 124 则 122为变化的
- # 文件路径hash值后三位会不会重复
- suffixes = [suffix for suffix in local_hash if
- local_hash[suffix] !=
- remote_hash.get(suffix, -1)]
-
- # 如果没有不同,说明对象数据都没有变化,则继续请求下一个节点;
- if not suffixes:
- continue
-
- # 针对那些和远程节点分区上不同的哈希值,这里进行重新计算;
- # 然后再一次和远程节点分区上的哈希值进行比较;
- # 这样做的目的是确保筛选的完全准确性;
- hashed, recalc_hash = tpool_reraise(
- get_hashes,
- job['path'], recalculate=suffixes,
- reclaim_age=self.reclaim_age)
- self.logger.update_stats('suffix.hashes', hashed)
- local_hash = recalc_hash
- suffixes = [suffix for suffix in local_hash if
- local_hash[suffix] !=
- remote_hash.get(suffix, -1)]
-
- # sync方法:
- # 通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
- # 因为在命令行的构成过程中,本地数据的地址在前作为源数据地址,远程数据地址在后作为目标数据地址;
- # 可以通过一条命令实现suffixes所指定的数据的同步,源数据地址有多个,目标数据地址有一个;
- self.sync(node, job, suffixes)
-
- with Timeout(self.http_timeout):
- conn = http_connect(node['replication_ip'], node['replication_port'], node['device'], job['partition'], 'REPLICATE',
- '/' + '-'.join(suffixes), headers=self.headers)
- conn.getresponse().read()
- self.suffix_sync += len(suffixes)
- self.logger.update_stats('suffix.syncs', len(suffixes))
- except (Exception, Timeout):
- self.logger.exception(_("Error syncing with node: %s") % node)
- self.suffix_count += len(local_hash)
- except (Exception, Timeout):
- self.logger.exception(_("Error syncing partition"))
- finally:
- self.partition_times.append(time.time() - begin)
- self.logger.timing_since('partition.update.timing', begin)
复制代码
3.1.通过http连接远程节点,通过REPLICATE方法实现获取job['partition']下所有对象的哈希值;
3.2.找出本地分区下哈希值中后缀和远程分区下哈希值中后缀不同的,说明分区下的某些对象文件数据发生了变化;
3.3.针对发生变化的数据,调用sync方法,通过rsync命令行实现同步本地分区下若干数据到远程节点相应的分区下;
注:方法sync的解析,前面已经完成,这里不再进行赘述;
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
|