本帖最后由 pig2 于 2014-11-21 15:26 编辑
问题导读
1.检查文件的完整性,该方法封装了obj server的哪个类?
2.如果发现文件损坏,该如何处理?
上一篇
Swift源码分析----swift-object-auditor(1)
- def object_audit(self, location):
- """
- Audits the given object location.
- 对partition进行一些检查 如果有问题抛出相应的异常;
- 如果抛AuditException这样的异常,说明partition出现问题,需要隔离,然后同步;
-
- 检查文件的完整性,该方法封装了obj server的DiskFile类;
- 该类有一个_handle_close_quarantine方法,用来检测文件是否需要被隔离;
- 如果发现损坏,则直接将文件移动到隔离目录下;
-
- 1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
- 检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
- 2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- 3 移动损坏对象文件到隔离区域;
- """
- def raise_dfq(msg):
- raise DiskFileQuarantined(msg)
- try:
- # 管理磁盘上的object文件类;
- # 用来初始化object,其中包括了一切对于一个object的操作;
-
- # get_diskfile_from_audit_location:获取磁盘文件的具体路径;
- # 获取由audit_location =(path, device, partition)所确定的具体路径;
- # 应该就是指定对象的具体路径;
- df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
-
- with df.open():
- # 获取对象元数据字典;
- metadata = df.get_metadata()
- # 获取对象的大小;
- obj_size = int(metadata['Content-Length'])
-
- if self.stats_sizes:
- self.record_stats(obj_size)
-
- # 对象数据没有被损坏;
- if self.zero_byte_only_at_fps and obj_size:
- self.passes += 1
- return
-
- # df.reader方法在后面for循环中会调用类class DiskFileReader下的方法__iter__,
- # 在这个方法中会进一步调用方法close,而在close方法中会实现以下步骤:
- # 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
- # 来判断文件是否损坏,是否需要被隔离;
- # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- # 关闭打开的文件fp;
- reader = df.reader(_quarantine_hook=raise_dfq)
-
- with closing(reader):
- for chunk in reader:
- chunk_len = len(chunk)
- self.bytes_running_time = ratelimit_sleep(
- self.bytes_running_time,
- self.max_bytes_per_second,
- incr_by=chunk_len)
-
- self.bytes_processed += chunk_len
- self.total_bytes_processed += chunk_len
- except DiskFileNotExist:
- return
- except DiskFileQuarantined as err:
- self.quarantines += 1
- self.logger.error(_('ERROR Object %(obj)s failed audit and was'
- ' quarantined: %(err)s'),
- {'obj': location, 'err': err})
- self.passes += 1
复制代码
1.调用方法get_diskfile_from_audit_location获取指定对象的具体路径;
2.打开指定对象文件,获取其元数据信息,并且从元数据中获取指定对象的大小;
3.通过变量zero_byte_only_at_fps和obj_size判断指定对象数据是否被损坏,如果没有被损坏直接返回;
4.对打开的对象调用reader方法,reader方法在后面for循环中会调用类class DiskFileReader下的方法
__iter__,在这个方法中会进一步调用方法close,而在close方法中会实现以下步骤:
4.1 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,来判断文件是否损坏,是否需要被隔离;
4.2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
4.3 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
4.4 关闭打开的指定对象文件fp;
- def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
- dr = DiskFileReader(
- self._fp, self._data_file, int(self._metadata['Content-Length']),
- self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
- self._mgr.keep_cache_size, self._device_path, self._logger,
- quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
- self._fp = None
- return dr
复制代码
- def __iter__(self):
- """Returns an iterator over the data file."""
- try:
- ......
- finally:
- # 这里的close方法:
- # 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
- # 来判断文件是否损坏,是否需要被隔离;
- # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- # 关闭打开的文件fp;
- if not self._suppress_file_closing:
- self.close()
复制代码
- def close(self):
- """
- Close the open file handle if present.
- For this specific implementation, this method will handle quarantining
- the file if necessary.
-
- 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
- 来判断文件是否损坏,是否需要被隔离;
- 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- 关闭打开的文件fp;
- """
- if self._fp:
-
- # 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
- # 来判断文件是否损坏,是否需要被隔离;
- # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- try:
- if self._started_at_0 and self._read_to_eof:
- self._handle_close_quarantine()
- except DiskFileQuarantined:
- raise
- except (Exception, Timeout) as e:
- self._logger.error(_(
- 'ERROR DiskFile %(data_file)s'
- ' close failure: %(exc)s : %(stack)s'),
- {'exc': e, 'stack': ''.join(traceback.format_stack()),
- 'data_file': self._data_file})
- finally:
- fp, self._fp = self._fp, None
- fp.close()
复制代码
- def _handle_close_quarantine(self):
- """
- Check if file needs to be quarantined
- 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
- 来判断文件是否损坏,是否需要被隔离;
- 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- """
-
- # 如果文件的长度和读取文件长度不相同,则为_quarantine方法传入的是读取的长度不匹配;
- # 如果etag不相同,则为_quarantine方法传入的是md5值不匹配;
- if self._bytes_read != self._obj_size:
- self._quarantine(
- "Bytes read: %s, does not match metadata: %s" % (
- self._bytes_read, self._obj_size))
- elif self._iter_etag and self._etag != self._iter_etag.hexdigest():
- self._quarantine(
- "ETag %s and file's md5 %s do not match" % (
- self._etag, self._iter_etag.hexdigest()))
复制代码
- def _quarantine(self, msg):
- """
- 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- """
-
- # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- self._quarantined_dir = self._threadpool.run_in_thread(
- quarantine_renamer, self._device_path, self._data_file)
- self._logger.warn("Quarantined object %s: %s" % (self._data_file, msg))
- self._logger.increment('quarantines')
- self._quarantine_hook(msg)
复制代码
- def quarantine_renamer(device_path, corrupted_file_path):
- """
- 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
- 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
- """
- # 损坏文件的路径;
- from_dir = dirname(corrupted_file_path)
- # 文件隔离区域的路径;
- to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
- # 设置损坏对象文件的哈希值为空;
- invalidate_hash(dirname(from_dir))
-
- # 实现复制损坏对象文件到隔离区域;
- try:
- renamer(from_dir, to_dir)
- except OSError as e:
- if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
- raise
- to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
- renamer(from_dir, to_dir)
- return to_dir
复制代码
- def invalidate_hash(suffix_dir):
- """
- Invalidates the hash for a suffix_dir in the partition's hashes file.
- 设置suffix_dir的哈希值为空;
- """
- suffix = basename(suffix_dir)
- partition_dir = dirname(suffix_dir)
- hashes_file = join(partition_dir, HASH_FILE)
- with lock_path(partition_dir):
- try:
- with open(hashes_file, 'rb') as fp:
- hashes = pickle.load(fp)
- if suffix in hashes and not hashes[suffix]:
- return
- except Exception:
- return
- hashes[suffix] = None
- write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
复制代码
- def renamer(old, new):
- """
- 实现复制损坏对象文件到隔离区域;
- """
- try:
- mkdirs(os.path.dirname(new))
- os.rename(old, new)
- except OSError:
- mkdirs(os.path.dirname(new))
- os.rename(old, new)
复制代码
|