tntzbzc 发表于 2014-11-20 15:34:57

Swift源码分析----swift-object-updater

问题导读
1.方法object_sweep实现了什么功能?
2.update_path = /srv/node/device1/async_pending/prefix/update语句的含义是什么?


static/image/hrline/4.gif



概述部分:
对象更新守护进程适用于这种情况:
在系统故障或者是服务器高负荷的情况下,账户或容器的数据可能不会立即被更新或者出现数据更新失败的情况;
如果出现这种情况,该次更新将会在本地文件系统上被加入队列,然后更新器将会继续处理这些队列中的更新工作;
对象更新守护进程就是用于处理队列中对象更新任务操作;
注:
1 这些更新任务存储在本地目录/srv/node/device1(设备)/async_pending/下面,这里存储了每一个设备下所有属于这个设备的要执行的更新操作;
2 当然这个目录下面每个对象的更新任务都是独立的一个文件,如/srv/node/device1/async_pending/prefix****/update****/****;
这个守护进程的总体流程如下:
1 遍历本地节点上的所有设备,获取每一个设备;
2 针对每一个设备,嵌套遍历到目录/srv/node/device1/async_pending/prefix****/update****层次,需要执行的更新任务就存储在此;
调用方法process_object_update执行以下的操作:
2.1 获取一个对象的副本节点和分区号;
2.2 遍历一个对象的所有副本节点,针对每一个节点调用object_update方法,通过POST或DELETE方法实现对象的更新操作;
3 当执行完/srv/node/device1/async_pending/prefix****/update****指定的对象更新任务,删除这个文件;

源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.obj.updater import ObjectUpdater
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
    conf_file, options = parse_options(once=True)
    run_daemon(ObjectUpdater, conf_file, **options)

def run_once(self, *args, **kwargs):
    """
    运行一次更新;
    """
    self.logger.info(_('Begin object update single threaded sweep'))
    begin = time.time()
    self.successes = 0
    self.failures = 0
      
    # self.devices = /srv/node
    # ls /srv/node/
    # account.builderbackups            container.ring.gzobject.builder
    # account.ring.gzcontainer.builderdevice1            object.ring.gz
    for device in os.listdir(self.devices):
      if self.mount_check and not ismount(os.path.join(self.devices, device)):
            self.logger.increment('errors')
            self.logger.warn(_('Skipping %s as it is not mounted'), device)
            continue
            
      # object_sweep:扫描device上的async pendings目录,遍历每一个prefix目录并执行升级;
      # self.devices = /srv/node
      # /srv/node/device1
      # /srv/node/device2
      self.object_sweep(os.path.join(self.devices, device))
    elapsed = time.time() - begin
    self.logger.info(_('Object update single threaded sweep completed: '
                     '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
                     {'elapsed': elapsed, 'success': self.successes, 'fail': self.failures})
    dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger)



遍历每一个设备,调用方法object_sweep对其目录下的对象实现更新操作;
来看方法object_sweep的具体实现:
def object_sweep(self, device):
    """
    扫描device上的async pendings目录,遍历每一个prefix目录并执行更新;
    """
    start_time = time.time()
    # async_pending = /srv/node/device1/async_pending
    async_pending = os.path.join(device, ASYNCDIR)
    if not os.path.isdir(async_pending):
      return
      
    for prefix in os.listdir(async_pending):
      # 获取/srv/node/device1/async_pending/每一个目录的路径;
      prefix_path = os.path.join(async_pending, prefix)
      if not os.path.isdir(prefix_path):
            continue
            
      last_obj_hash = None
      # 按照从大到小的顺序遍历prefix_path下的文件夹;
      for update in sorted(os.listdir(prefix_path), reverse=True):
            # update_path = prefix_path/update
            update_path = os.path.join(prefix_path, update)
            if not os.path.isfile(update_path):
                continue
               
            try:
                obj_hash, timestamp = update.split('-')
            except ValueError:
                self.logger.increment('errors')
                self.logger.error(_('ERROR async pending file with unexpected name %s') % (update_path))
                continue
               
            # 如果obj_hash = last_obj_hash,则删除update_path;
            if obj_hash == last_obj_hash:
                self.logger.increment("unlinks")
                os.unlink(update_path)
                  
            else:
                # process_object_update:执行更新object;
                # device = /srv/node/device1
                # update_path = /srv/node/device1/async_pending/prefix****/update****
                self.process_object_update(update_path, device)
                last_obj_hash = obj_hash
            time.sleep(self.slowdown)
      try:
            os.rmdir(prefix_path)
      except OSError:
            pass
    self.logger.timing_since('timing', start_time)



