分享

Swift源码分析----swift-object-auditor(2)

tntzbzc 发表于 2014-11-20 15:34:51 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14264
本帖最后由 pig2 于 2014-11-21 15:26 编辑
问题导读

1.检查文件的完整性,该方法封装了obj server的哪个类?
2.如果发现文件损坏,该如何处理?




上一篇
Swift源码分析----swift-object-auditor(1)

  1. def object_audit(self, location):
  2.     """
  3.     Audits the given object location.
  4.     对partition进行一些检查 如果有问题抛出相应的异常;
  5.     如果抛AuditException这样的异常,说明partition出现问题,需要隔离,然后同步;
  6.         
  7.     检查文件的完整性,该方法封装了obj server的DiskFile类;
  8.     该类有一个_handle_close_quarantine方法,用来检测文件是否需要被隔离;
  9.     如果发现损坏,则直接将文件移动到隔离目录下;
  10.         
  11.     1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
  12.       检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
  13.     2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  14.     3 移动损坏对象文件到隔离区域;
  15.     """
  16.    def raise_dfq(msg):
  17.        raise DiskFileQuarantined(msg)
  18.    try:
  19.      # 管理磁盘上的object文件类;
  20.      # 用来初始化object,其中包括了一切对于一个object的操作;
  21.             
  22.      # get_diskfile_from_audit_location:获取磁盘文件的具体路径;
  23.      # 获取由audit_location =(path, device, partition)所确定的具体路径;
  24.      # 应该就是指定对象的具体路径;
  25.      df = self.diskfile_mgr.get_diskfile_from_audit_location(location)
  26.             
  27.      with df.open():
  28.          # 获取对象元数据字典;
  29.          metadata = df.get_metadata()
  30.          # 获取对象的大小;
  31.          obj_size = int(metadata['Content-Length'])
  32.                
  33.          if self.stats_sizes:
  34.              self.record_stats(obj_size)
  35.                
  36.          # 对象数据没有被损坏;
  37.          if self.zero_byte_only_at_fps and obj_size:
  38.              self.passes += 1
  39.              return
  40.                
  41.          # df.reader方法在后面for循环中会调用类class DiskFileReader下的方法__iter__,
  42.          # 在这个方法中会进一步调用方法close,而在close方法中会实现以下步骤:
  43.          #     通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
  44.          #     来判断文件是否损坏,是否需要被隔离;
  45.          #     在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  46.          #     并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  47.          #     关闭打开的文件fp;
  48.          reader = df.reader(_quarantine_hook=raise_dfq)
  49.                
  50.      with closing(reader):
  51.          for chunk in reader:
  52.              chunk_len = len(chunk)
  53.              self.bytes_running_time = ratelimit_sleep(
  54.                   self.bytes_running_time,
  55.                   self.max_bytes_per_second,
  56.                   incr_by=chunk_len)
  57.                     
  58.              self.bytes_processed += chunk_len
  59.              self.total_bytes_processed += chunk_len
  60.          except DiskFileNotExist:
  61.              return
  62.          except DiskFileQuarantined as err:
  63.              self.quarantines += 1
  64.              self.logger.error(_('ERROR Object %(obj)s failed audit and was'
  65.                                  ' quarantined: %(err)s'),
  66.                                 {'obj': location, 'err': err})
  67.          self.passes += 1
复制代码



1.调用方法get_diskfile_from_audit_location获取指定对象的具体路径;
2.打开指定对象文件,获取其元数据信息,并且从元数据中获取指定对象的大小;
3.通过变量zero_byte_only_at_fps和obj_size判断指定对象数据是否被损坏,如果没有被损坏直接返回;
4.对打开的对象调用reader方法,reader方法在后面for循环中会调用类class DiskFileReader下的方法
__iter__,在这个方法中会进一步调用方法close,而在close方法中会实现以下步骤:
  4.1 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,来判断文件是否损坏,是否需要被隔离;
  4.2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  4.3 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  4.4 关闭打开的指定对象文件fp;
  1. def reader(self, keep_cache=False, _quarantine_hook=lambda m: None):
  2.     dr = DiskFileReader(
  3.                 self._fp, self._data_file, int(self._metadata['Content-Length']),
  4.                 self._metadata['ETag'], self._threadpool, self._disk_chunk_size,
  5.                 self._mgr.keep_cache_size, self._device_path, self._logger,
  6.                 quarantine_hook=_quarantine_hook, keep_cache=keep_cache)
  7.     self._fp = None
  8.     return dr
