分享

Swift源码分析----swift-object-auditor(1)

tntzbzc 发表于 2014-11-20 15:35:04 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 16973
本帖最后由 pig2 于 2014-11-21 15:25 编辑
问题导读
1、如何对于指定的对象数据进行检测?
2、在什么情况下,系统默认调用守护进程类Daemon中的run_once方法?
3、一个对象的审计操作流程是怎样的?








概述部分:
对象审计守护进程;  
对象审计守护进程为主要实现审计设备目录下所有的object的操作;
对于一个对象的审计操作流程如下:
1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
3 移动损坏对象文件到隔离区域;
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用ObjectAuditor类中的run_once方法;


源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
  1. from swift.obj.auditor import ObjectAuditor
  2. from swift.common.utils import parse_options
  3. from swift.common.daemon import run_daemon
  4. from optparse import OptionParser
  5. if __name__ == '__main__':
  6.     parser = OptionParser("%prog CONFIG [options]")
  7.     parser.add_option('-z', '--zero_byte_fps',
  8.                       help='Audit only zero byte files at specified files/sec')
  9.     parser.add_option('-d', '--devices',
  10.                       help='Audit only given devices. Comma-separated list')
  11.     conf_file, options = parse_options(parser=parser, once=True)
  12.     run_daemon(ObjectAuditor, conf_file, **options)
  13. def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
  14.     """
  15.     从配置文件加载设置,然后实例化守护进程“klass”并运行这个守护进程通过指定的参数kwarg;
  16.     """
  17.     ......
  18.    try:
  19.        klass(conf).run(once=once, **kwargs)
  20.    except KeyboardInterrupt:
  21.        logger.info('User quit')
  22.    logger.info('Exited')
  23. def run(self, once=False, **kwargs):
  24.     """
  25.     运行守护进程程序;
  26.     即运行方法run_once,或者方法run_forever;
  27.     """
  28.    # 配置参数相关;
  29.    utils.validate_configuration()
  30.    utils.drop_privileges(self.conf.get('user', 'swift'))
  31.    # 日志相关处理;
  32.    utils.capture_stdio(self.logger, **kwargs)
  33.    def kill_children(*args):
  34.        signal.signal(signal.SIGTERM, signal.SIG_IGN)
  35.        os.killpg(0, signal.SIGTERM)
  36.        sys.exit()
  37.    signal.signal(signal.SIGTERM, kill_children)
  38.    if once:
  39.        self.run_once(**kwargs)
  40.    else:
  41.        self.run_forever(**kwargs)
  42. def run_once(self, *args, **kwargs):
  43.     """
  44.    Override this to run the script once
  45.     子类中的方法需要被重写;
  46.     """
  47.    raise NotImplementedError('run_once not implemented')
  48. def run_once(self, *args, **kwargs):
  49.     """
复制代码

   
    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
  1.     """
  2.    # zero byte only command line option
  3.    zbo_fps = kwargs.get('zero_byte_fps', 0)
  4.    override_devices = list_from_csv(kwargs.get('devices'))
  5.    # Remove bogus entries and duplicates from override_devices
  6.    override_devices = list(
  7.         set(listdir(self.devices)).intersection(set(override_devices)))
  8.    parent = False
  9.    if zbo_fps:
  10.        # only start parent
  11.        parent = True
  12.    kwargs = {'mode': 'once'}
  13.    try:
  14.        self.audit_loop(parent, zbo_fps, override_devices=override_devices,
  15.                        **kwargs)
  16.    except (Exception, Timeout):
  17.        self.logger.exception(_('ERROR auditing'))
  18.     def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
  19.         """Audit loop"""
  20.         self.clear_recon_cache('ALL')
  21.         self.clear_recon_cache('ZBF')
  22.         
  23.         kwargs['device_dirs'] = override_devices
  24.         
  25.         if parent:
  26.             kwargs['zero_byte_fps'] = zbo_fps
  27.             self.run_audit(**kwargs)
  28.         else:
  29.             pids = []
  30.             if self.conf_zero_byte_fps:
  31.                 zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
  32.                 pids.append(zbf_pid)
  33.             pids.append(self.fork_child(**kwargs))
  34.             while pids:
  35.                 pid = os.wait()[0]
  36.                 # ZBF scanner must be restarted as soon as it finishes
  37.                 if self.conf_zero_byte_fps and pid == zbf_pid and \
  38.                    len(pids) > 1:
  39.                     kwargs['device_dirs'] = override_devices
  40.                     zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
  41.                     pids.append(zbf_pid)
  42.                 pids.remove(pid)
  43. def run_audit(self, **kwargs):
  44.     """
复制代码

   Run the object audit
    实现审计设备目录下所有的object的操作;
    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
  1.     """
  2.    mode = kwargs.get('mode')
  3.    zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
  4.    device_dirs = kwargs.get('device_dirs')
  5.         
  6.    # AuditorWorker:遍历文件系统实现对象的审计类;
  7.    worker = AuditorWorker(self.conf, self.logger, self.rcache,
  8.                           self.devices,
  9.                           zero_byte_only_at_fps=zero_byte_only_at_fps)
  10.    # 实现审计设备目录下所有的object的操作;
  11.    # 对于一个对象的审计操作流程如下:
  12.    # 1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
  13.    #   检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
  14.    # 2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  15.    # 3 移动损坏对象文件到隔离区域;
  16.    worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
  17. def audit_all_objects(self, mode='once', device_dirs=None):
  18.     """
  19.     实现审计设备目录下所有的object的操作;
复制代码

    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
  1.     """
  2.    description = ''
  3.    if device_dirs:
  4.        device_dir_str = ','.join(sorted(device_dirs))
  5.        description = _(' - %s') % device_dir_str
  6.    self.logger.info(_('Begin object audit "%s" mode (%s%s)') % (mode, self.auditor_type,
  7. description))
  8.    begin = reported = time.time()
  9.    self.total_bytes_processed = 0
  10.    self.total_files_processed = 0
  11.    total_quarantines = 0
  12.    total_errors = 0
  13.    time_auditing = 0
  14.         
  15.    # 基于给定的设备路径和数据目录,为在这个数据目录中的所有文件生成(path, device,
  16. partition);
  17.    # 通过方法audit_location_generator方法计算获得path,device,partition;
  18.    # 实际上是获取所有分区的相关信息;
  19.         
  20.    # 返回的是 device_dirs 下的文件hash列表  
  21.    # 返回内容为 hsh_path, device, partition  
  22.    # all_locs 为设备self.device中device_dirs下的所有文件,为 AuditLocation(hsh_path, device,
  23. partition)对象;  
  24.    all_locs = self.diskfile_mgr.object_audit_location_generator(device_dirs=device_dirs)
  25.         
  26.    for location in all_locs:
  27.        loop_time = time.time()
  28.             
  29.        # 1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
  30.        #   检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
  31.        # 2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
  32.        # 3 移动损坏对象文件到隔离区域;
  33.        self.failsafe_object_audit(location)
  34.             
  35.        self.logger.timing_since('timing', loop_time)
  36.        self.files_running_time = ratelimit_sleep(
  37.            self.files_running_time, self.max_files_per_second)
  38.             
  39.        self.total_files_processed += 1
  40.        now = time.time()
  41.        if now - reported >= self.log_time:
  42.            self.logger.info(_(
  43.                'Object audit (%(type)s). '
  44.                'Since %(start_time)s: Locally: %(passes)d passed, '
  45.                '%(quars)d quarantined, %(errors)d errors '
  46.                'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
  47.                'Total time: %(total).2f, Auditing time: %(audit).2f, '
  48.                'Rate: %(audit_rate).2f') % {
  49.                    'type': '%s%s' % (self.auditor_type, description),
  50.                    'start_time': time.ctime(reported),
  51.                    'passes': self.passes, 'quars': self.quarantines,
  52.                    'errors': self.errors,
  53.                    'frate': self.passes / (now - reported),
  54.                    'brate': self.bytes_processed / (now - reported),
  55.                    'total': (now - begin), 'audit': time_auditing,
  56.                    'audit_rate': time_auditing / (now - begin)})
  57.            cache_entry = self.create_recon_nested_dict(
  58.                'object_auditor_stats_%s' % (self.auditor_type),
  59.                device_dirs,
  60.                {'errors': self.errors, 'passes': self.passes,
  61.                 'quarantined': self.quarantines,
  62.                 'bytes_processed': self.bytes_processed,
  63.                 'start_time': reported, 'audit_time': time_auditing})
  64.                
  65.            dump_recon_cache(cache_entry, self.rcache, self.logger)
  66.            reported = now
  67.            total_quarantines += self.quarantines
  68.            total_errors += self.errors
  69.            self.passes = 0
  70.            self.quarantines = 0
  71.            self.errors = 0
  72.            self.bytes_processed = 0
  73.             
  74.             
  75.        # 计算审核花费的总时间数据;
  76.        time_auditing += (now - loop_time)
  77.             
  78.    # Avoid divide by zero during very short runs
  79.    # 如果时间很短,可以设置值为0.000001,从而避免得到的数值为0;
  80.    elapsed = (time.time() - begin) or 0.000001
  81.    self.logger.info(_(
  82.        'Object audit (%(type)s) "%(mode)s" mode '
  83.        'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
  84.        'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
  85.        'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
  86.        'Rate: %(audit_rate).2f') % {
  87.            'type': '%s%s' % (self.auditor_type, description),
  88.            'mode': mode, 'elapsed': elapsed,
  89.            'quars': total_quarantines + self.quarantines,
  90.            'errors': total_errors + self.errors,
  91.            'frate': self.total_files_processed / elapsed,
  92.            'brate': self.total_bytes_processed / elapsed,
  93.            'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
  94.    # Clear recon cache entry if device_dirs is set
  95.    if device_dirs:
  96.        cache_entry = self.create_recon_nested_dict(
  97.            'object_auditor_stats_%s' % (self.auditor_type),
  98.            device_dirs, {})
  99.        dump_recon_cache(cache_entry, self.rcache, self.logger)
  100.    if self.stats_sizes:
  101.        self.logger.info(_('Object audit stats: %s') % json.dumps(self.stats_buckets))
  102. def failsafe_object_audit(self, location):
  103.     """
  104.    Entrypoint to object_audit, with a failsafe generic exception handler.
复制代码


    1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
     检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
  1.     """
  2.    # object_audit:检查文件的完整性,该方法封装了obj server的DiskFile类;
  3.    # 该类有一个_handle_close_quarantine方法,用来检测文件是否需要被隔离;
  4.    # 如果发现损坏,则直接将文件移动到隔离目录下;
  5.    try:
  6.        self.object_audit(location)
  7.    except (Exception, Timeout):
  8.        self.logger.increment('errors')
  9.        self.errors += 1
  10.        self.logger.exception(_('ERROR Trying to audit %s'), location)下一篇博客将继续swift-object-auditor的分析工作。
复制代码


下一篇Swift源码分析----swift-object-auditor(2)



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条