问题导读
1.账户收割操作的主要职责是什么?
2.调用哪个方法实现收割指定device的操作?
概述部分:
账户下数据收割操作的守护进程;
账户收割操作的主要职责就是对于账户下状态标志为deleted的分区数据(容器和对象)实现删除操作;
具体的实现是调用了reap_account、reap_container和reap_object方法,实现递归删除账户下的相关数据;
对于分区的相关副本也要实现删除操作;
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用AccountReaper类中的run_once方法;
如果调用的是AccountReaper类中的run_forever方法,则会实现循环实现对账户下状态标志为deleted的数据实现收割操作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
- from swift.account.reaper import AccountReaper
- from swift.common.utils import parse_options
- from swift.common.daemon import run_daemon
- if __name__ == '__main__':
- conf_file, options = parse_options(once=True)
- run_daemon(AccountReaper, conf_file, **options)
复制代码
- def run_once(self, *args, **kwargs):
- """
- 进行一次任务收割操作;
- 对已经删除了的账户下的数据对象和容器进行删除操作;
- """
- self.logger.debug(_('Begin devices pass: %s'), self.devices)
- begin = time()
- try:
- # self.devices = conf.get('devices', '/srv/node')
- # 循环获取/srv/node/目录下的device;
- # 删除所有遍历的device,实现收割所有device;
- for device in os.listdir(self.devices):
- if self.mount_check and not ismount(os.path.join(self.devices, device)):
- self.logger.increment('errors')
- self.logger.debug(_('Skipping %s as it is not mounted'), device)
- continue
-
- # 实现收割指定device操作;
- self.reap_device(device)
- except (Exception, Timeout):
- self.logger.exception(_("Exception in top-level account reaper "
- "loop"))
- elapsed = time() - begin
- self.logger.info(_('Devices pass completed: %.02fs'), elapsed)
复制代码
循环获取/srv/node/目录下的device,调用方法reap_device实现收割指定device的操作;
- def reap_device(self, device):
- """
- 实现收割device操作;
- 这个方法将扫描设备的accounts目录;
- 找到相应的数据库文件,然后对数据库文件内容进行判断;
- 如果账户被标志为‘DELETE’,调用reap_account实现删除账户下的所有的containers相关数据;
- """
- # self.devices = conf.get('devices', '/srv/node')
- # datadir = /srv/node/device/accounts
- datadir = os.path.join(self.devices, device, DATADIR)
- if not os.path.exists(datadir):
- return
-
- # 循环获取/srv/node/device/accounts目录下的partition;
- for partition in os.listdir(datadir):
- # partition_path = /srv/node/device/accounts/partition
- partition_path = os.path.join(datadir, partition)
- if not partition.isdigit():
- continue
-
- # 获取一个分区partition所有副本相关的节点nodes;
- # get_account_ring:获取swift.common.ring.Ring对象,名称为'account';
- # get_part_nodes:获取一个分区所有副本相关的节点信息;
- nodes = self.get_account_ring().get_part_nodes(int(partition))
- if nodes[0]['ip'] not in self.myips or not os.path.isdir(partition_path):
- continue
- for suffix in os.listdir(partition_path):
- # suffix_path = /srv/node/device/accounts/partition/suffix
- suffix_path = os.path.join(partition_path, suffix)
- if not os.path.isdir(suffix_path):
- continue
- for hsh in os.listdir(suffix_path):
- # hsh_path = /srv/node/device/accounts/partition/suffix/hsh
- hsh_path = os.path.join(suffix_path, hsh)
- if not os.path.isdir(hsh_path):
- continue
-
- # 循环遍历hsh_path = /srv/node/device/accounts/partition/suffix/hsh/下所有的文件;
- # 针对后缀为.db的文件,执行收割操作,即如果其状态标志为‘DELETE’并且broker不为空,则执行删除对应账户数据操作;
- for fname in sorted(os.listdir(hsh_path), reverse=True):
- if fname.endswith('.ts'):
- break
- elif fname.endswith('.db'):
- self.start_time = time()
- broker = AccountBroker(os.path.join(hsh_path, fname))
-
- # 如果broker状态标志为‘DELETE’并且broker不为空,则执行删除操作;
- # reap_account:删除要删除的account下的所有的containers相关数据;
- if broker.is_status_deleted() and not broker.empty():
- self.reap_account(broker, partition, nodes)
复制代码
1.嵌套循环遍历目录/srv/node/device/accounts/下的所有文件;
2.针对后缀为.db的文件,执行收割操作,即如果其状态标志为‘DELETE’且数据库文件不为空,则调用reap_account方法执行账户下所有容器数据的删除操作;
- def reap_account(self, broker, partition, nodes):
- """
- 实现收割account操作;
- 删除要删除的account下的所有的containers相关数据;
- """
- begin = time()
- info = broker.get_info()
- if time() - float(info['delete_timestamp']) <= self.delay_reaping:
- return False
- account = info['account']
- self.logger.info(_('Beginning pass on account %s'), account)
- self.stats_return_codes = {}
- self.stats_containers_deleted = 0
- self.stats_objects_deleted = 0
- self.stats_containers_remaining = 0
- self.stats_objects_remaining = 0
- self.stats_containers_possibly_remaining = 0
- self.stats_objects_possibly_remaining = 0
-
- # 删除要删除的account下的所有的containers相关数据;
- try:
- marker = ''
- while True:
- # 获取要删除的account下的containers排序列表;
- # 返回列表(name, object_count, bytes_used, 0);
- containers = list(broker.list_containers_iter(1000, marker, None, None, None))
- if not containers:
- break
-
- try:
- for (container, _junk, _junk, _junk) in containers:
- # reap_container:实现删除容器container下数据和容器container本身;
- # 在一个绿色线程中运行方法reap_container,来实现删除container相关数据;
- self.container_pool.spawn(self.reap_container,account,partition,nodes,container)
- self.container_pool.waitall()
- except (Exception, Timeout):
- self.logger.exception(_('Exception with containers for account %s'), account)
-
- marker = containers[-1][0]
- if marker == '':
- break
- log = 'Completed pass on account %s' % account
- except (Exception, Timeout):
- self.logger.exception(_('Exception with account %s'), account)
- log = _('Incomplete pass on account %s') % account
- if self.stats_containers_deleted:
- log += _(', %s containers deleted') % self.stats_containers_deleted
- if self.stats_objects_deleted:
- log += _(', %s objects deleted') % self.stats_objects_deleted
- if self.stats_containers_remaining:
- log += _(', %s containers remaining') % self.stats_containers_remaining
- if self.stats_objects_remaining:
- log += _(', %s objects remaining') % self.stats_objects_remaining
- if self.stats_containers_possibly_remaining:
- log += _(', %s containers possibly remaining') % self.stats_containers_possibly_remaining
- if self.stats_objects_possibly_remaining:
- log += _(', %s objects possibly remaining') % self.stats_objects_possibly_remaining
- if self.stats_return_codes:
- log += _(', return codes: ')
- for code in sorted(self.stats_return_codes):
- log += '%s %sxxs, ' % (self.stats_return_codes[code], code)
- log = log[:-2]
- log += _(', elapsed: %.02fs') % (time() - begin)
- self.logger.info(log)
- self.logger.timing_since('timing', self.start_time)
- if self.stats_containers_remaining and begin - float(info['delete_timestamp']) >= self.reap_not_done_after:
- self.logger.warn(_('Account %s has not been reaped since %s') % (account, ctime(float(info['delete_timestamp']))))
- return True
复制代码
1.实现获取要删除的account下的container排序列表;
2.循环遍历所有container,针对每一个container,在一个绿色线程中运行方法reap_container来实现删除container的相关数据;
下一篇博客将继续swift-account-reaper的分析工作。
|