tntzbzc 发表于 2014-11-20 15:35:04

Swift源码分析----swift-object-auditor(1)

本帖最后由 pig2 于 2014-11-21 15:25 编辑

问题导读
1、如何对于指定的对象数据进行检测?
2、在什么情况下,系统默认调用守护进程类Daemon中的run_once方法?
3、一个对象的审计操作流程是怎样的?

static/image/hrline/4.gif





概述部分:
对象审计守护进程;
对象审计守护进程为主要实现审计设备目录下所有的object的操作;
对于一个对象的审计操作流程如下:
1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
3 移动损坏对象文件到隔离区域;
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用ObjectAuditor类中的run_once方法;


源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.obj.auditor import ObjectAuditor
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
from optparse import OptionParser
if __name__ == '__main__':
    parser = OptionParser("%prog CONFIG ")
    parser.add_option('-z', '--zero_byte_fps',
                      help='Audit only zero byte files at specified files/sec')
    parser.add_option('-d', '--devices',
                      help='Audit only given devices. Comma-separated list')
    conf_file, options = parse_options(parser=parser, once=True)
    run_daemon(ObjectAuditor, conf_file, **options)
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
    """
    从配置文件加载设置,然后实例化守护进程“klass”并运行这个守护进程通过指定的参数kwarg;
    """
    ......
   try:
       klass(conf).run(once=once, **kwargs)
   except KeyboardInterrupt:
       logger.info('User quit')
   logger.info('Exited')
def run(self, once=False, **kwargs):
    """
    运行守护进程程序;
    即运行方法run_once,或者方法run_forever;
    """
   # 配置参数相关;
   utils.validate_configuration()
   utils.drop_privileges(self.conf.get('user', 'swift'))
   # 日志相关处理;
   utils.capture_stdio(self.logger, **kwargs)
   def kill_children(*args):
       signal.signal(signal.SIGTERM, signal.SIG_IGN)
       os.killpg(0, signal.SIGTERM)
       sys.exit()
   signal.signal(signal.SIGTERM, kill_children)
   if once:
       self.run_once(**kwargs)
   else:
       self.run_forever(**kwargs)
def run_once(self, *args, **kwargs):
    """
   Override this to run the script once
    子类中的方法需要被重写;
    """
   raise NotImplementedError('run_once not implemented')
def run_once(self, *args, **kwargs):
    """
   
    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
    """
   # zero byte only command line option
   zbo_fps = kwargs.get('zero_byte_fps', 0)
   override_devices = list_from_csv(kwargs.get('devices'))
   # Remove bogus entries and duplicates from override_devices
   override_devices = list(
      set(listdir(self.devices)).intersection(set(override_devices)))
   parent = False
   if zbo_fps:
       # only start parent
       parent = True
   kwargs = {'mode': 'once'}
   try:
       self.audit_loop(parent, zbo_fps, override_devices=override_devices,
                     **kwargs)
   except (Exception, Timeout):
       self.logger.exception(_('ERROR auditing'))
    def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
      """Audit loop"""
      self.clear_recon_cache('ALL')
      self.clear_recon_cache('ZBF')
      
      kwargs['device_dirs'] = override_devices
      
      if parent:
            kwargs['zero_byte_fps'] = zbo_fps
            self.run_audit(**kwargs)
      else:
            pids = []
            if self.conf_zero_byte_fps:
                zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                pids.append(zbf_pid)
            pids.append(self.fork_child(**kwargs))
            while pids:
                pid = os.wait()
                # ZBF scanner must be restarted as soon as it finishes
                if self.conf_zero_byte_fps and pid == zbf_pid and \
                   len(pids) > 1:
                  kwargs['device_dirs'] = override_devices
                  zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
                  pids.append(zbf_pid)
                pids.remove(pid)
def run_audit(self, **kwargs):
    """
   Run the object audit
    实现审计设备目录下所有的object的操作;
    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
    """
   mode = kwargs.get('mode')
   zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
   device_dirs = kwargs.get('device_dirs')
      
   # AuditorWorker:遍历文件系统实现对象的审计类;
   worker = AuditorWorker(self.conf, self.logger, self.rcache,
                        self.devices,
                        zero_byte_only_at_fps=zero_byte_only_at_fps)
   # 实现审计设备目录下所有的object的操作;
   # 对于一个对象的审计操作流程如下:
   # 1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
   #   检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
   # 2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
   # 3 移动损坏对象文件到隔离区域;
   worker.audit_all_objects(mode=mode, device_dirs=device_dirs)
def audit_all_objects(self, mode='once', device_dirs=None):
    """
    实现审计设备目录下所有的object的操作;
    对于一个对象的审计操作流程如下:
    1 对于指定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
      检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
    """
   description = ''
   if device_dirs:
       device_dir_str = ','.join(sorted(device_dirs))
       description = _(' - %s') % device_dir_str
   self.logger.info(_('Begin object audit "%s" mode (%s%s)') % (mode, self.auditor_type,
description))
   begin = reported = time.time()
   self.total_bytes_processed = 0
   self.total_files_processed = 0
   total_quarantines = 0
   total_errors = 0
   time_auditing = 0
      
   # 基于给定的设备路径和数据目录,为在这个数据目录中的所有文件生成(path, device,
