分享

Swift源码分析----swift-object-updater

tntzbzc 发表于 2014-11-20 15:34:57 回帖奖励 阅读模式 关闭右栏 1 13607
问题导读
1.方法object_sweep实现了什么功能?
2.update_path = /srv/node/device1/async_pending/prefix/update语句的含义是什么?






概述部分:
对象更新守护进程适用于这种情况:
在系统故障或者是服务器高负荷的情况下,账户或容器的数据可能不会立即被更新或者出现数据更新失败的情况;
如果出现这种情况,该次更新将会在本地文件系统上被加入队列,然后更新器将会继续处理这些队列中的更新工作;
对象更新守护进程就是用于处理队列中对象更新任务操作;
注:
1 这些更新任务存储在本地目录/srv/node/device1(设备)/async_pending/下面,这里存储了每一个设备下所有属于这个设备的要执行的更新操作;
2 当然这个目录下面每个对象的更新任务都是独立的一个文件,如/srv/node/device1/async_pending/prefix****/update****/****;
这个守护进程的总体流程如下:
1 遍历本地节点上的所有设备,获取每一个设备;
2 针对每一个设备,嵌套遍历到目录/srv/node/device1/async_pending/prefix****/update****层次,需要执行的更新任务就存储在此;
  调用方法process_object_update执行以下的操作:
  2.1 获取一个对象的副本节点和分区号;
  2.2 遍历一个对象的所有副本节点,针对每一个节点调用object_update方法,通过POST或DELETE方法实现对象的更新操作;
3 当执行完/srv/node/device1/async_pending/prefix****/update****指定的对象更新任务,删除这个文件;

源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
  1. from swift.obj.updater import ObjectUpdater
  2. from swift.common.utils import parse_options
  3. from swift.common.daemon import run_daemon
  4. if __name__ == '__main__':
  5.     conf_file, options = parse_options(once=True)
  6.     run_daemon(ObjectUpdater, conf_file, **options)
复制代码

  1. def run_once(self, *args, **kwargs):
  2.     """
  3.     运行一次更新;
  4.     """
  5.     self.logger.info(_('Begin object update single threaded sweep'))
  6.     begin = time.time()
  7.     self.successes = 0
  8.     self.failures = 0
  9.         
  10.     # self.devices = /srv/node
  11.     # ls /srv/node/
  12.     # account.builder  backups            container.ring.gz  object.builder
  13.     # account.ring.gz  container.builder  device1            object.ring.gz
  14.     for device in os.listdir(self.devices):
  15.         if self.mount_check and not ismount(os.path.join(self.devices, device)):
  16.             self.logger.increment('errors')
  17.             self.logger.warn(_('Skipping %s as it is not mounted'), device)
  18.             continue
  19.             
  20.         # object_sweep:扫描device上的async pendings目录,遍历每一个prefix目录并执行升级;
  21.         # self.devices = /srv/node
  22.         # /srv/node/device1
  23.         # /srv/node/device2
  24.         self.object_sweep(os.path.join(self.devices, device))
  25.     elapsed = time.time() - begin
  26.     self.logger.info(_('Object update single threaded sweep completed: '
  27.                        '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
  28.                        {'elapsed': elapsed, 'success': self.successes, 'fail': self.failures})
  29.     dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger)
复制代码




遍历每一个设备,调用方法object_sweep对其目录下的对象实现更新操作;
来看方法object_sweep的具体实现:
  1. def object_sweep(self, device):
  2.     """
  3.     扫描device上的async pendings目录,遍历每一个prefix目录并执行更新;
  4.     """
  5.     start_time = time.time()
  6.     # async_pending = /srv/node/device1/async_pending
  7.     async_pending = os.path.join(device, ASYNCDIR)
  8.     if not os.path.isdir(async_pending):
  9.         return
  10.         
  11.     for prefix in os.listdir(async_pending):
  12.         # 获取/srv/node/device1/async_pending/每一个目录的路径;
  13.         prefix_path = os.path.join(async_pending, prefix)
  14.         if not os.path.isdir(prefix_path):
  15.             continue
  16.             
  17.         last_obj_hash = None
  18.         # 按照从大到小的顺序遍历prefix_path下的文件夹;
  19.         for update in sorted(os.listdir(prefix_path), reverse=True):
  20.             # update_path = prefix_path/update
  21.             update_path = os.path.join(prefix_path, update)
  22.             if not os.path.isfile(update_path):
  23.                 continue
  24.                
  25.             try:
  26.                 obj_hash, timestamp = update.split('-')
  27.             except ValueError:
  28.                 self.logger.increment('errors')
  29.                 self.logger.error(_('ERROR async pending file with unexpected name %s') % (update_path))
  30.                 continue
  31.                
  32.             # 如果obj_hash = last_obj_hash,则删除update_path;
  33.             if obj_hash == last_obj_hash:
  34.                 self.logger.increment("unlinks")
  35.                 os.unlink(update_path)
  36.                     
  37.             else:
  38.                 # process_object_update:执行更新object;
  39.                 # device = /srv/node/device1
  40.                 # update_path = /srv/node/device1/async_pending/prefix****/update****
  41.                 self.process_object_update(update_path, device)
  42.                 last_obj_hash = obj_hash
  43.             time.sleep(self.slowdown)
  44.         try:
  45.             os.rmdir(prefix_path)
  46.         except OSError:
  47.             pass
  48.     self.logger.timing_since('timing', start_time)