update_path = /srv/node/device1/async_pending/prefix/update
嵌套遍历指定的设备device下的文件,遍历所有形如update_path = /srv/node/device1/async_pending/prefix****/update****的文件;
针对每一个update_path文件,调用方法process_object_update,实现为每一个object进行更新操作;
来看方法process_object_update的实现:
def process_object_update(self, update_path, device):
    """
    执行更新object;      
    # device = /srv/node/device1
    # update_path = /srv/node/device1/async_pending/prefix****/update****
    """
    try:
      update = pickle.load(open(update_path, 'rb'))
    except Exception:
      self.logger.exception(_('ERROR Pickle problem, quarantining %s'), update_path)
      self.logger.increment('quarantines')
      renamer(update_path, os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path)))
      return
      
    successes = update.get('successes', [])
    # get_container_ring:获取或建立container的环,且加载它;
    # get_nodes:获取account/container/object所对应的分区号和节点(可能是多个,因为分区副本有多个,可能位于不同的节点上);
    # 返回元组(分区,节点信息列表);
    # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
    part, nodes = self.get_container_ring().get_nodes(update['account'], update['container'])
    # 明确当前对象的层次关系;
    obj = '/%s/%s/%s' % (update['account'], update['container'], update['obj'])
      
    success = True
    new_successes = False
      
    # 遍历对象的所有副本节点;
    for node in nodes:
      # 如果某个节点上的对象没有执行更新操作,则调用方法object_update执行更新操作;
      if node['id'] not in successes:
            # object_update:通过POST或DELETE方法执行container中的object的更新;
            # update['op']:对象更新操作具体调用的方法(如POST或DELETE);
            # 更新操作的相关信息保存在update['headers']中;
            status = self.object_update(node, part, update['op'], obj, update['headers'])
               
            if not is_success(status) and status != HTTP_NOT_FOUND:
                success = False
            else:
                successes.append(node['id'])
                new_successes = True
      
    if success:
      self.successes += 1
      self.logger.increment('successes')
      self.logger.debug(_('Update sent for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
      self.logger.increment("unlinks")
      os.unlink(update_path)
    else:
      self.failures += 1
      self.logger.increment('failures')
      self.logger.debug(_('Update failed for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
      if new_successes:
            update['successes'] = successes
            write_pickle(update, update_path, os.path.join(device, 'tmp'))



1.读取指定的对象数据更新文件update_path = /srv/node/device1/async_pending/prefix****/update****;
注:每个update_path文件记录了一个object要执行的更新信息;
在系统故障或者是服务器高负荷的情况下,账户或容器的数据可能不会立即被更新或者出现数据更新失败的情况;
如果出现这种情况,该次更新将会在本地文件系统上被加入队列,然后更新器将会继续处理这些队列中的更新工作;
这些没有及时执行的更新操作,就是存储在这些update_path文件中,每个update_path文件对应一个object的更新操作;
2.根据读取的对象数据更新文件获取对象所属的账户(account)和容器(container),并获取指定容器的分区号和所有副本节点;
3.针对每个容器的副本节点,调用方法object_update实现通过POST或DELETE方法执行container中的object的更新;
注:在方法object_update的调用传递参数中,指定了对象更新操作具体调用的方法(如POST或DELETE),也是由前面读取的数据更新文件中获取的;
转到3,来看方法object_update的具体实现:
def object_update(self, node, part, op, obj, headers):
   """
   执行container中的object的更新;
   """
    headers_out = headers.copy()
    headers_out['user-agent'] = 'obj-updater %s' % os.getpid()
    try:
      # 通过POST或DELETE方法实现对象的更新操作;
      with ConnectionTimeout(self.conn_timeout):
            conn = http_connect(node['ip'], node['port'], node['device'], part, op, obj, headers_out)
      with Timeout(self.node_timeout):
            resp = conn.getresponse()
            resp.read()
            return resp.status
    except (Exception, Timeout):
      self.logger.exception(_('ERROR with remote server %(ip)s:%(port)s/%(device)s'), node)
    return HTTP_INTERNAL_SERVER_ERROR

博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

溜溜小哥 发表于 2015-5-22 17:11:55

朋友你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan
页: [1]
查看完整版本: Swift源码分析----swift-object-updater