问题导读
1.哪个类实现复制指定分区数据到指定节点(用以实现数据副本之间的同步)?
2.复制指定分区数据到指定节点有哪些步骤?
3.法_repl_to_node的实现了什么?
概述部分:
实现复制指定分区(账户)数据到指定节点(用以实现数据副本之间的同步);
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用Replicator类中的run_once方法;
注:账户之间同步数据主要就是对形如object_file = /srv/node/node['device']/accounts/partition/suffix/hsh****.db的数据库文件执行复制操作;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
- from swift.account.replicator import AccountReplicator
- from swift.common.utils import parse_options
- from swift.common.daemon import run_daemon
-
- if __name__ == '__main__':
- conf_file, options = parse_options(once=True)
- run_daemon(AccountReplicator, conf_file, **options)
复制代码
- class AccountReplicator(db_replicator.Replicator):
- server_type = 'account'
- brokerclass = AccountBroker
- datadir = DATADIR
- default_port = 6002
复制代码
- class Replicator(Daemon)----def run_once(self, *args, **kwargs):
- """
- 实现复制指定分区数据到指定节点(用以实现数据副本之间的同步);
- 数据类型可能是account或container或object;
- """
-
- # 初始化若干参数的操作;
- # self.stats = {'attempted': 0, 'success': 0, 'failure': 0, 'ts_repl': 0,
- # 'no_change': 0, 'hashmatch': 0, 'rsync': 0, 'diff': 0,
- # 'remove': 0, 'empty': 0, 'remote_merge': 0,
- # 'start': time.time(), 'diff_capped': 0}
- self._zero_stats()
- dirs = []
- ips = whataremyips()
- if not ips:
- self.logger.error(_('ERROR Failed to get my own IPs?'))
- return
-
- # 获取环上的设备信息;
- for node in self.ring.devs:
- if (node and node['replication_ip'] in ips and node['replication_port'] == self.port):
- if self.mount_check and not ismount(os.path.join(self.root, node['device'])):
- self.logger.warn(_('Skipping %(device)s as it is not mounted') % node)
- continue
-
- # 删除若干过期文件;
- unlink_older_than(
- os.path.join(self.root, node['device'], 'tmp'),
- time.time() - self.reclaim_age)
-
- datadir = os.path.join(self.root, node['device'], self.datadir)
- if os.path.isdir(datadir):
- dirs.append((datadir, node['id']))
-
- self.logger.info(_('Beginning replication run'))
- for part, object_file, node_id in roundrobin_datadirs(dirs):
-
- # _replicate_object:复制指定分区数据到指定节点(用以实现数据副本之间的同步),具体步骤如下;
- # 获取指定分区所在的所有节点nodes(一个分区可能对应多个节点,因为可能有多个副本);
- # 判断node_id是否在nodes的范围之内(这是合理的);
- # 循环实现数据到各个目标节点上(的分区)的复制操作;
- # 通过比较同步点和哈希值来判断复制后的两个版本是否是同步的,即复制操作是否成功;
- self.cpool.spawn_n(self._replicate_object, part, object_file, node_id)
- self.cpool.waitall()
- self.logger.info(_('Replication run OVER'))
- self._report_stats()
复制代码
1.for node in self.ring.devs:从环上获取所有设备,遍历并执行以下操作:
通过IP地址判断并获取属于本机的且已经挂载的设备,并存储设备对应的datadir = /srv/node/node['device']/accounts和node['id']作为元素储存在字典dirs中;
注:这里实际上就是获取属于本机的设备,且明确文件路径/srv/node/node['device']/accounts(对应于账户);
2.循环遍历node['device']/accounts下面的每一个文件object_file(文件路径形如object_file = /srv/node/node['device']/accounts/partition/suffix/hsh****.db,为账户中具体分区下的以.db为后缀的文件),调用方法_replicate_object实现复制本地指定分区数据到指定节点(用以实现数据副本之间的同步);
转到2,来看方法_replicate_object的实现:
- def _replicate_object(self, partition, object_file, node_id):
- """
- 复制指定分区数据到指定节点(用以实现数据副本之间的同步),具体步骤如下;
- 获取指定分区所在的所有节点nodes(一个分区可能对应多个节点,因为可能有多个副本);
- 判断node_id是否在nodes的范围之内(这是合理的);
- 循环实现数据到各个目标节点上(的分区)的复制操作;
- 通过比较同步点和哈希值来判断复制后的两个版本是否是同步的,即复制操作是否成功;
-
- object_file = /srv/node/node['device']/accounts/partition/suffix/hsh****.db
- """
- start_time = now = time.time()
- self.logger.debug(_('Replicating db %s'), object_file)
- self.stats['attempted'] += 1
- self.logger.increment('attempts')
- shouldbehere = True
-
- try:
- broker = self.brokerclass(object_file, pending_timeout=30)
- broker.reclaim(now - self.reclaim_age, now - (self.reclaim_age * 2))
- # 获取关于数据库复制需求的信息;
- info = broker.get_replication_info()
- full_info = broker.get_info()
- bpart = self.ring.get_part(full_info['account'], full_info.get('container'))
- if bpart != int(partition):
- partition = bpart
- # Important to set this false here since the later check only
- # checks if it's on the proper device, not partition.
- shouldbehere = False
- name = '/' + quote(full_info['account'])
- if 'container' in full_info:
- name += '/' + quote(full_info['container'])
- self.logger.error(
- 'Found %s for %s when it should be on partition %s; will '
- 'replicate out and remove.' % (object_file, name, bpart))
- except (Exception, Timeout) as e:
- if 'no such table' in str(e):
- self.logger.error(_('Quarantining DB %s'), object_file)
- quarantine_db(broker.db_file, broker.db_type)
- else:
- self.logger.exception(_('ERROR reading db %s'), object_file)
- self.stats['failure'] += 1
- self.logger.increment('failures')
- return
-
- # The db is considered deleted if the delete_timestamp value is greater
- # than the put_timestamp, and there are no objects.
- delete_timestamp = 0
- try:
- delete_timestamp = float(info['delete_timestamp'])
- except ValueError:
- pass
- put_timestamp = 0
- try:
- put_timestamp = float(info['put_timestamp'])
- except ValueError:
- pass
- if delete_timestamp < (now - self.reclaim_age) and delete_timestamp > put_timestamp and info['count'] in (None, '', 0, '0'):
- if self.report_up_to_date(full_info):
- self.delete_db(object_file)
- self.logger.timing_since('timing', start_time)
- return
- responses = []
-
- # 获取指定分区所在的所有节点(一个分区可能对应多个节点,因为可能有多个副本);
- nodes = self.ring.get_part_nodes(int(partition))
- if shouldbehere:
- shouldbehere = bool([n for n in nodes if n['id'] == node_id])
- # See Footnote [1] for an explanation of the repl_nodes assignment.
- i = 0
- while i < len(nodes) and nodes[i]['id'] != node_id:
- i += 1
- repl_nodes = nodes[i + 1:] + nodes[:i]
- more_nodes = self.ring.get_more_nodes(int(partition))
-
- # 实现数据到各个目标节点上(的分区)的复制操作;
- for node in repl_nodes:
- success = False
-
- # _repl_to_node:复制数据库文件到指定node;
- # 建立到目标分区的连接;
- # 实现一个HTTP REPLICATE复制请求;
- # 获取请求操作的响应信息;
- # 通过比较同步点和哈希值来判断复制后的两个副本是否是同步的,即复制操作是否成功;
- # 如果复制成功则直接返回True;
- try:
- success = self._repl_to_node(node, broker, partition, info)
- except DriveNotMounted:
- repl_nodes.append(more_nodes.next())
- self.logger.error(_('ERROR Remote drive not mounted %s'), node)
- except (Exception, Timeout):
- self.logger.exception(_('ERROR syncing %(file)s with node %(node)s'),
- {'file': object_file, 'node': node})
- self.stats['success' if success else 'failure'] += 1
- self.logger.increment('successes' if success else 'failures')
- responses.append(success)
- if not shouldbehere and all(responses):
- # If the db shouldn't be on this node and has been successfully
- # synced to all of its peers, it can be removed.
- self.delete_db(object_file)
- self.logger.timing_since('timing', start_time)
复制代码
2.1.获取指定文件object_file = /srv/node/node['device']/accounts/partition/suffix/hsh****.db所处在的分区;
2.2.获取上面的分区所在的所有节点nodes(一个分区可能对应多个节点,因为可能有多个副本);
2.3.循环遍历所有副本节点(除去本节点),调用方法_repl_to_node实现复制本地数据到副本节点上;
2.4.当针对所有副本节点的数据同步操作都完成之后,并且判断数据不需要再存储到本地,则执行操作实现删除本地的数据文件;
继续
转到2.3,来看方法_repl_to_node的实现:
- def _repl_to_node(self, node, broker, partition, info):
- """
- 复制数据库文件到指定node;
- 建立到目标分区的连接;
- 实现一个HTTP REPLICATE复制请求;
- 获取请求操作的响应信息;
- 通过比较同步点和哈希值来判断复制后的两个副本是否是同步的,即复制操作是否成功;
- 如果复制成功则直接返回True;
- """
- with ConnectionTimeout(self.conn_timeout):
- http = self._http_connect(node, partition, broker.db_file)
- if not http:
- self.logger.error(_('ERROR Unable to connect to remote server: %s'), node)
- return False
-
- # 实现一个HTTP REPLICATE复制请求;
- with Timeout(self.node_timeout):
- response = http.replicate(
- 'sync', info['max_row'], info['hash'], info['id'],
- info['created_at'], info['put_timestamp'],
- info['delete_timestamp'], info['metadata'])
-
- # 若干异常处理;
- if not response:
- return False
- elif response.status == HTTP_NOT_FOUND: # completely missing, rsync
- self.stats['rsync'] += 1
- self.logger.increment('rsyncs')
- return self._rsync_db(broker, node, http, info['id'])
- elif response.status == HTTP_INSUFFICIENT_STORAGE:
- raise DriveNotMounted()
-
- # 响应状态说明了复制操作完成;
- # 通过比较同步点和哈希值来判断复制后的两个副本是否是同步的,即复制操作是否成功;
- # 如果复制成功则直接返回True;
- elif response.status >= 200 and response.status < 300:
- rinfo = simplejson.loads(response.data)
- local_sync = broker.get_sync(rinfo['id'], incoming=False)
-
- # 比较rinfo(远程复制数据信息)和info(本地复制数据信息)的同步点和哈希值,
- # 来判断完成复制操作的两个副本间是否是同步的;
- if self._in_sync(rinfo, info, broker, local_sync):
- return True
-
- # if the difference in rowids between the two differs by
- # more than 50%, rsync then do a remote merge.
- if rinfo['max_row'] / float(info['max_row']) < 0.5:
- self.stats['remote_merge'] += 1
- self.logger.increment('remote_merges')
- return self._rsync_db(broker, node, http, info['id'],
- replicate_method='rsync_then_merge',
- replicate_timeout=(info['count'] / 2000))
-
- # else send diffs over to the remote server
- return self._usync_db(max(rinfo['point'], local_sync), broker, http, rinfo['id'], info['id'])
复制代码
2.3.1.调用方法replicate实现通过HTTP协议调用REPLICATE方法,实现本地指定文件到远程指定节点的同步操作,并获取相应信息;
注:这里具体还有很多细节,就不进行进一步解析了;
2.3.2.通过上述的响应信息判断同步操作是否完成,如果完成,则进一步比较rinfo(远程复制数据信息)和info(本地复制数据信息)的同步点和哈希值,来判断完成复制操作的两个副本间是否是同步的,如果同步,则说明复制操作成功,直接返回True;
2.3.3.如果同步操作不成功,则通过rinfo['max_row']/float(info['max_row'])的比值判断,如果远程节点数据和本地复制数据差异超过50%,说明数据差异较大,则调用方法_rsync_db通过命令rsync实现全部数据的同步;
2.3.4.通过rinfo['max_row']/float(info['max_row'])的比值判断,如果远程节点数据和本地复制数据差异没有超过50%,说明数据差异较小,则调用方法_usync_db实现数据的同步;
转到2.3.3,来看方法_rsync_db的实现:
- def _rsync_db(self, broker, device, http, local_id, replicate_method='complete_rsync', replicate_timeout=None):
- """
- 通过命令rsync实现节点间全部数据的同步;
- """
- device_ip = rsync_ip(device['replication_ip'])
- if self.vm_test_mode:
- remote_file = '%s::%s%s/%s/tmp/%s' % (device_ip, self.server_type, device['replication_port'], device['device'], local_id)
- else:
- remote_file = '%s::%s/%s/tmp/%s' % (device_ip, self.server_type, device['device'], local_id)
- mtime = os.path.getmtime(broker.db_file)
- if not self._rsync_file(broker.db_file, remote_file):
- return False
- # perform block-level sync if the db was modified during the first sync
- if os.path.exists(broker.db_file + '-journal') or os.path.getmtime(broker.db_file) > mtime:
- # grab a lock so nobody else can modify it
- with broker.lock():
- if not self._rsync_file(broker.db_file, remote_file, False):
- return False
- with Timeout(replicate_timeout or self.node_timeout):
- response = http.replicate(replicate_method, local_id)
- return response and response.status >= 200 and response.status
复制代码
2.3.3.1 第一次调用方法_rsync_file,通过应用命令rsync,实现两个节点间数据的同步,这里设置whole_file=True,说明进行了全数据的复制操作;
2.3.3.2.如果文件路径下存在以-journal为后缀的文件,说明在第一次数据同步的过程中,数据文件有被修改;所以第二次调用方法_rsync_file,但是这里对文件处理操作加锁,以防止在数据同步的过程中,数据文件再被修改;这里设置参数whole_file=Flase,说明没有进行全数据的复制操作,而是进行了差异部分的数据复制操作;
2.3.3.3.在执行完成数据同步操作之后,调用方法replicate实现通过HTTP推送REPLICATE方法,进而调用设定的rsync_then_merge方法,实现部分+部分地从container数据表中整合相关属性信息到container数据表中(这里有些细节需要进一步理解);
来看方法_rsync_file的实现:
- def _rsync_file(self, db_file, remote_file, whole_file=True):
- """
- 通过应用命令rsync,实现两个节点间数据的同步;
- """
- popen_args = ['rsync', '--quiet', '--no-motd',
- '--timeout=%s' % int(math.ceil(self.node_timeout)),
- '--contimeout=%s' % int(math.ceil(self.conn_timeout))]
- if whole_file:
- popen_args.append('--whole-file')
- popen_args.extend([db_file, remote_file])
- proc = subprocess.Popen(popen_args)
- proc.communicate()
- if proc.returncode != 0:
- self.logger.error(_('ERROR rsync failed with %(code)s: %(args)s'),
- {'code': proc.returncode, 'args': popen_args})
- return proc.returncode == 0
复制代码
注:(1)可见当命令行组成之后,将会调用方法communicate实现命令行的远程执行;
(2)命令rsync常用于节点间的数据同步和备份操作;rsync命令有特性:第一次同步时rsync会复制全部内容,但在下一次只传输修改过的文件,这样的效率就比较高的;
(3)whole_file参数决定了是否进行全数据的复制操作;
再来看方法rsync_then_merge的实现:
- def rsync_then_merge(self, drive, db_file, args):
- old_filename = os.path.join(self.root, drive, 'tmp', args[0])
-
- if not os.path.exists(db_file) or not os.path.exists(old_filename):
- return HTTPNotFound()
-
- new_broker = self.broker_class(old_filename)
- existing_broker = self.broker_class(db_file)
-
- point = -1
- objects = existing_broker.get_items_since(point, 1000)
- while len(objects):
- new_broker.merge_items(objects)
- point = objects[-1]['ROWID']
- objects = existing_broker.get_items_since(point, 1000)
- sleep()
- new_broker.newid(args[0])
- renamer(old_filename, db_file)
- return HTTPNoContent()
复制代码
- def get_items_since(self, start, count):
- self._commit_puts_stale_ok()
- with self.get() as conn:
- curs = conn.execute('''
- SELECT * FROM %s WHERE ROWID > ? ORDER BY ROWID ASC LIMIT ?
- ''' % self.db_contains_type, (start, count))
- curs.row_factory = dict_factory
- return [r for r in curs]
复制代码
- def merge_items(self, item_list, source=None):
- """
- 整合指定账户下container数据表中的相关属性信息到目标container数据表中;
- """
- with self.get() as conn:
- max_rowid = -1
- for rec in item_list:
- record = [rec['name'], rec['put_timestamp'],
- rec['delete_timestamp'], rec['object_count'],
- rec['bytes_used'], rec['deleted']]
- query = '''''
- SELECT name, put_timestamp, delete_timestamp,object_count, bytes_used, deleted
- FROM container WHERE name = ?'''
-
- if self.get_db_version(conn) >= 1:
- query += ' AND deleted IN (0, 1)'
- curs = conn.execute(query, (rec['name'],))
- curs.row_factory = None
- row = curs.fetchone()
- if row:
- row = list(row)
- for i in xrange(5):
- if record[i] is None and row[i] is not None:
- record[i] = row[i]
- if row[1] > record[1]: # Keep newest put_timestamp
- record[1] = row[1]
- if row[2] > record[2]: # Keep newest delete_timestamp
- record[2] = row[2]
- # If deleted, mark as such
- if record[2] > record[1] and record[3] in (None, '', 0, '0'):
- record[5] = 1
- else:
- record[5] = 0
- conn.execute('''''DELETE FROM container WHERE name = ? AND deleted IN (0, 1)''', (record[0],))
- conn.execute('''''INSERT INTO container (name, put_timestamp, delete_timestamp, object_count, bytes_used, deleted) VALUES (?, ?, ?, ?, ?, ?)''', record)
- if source:
- max_rowid = max(max_rowid, rec['ROWID'])
-
- if source:
- try:
- conn.execute('''''INSERT INTO incoming_sync (sync_point, remote_id) VALUES (?, ?)''', (max_rowid, source))
- except sqlite3.IntegrityError:
- conn.execute('''''UPDATE incoming_sync SET sync_point=max(?, sync_point) WHERE remote_id=?''', (max_rowid, source))
- conn.commit()
复制代码
转到2.3.4,来看方法_usync_db的实现:
- def _usync_db(self, point, broker, http, remote_id, local_id):
- """
- 由于节点间数据差异不超过50%,所有通过发送自从上一次同步操作以来的所有数据变化记录来实现节点间的数据同步操作;
- """
- self.stats['diff'] += 1
- self.logger.increment('diffs')
- self.logger.debug(_('Syncing chunks with %s'), http.host)
- sync_table = broker.get_syncs()
- objects = broker.get_items_since(point, self.per_diff)
- diffs = 0
- while len(objects) and diffs < self.max_diffs:
- diffs += 1
- with Timeout(self.node_timeout):
- response = http.replicate('merge_items', objects, local_id)
- if not response or response.status >= 300 or response.status < 200:
- if response:
- self.logger.error(_('ERROR Bad response %(status)s from %(host)s'),
- {'status': response.status, 'host': http.host})
- return False
-
- point = objects[-1]['ROWID']
- objects = broker.get_items_since(point, self.per_diff)
-
- if objects:
- self.logger.debug(_('Synchronization for %s has fallen more than '
- '%s rows behind; moving on and will try again next pass.'),
- broker, self.max_diffs * self.per_diff)
- self.stats['diff_capped'] += 1
- self.logger.increment('diff_caps')
- else:
- with Timeout(self.node_timeout):
- response = http.replicate('merge_syncs', sync_table)
- if response and response.status >= 200 and response.status < 300:
- broker.merge_syncs([{'remote_id': remote_id, 'sync_point': point}],
- incoming=False)
- return True
- return False
复制代码
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
|