partition);
   # 通过方法audit_location_generator方法计算获得path,device,partition;
   # 实际上是获取所有分区的相关信息;
      
   # 返回的是 device_dirs 下的文件hash列表
   # 返回内容为 hsh_path, device, partition
   # all_locs 为设备self.device中device_dirs下的所有文件,为 AuditLocation(hsh_path, device,
partition)对象;
   all_locs = self.diskfile_mgr.object_audit_location_generator(device_dirs=device_dirs)
      
   for location in all_locs:
       loop_time = time.time()
            
       # 1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
       #   检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
       # 2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
       # 3 移动损坏对象文件到隔离区域;
       self.failsafe_object_audit(location)
            
       self.logger.timing_since('timing', loop_time)
       self.files_running_time = ratelimit_sleep(
         self.files_running_time, self.max_files_per_second)
            
       self.total_files_processed += 1
       now = time.time()
       if now - reported >= self.log_time:
         self.logger.info(_(
               'Object audit (%(type)s). '
               'Since %(start_time)s: Locally: %(passes)d passed, '
               '%(quars)d quarantined, %(errors)d errors '
               'files/sec: %(frate).2f , bytes/sec: %(brate).2f, '
               'Total time: %(total).2f, Auditing time: %(audit).2f, '
               'Rate: %(audit_rate).2f') % {
                   'type': '%s%s' % (self.auditor_type, description),
                   'start_time': time.ctime(reported),
                   'passes': self.passes, 'quars': self.quarantines,
                   'errors': self.errors,
                   'frate': self.passes / (now - reported),
                   'brate': self.bytes_processed / (now - reported),
                   'total': (now - begin), 'audit': time_auditing,
                   'audit_rate': time_auditing / (now - begin)})
         cache_entry = self.create_recon_nested_dict(
               'object_auditor_stats_%s' % (self.auditor_type),
               device_dirs,
               {'errors': self.errors, 'passes': self.passes,
                'quarantined': self.quarantines,
                'bytes_processed': self.bytes_processed,
                'start_time': reported, 'audit_time': time_auditing})
               
         dump_recon_cache(cache_entry, self.rcache, self.logger)
         reported = now
         total_quarantines += self.quarantines
         total_errors += self.errors
         self.passes = 0
         self.quarantines = 0
         self.errors = 0
         self.bytes_processed = 0
            
            
       # 计算审核花费的总时间数据;
       time_auditing += (now - loop_time)
            
   # Avoid divide by zero during very short runs
   # 如果时间很短,可以设置值为0.000001,从而避免得到的数值为0;
   elapsed = (time.time() - begin) or 0.000001
   self.logger.info(_(
       'Object audit (%(type)s) "%(mode)s" mode '
       'completed: %(elapsed).02fs. Total quarantined: %(quars)d, '
       'Total errors: %(errors)d, Total files/sec: %(frate).2f, '
       'Total bytes/sec: %(brate).2f, Auditing time: %(audit).2f, '
       'Rate: %(audit_rate).2f') % {
         'type': '%s%s' % (self.auditor_type, description),
         'mode': mode, 'elapsed': elapsed,
         'quars': total_quarantines + self.quarantines,
         'errors': total_errors + self.errors,
         'frate': self.total_files_processed / elapsed,
         'brate': self.total_bytes_processed / elapsed,
         'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
   # Clear recon cache entry if device_dirs is set
   if device_dirs:
       cache_entry = self.create_recon_nested_dict(
         'object_auditor_stats_%s' % (self.auditor_type),
         device_dirs, {})
       dump_recon_cache(cache_entry, self.rcache, self.logger)
   if self.stats_sizes:
       self.logger.info(_('Object audit stats: %s') % json.dumps(self.stats_buckets))
def failsafe_object_audit(self, location):
    """
   Entrypoint to object_audit, with a failsafe generic exception handler.

    1 对于location确定的对象数据进行检测,来判断文件是否损坏,检测方法包括:
   检测文件长度和读取文件长度值是否相同,以及通过检测etag值是否相同;
    2 在文件损坏的情况下,设置损坏对象文件的哈希值为空;
    3 移动损坏对象文件到隔离区域;
    """
   # object_audit:检查文件的完整性,该方法封装了obj server的DiskFile类;
   # 该类有一个_handle_close_quarantine方法,用来检测文件是否需要被隔离;
   # 如果发现损坏,则直接将文件移动到隔离目录下;
   try:
       self.object_audit(location)
   except (Exception, Timeout):
       self.logger.increment('errors')
       self.errors += 1
       self.logger.exception(_('ERROR Trying to audit %s'), location)下一篇博客将继续swift-object-auditor的分析工作。

下一篇Swift源码分析----swift-object-auditor(2)



页: [1]
查看完整版本: Swift源码分析----swift-object-auditor(1)