复制代码




update_path = /srv/node/device1/async_pending/prefix/update
嵌套遍历指定的设备device下的文件,遍历所有形如update_path = /srv/node/device1/async_pending/prefix****/update****的文件;
针对每一个update_path文件,调用方法process_object_update,实现为每一个object进行更新操作;
来看方法process_object_update的实现:
  1. def process_object_update(self, update_path, device):
  2.     """
  3.     执行更新object;      
  4.     # device = /srv/node/device1
  5.     # update_path = /srv/node/device1/async_pending/prefix****/update****
  6.     """
  7.     try:
  8.         update = pickle.load(open(update_path, 'rb'))
  9.     except Exception:
  10.         self.logger.exception(_('ERROR Pickle problem, quarantining %s'), update_path)
  11.         self.logger.increment('quarantines')
  12.         renamer(update_path, os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path)))
  13.         return
  14.         
  15.     successes = update.get('successes', [])
  16.     # get_container_ring:获取或建立container的环,且加载它;
  17.     # get_nodes:获取account/container/object所对应的分区号和节点(可能是多个,因为分区副本有多个,可能位于不同的节点上);
  18.     # 返回元组(分区,节点信息列表);
  19.     # 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
  20.     part, nodes = self.get_container_ring().get_nodes(update['account'], update['container'])
  21.     # 明确当前对象的层次关系;
  22.     obj = '/%s/%s/%s' % (update['account'], update['container'], update['obj'])
  23.         
  24.     success = True
  25.     new_successes = False
  26.         
  27.     # 遍历对象的所有副本节点;
  28.     for node in nodes:
  29.         # 如果某个节点上的对象没有执行更新操作,则调用方法object_update执行更新操作;
  30.         if node['id'] not in successes:
  31.             # object_update:通过POST或DELETE方法执行container中的object的更新;
  32.             # update['op']:对象更新操作具体调用的方法(如POST或DELETE);
  33.             # 更新操作的相关信息保存在update['headers']中;
  34.             status = self.object_update(node, part, update['op'], obj, update['headers'])
  35.                
  36.             if not is_success(status) and status != HTTP_NOT_FOUND:
  37.                 success = False
  38.             else:
  39.                 successes.append(node['id'])
  40.                 new_successes = True
  41.         
  42.     if success:
  43.         self.successes += 1
  44.         self.logger.increment('successes')
  45.         self.logger.debug(_('Update sent for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
  46.         self.logger.increment("unlinks")
  47.         os.unlink(update_path)
  48.     else:
  49.         self.failures += 1
  50.         self.logger.increment('failures')
  51.         self.logger.debug(_('Update failed for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
  52.         if new_successes:
  53.             update['successes'] = successes
  54.             write_pickle(update, update_path, os.path.join(device, 'tmp'))
复制代码




1.读取指定的对象数据更新文件update_path = /srv/node/device1/async_pending/prefix****/update****;
  注:每个update_path文件记录了一个object要执行的更新信息;
  在系统故障或者是服务器高负荷的情况下,账户或容器的数据可能不会立即被更新或者出现数据更新失败的情况;
  如果出现这种情况,该次更新将会在本地文件系统上被加入队列,然后更新器将会继续处理这些队列中的更新工作;
  这些没有及时执行的更新操作,就是存储在这些update_path文件中,每个update_path文件对应一个object的更新操作;
2.根据读取的对象数据更新文件获取对象所属的账户(account)和容器(container),并获取指定容器的分区号和所有副本节点;
3.针对每个容器的副本节点,调用方法object_update实现通过POST或DELETE方法执行container中的object的更新;
  注:在方法object_update的调用传递参数中,指定了对象更新操作具体调用的方法(如POST或DELETE),也是由前面读取的数据更新文件中获取的;
转到3,来看方法object_update的具体实现:
  1. def object_update(self, node, part, op, obj, headers):
  2.      """
  3.      执行container中的object的更新;
  4.      """
  5.     headers_out = headers.copy()
  6.     headers_out['user-agent'] = 'obj-updater %s' % os.getpid()
  7.     try:
  8.         # 通过POST或DELETE方法实现对象的更新操作;
  9.         with ConnectionTimeout(self.conn_timeout):
  10.             conn = http_connect(node['ip'], node['port'], node['device'], part, op, obj, headers_out)
  11.         with Timeout(self.node_timeout):
  12.             resp = conn.getresponse()
  13.             resp.read()
  14.             return resp.status
  15.     except (Exception, Timeout):
  16.         self.logger.exception(_('ERROR with remote server %(ip)s:%(port)s/%(device)s'), node)
  17.     return HTTP_INTERNAL_SERVER_ERROR
复制代码


博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

已有(0)人评论

跳转到指定楼层
溜溜小哥 发表于 2015-5-22 17:11:55
朋友你好,你这里引用了我几十篇博客,都不声明一下,你不觉得很多分么?至少应该尊重一下我的劳动成果吧?
http://blog.csdn.net/gaoxingnengjisuan

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条