复制代码

  1. def __iter__(self):
  2.     """Returns an iterator over the data file."""
  3.     try:
  4.             ......
  5.     finally:
  6.         # 这里的close方法:
  7.         # 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
  8.         # 来判断文件是否损坏,是否需要被隔离;
  9.         # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  10.         # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  11.         # 关闭打开的文件fp;
  12.         if not self._suppress_file_closing:
  13.             self.close()
复制代码

  1. def close(self):
  2.      """
  3.     Close the open file handle if present.
  4.     For this specific implementation, this method will handle quarantining
  5.     the file if necessary.
  6.         
  7.      通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
  8.      来判断文件是否损坏,是否需要被隔离;
  9.      在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  10.      并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  11.      关闭打开的文件fp;
  12.      """
  13.     if self._fp:
  14.             
  15.         # 通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
  16.         # 来判断文件是否损坏,是否需要被隔离;
  17.         # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  18.         # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  19.         try:
  20.             if self._started_at_0 and self._read_to_eof:
  21.                 self._handle_close_quarantine()
  22.             except DiskFileQuarantined:
  23.                 raise
  24.             except (Exception, Timeout) as e:
  25.                 self._logger.error(_(
  26.                     'ERROR DiskFile %(data_file)s'
  27.                     ' close failure: %(exc)s : %(stack)s'),
  28.                     {'exc': e, 'stack': ''.join(traceback.format_stack()),
  29.                      'data_file': self._data_file})
  30.             finally:
  31.                 fp, self._fp = self._fp, None
  32.                 fp.close()
复制代码

  1. def _handle_close_quarantine(self):
  2.      """
  3.     Check if file needs to be quarantined
  4.      通过检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同,
  5.      来判断文件是否损坏,是否需要被隔离;
  6.      在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  7.      并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  8.      """
  9.         
  10.     # 如果文件的长度和读取文件长度不相同,则为_quarantine方法传入的是读取的长度不匹配;
  11.     # 如果etag不相同,则为_quarantine方法传入的是md5值不匹配;
  12.     if self._bytes_read != self._obj_size:
  13.         self._quarantine(
  14.                 "Bytes read: %s, does not match metadata: %s" % (
  15.                  self._bytes_read, self._obj_size))
  16.     elif self._iter_etag and self._etag != self._iter_etag.hexdigest():
  17.         self._quarantine(
  18.                 "ETag %s and file's md5 %s do not match" % (
  19.                  self._etag, self._iter_etag.hexdigest()))
复制代码

  1. def _quarantine(self, msg):
  2.      """
  3.      在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  4.      并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  5.      """
  6.         
  7.     # 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  8.     # 并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  9.     self._quarantined_dir = self._threadpool.run_in_thread(
  10.          quarantine_renamer, self._device_path, self._data_file)
  11.     self._logger.warn("Quarantined object %s: %s" % (self._data_file, msg))
  12.     self._logger.increment('quarantines')
  13.     self._quarantine_hook(msg)
复制代码

  1. def quarantine_renamer(device_path, corrupted_file_path):
  2.      """
  3.     在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  4.     并移动损坏对象文件到隔离区域,以便后续通过复制操作实现损坏文件的恢复;
  5.      """
  6.     # 损坏文件的路径;
  7.     from_dir = dirname(corrupted_file_path)
  8.     # 文件隔离区域的路径;
  9.     to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
  10.     # 设置损坏对象文件的哈希值为空;
  11.     invalidate_hash(dirname(from_dir))
  12.    
  13.     # 实现复制损坏对象文件到隔离区域;
  14.     try:
  15.         renamer(from_dir, to_dir)
  16.     except OSError as e:
  17.         if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
  18.             raise
  19.         to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
  20.         renamer(from_dir, to_dir)
  21.     return to_dir
复制代码

  1. def invalidate_hash(suffix_dir):
  2.      """
  3.     Invalidates the hash for a suffix_dir in the partition's hashes file.
  4.     设置suffix_dir的哈希值为空;
  5.      """
  6.     suffix = basename(suffix_dir)
  7.     partition_dir = dirname(suffix_dir)
  8.     hashes_file = join(partition_dir, HASH_FILE)
  9.     with lock_path(partition_dir):
  10.         try:
  11.             with open(hashes_file, 'rb') as fp:
  12.                 hashes = pickle.load(fp)
  13.             if suffix in hashes and not hashes[suffix]:
  14.                 return
  15.         except Exception:
  16.             return
  17.         hashes[suffix] = None
  18.         write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
复制代码
  1. def renamer(old, new):
  2.      """
  3.      实现复制损坏对象文件到隔离区域;
  4.      """
  5.     try:
  6.         mkdirs(os.path.dirname(new))
  7.         os.rename(old, new)
  8.     except OSError:
  9.         mkdirs(os.path.dirname(new))
  10.         os.rename(old, new)
复制代码










没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条