有时同一数据的各个副本之间会出现不一致的情况,比如更新一个Object时,依照NWR策略,只要有两个副本更新成功,这个更新操作就被认为是成功,剩下的那个没有更新成功的副本就会与其他两个副本不一致。那么就需要有一种机制来保证各个副本之间的一致性。
Swift引入了三种后台进程来解决数据的一致性问题:Auditor、Update、Replicator。
Auditor负责数据的审计,通过持续的扫描磁盘来检查Account、Container和Object的完整性,如果发现数据有所损坏,Auditor就会对文件进行隔离,然后从其他节点上获取一份完整的副本来取代,而这个复制副本的任务由Replicator来完成。另外,在Ring的rebalance操作中,需要Replicator来完成实际的数据迁移工作,在Object删除的时候,也是由Replicator来完成实际的删除操作。
Updater则负责处理那些因为负荷不足等原因而失败的Account或Container更新操作。Updater会扫描本地节点上的Container或Object数据,然后检查相应的Account或Container节点上是否存在这些数据的记录,如果没有的话就将这些数据的记录推送到该Account或Container节点上。只有Container和Object有对应的Updater进程,并不存在Account的Updater进程。
这三种进程实现过程类似,这里以Account的Replicator进程为例。
Swift中存在着两种Replicator:一种是Database Replicator,针对Account和Container这两种以数据库形式存在的数据,另外一种是Object Replicator,服务于Object数据。
Account的Replicator进程起点为/bin/swift-account-replicator,工作流程中的关键部分如下描述:
1 使用swift.common.daemon.run_deamon()创建后台进程
与Account Server、Container Server、Object Server与Proxy Server等通过run_wsgi()函数来启动WSGI Server不同,Swift的其他后台进程是使用run_daemon()函数来创建的。
对Account的Replicator进程来说,它对应的实现类swift.account.replicator.AccountReplicator继承自类swift.common.db_replicator.Replicator,而这个类又是swift.common.daemon.Daemon子类,并实现了run_once()以及run_forever()等方法来完成数据库文件的复制。
#swift/account/replicator.py
class AccountReplicator(db_replicator.Replicator):
server_type = 'account'
brokerclass = AccountBroker
datadir = DATADIR
default_port = 6002
#swift/common/db_replicator.py
class Replicator(Daemon):
"""
Implements the logic for directing db replication.
"""
#swift/common/daemon.py
class Daemon(object):
"""Daemon base class"""
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(conf, log_route='daemon')
def run_once(self, *args, **kwargs):
"""Override this to run the script once"""
raise NotImplementedError('run_once not implemented')
def run_forever(self, *args, **kwargs):
"""Override this to run forever"""
raise NotImplementedError('run_forever not implemented')
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
2 Replicator进程的主要工作由run_once()完成
# swift/common/db_replicator.py
def run_once(self, *args, **kwargs):
"""Run a replication pass once."""
self._zero_stats()
dirs = []
ips = whataremyips()
if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?'))
return
self._local_device_ids = set()
#从Ring上获取所有设备,遍历并判断是否为本地设备,如果是则将该设备对应的
#datadir=/srv/node/node['device']/accounts和node[id]作为元素存在字典dirs中。
for node in self.ring.devs:
if (node and node['replication_ip'] in ips and
node['replication_port'] == self.port):
if self.mount_check and not ismount(
os.path.join(self.root, node['device'])):
self.logger.warn(
_('Skipping %(device)s as it is not mounted') % node)
continue
unlink_older_than(
os.path.join(self.root, node['device'], 'tmp'),
time.time() - self.reclaim_age)
datadir = os.path.join(self.root, node['device'], self.datadir)
if os.path.isdir(datadir):
self._local_device_ids.add(node['id'])
dirs.append((datadir, node['id']))
self.logger.info(_('Beginning replication run'))
#遍历node['device']/accounts下的每一个文件object_file
#(这个目录中具体partition以下以.db为后缀的文件,比如partition/suffix/*.db),并调用
#_replicate_object()复制本地指定partition中的数据到指定节点,从而
#实现各个副本之间的同步。
for part, object_file, node_id in roundrobin_datadirs(dirs):
self.cpool.spawn_n(
self._replicate_object, part, object_file, node_id)
self.cpool.waitall()
self.logger.info(_('Replication run OVER'))
self._report_stats()
下面看看/srv/node/node['device']/accounts具体目录分布
[root@localhost swiftloopback]# ll
total 32
drwxr-xr-x. 4 swift swift 4096 Apr 1 19:30 accounts
drwxr-xr-x. 5 swift swift 4096 Apr 1 19:30 containers
drwx------. 2 swift swift 16384 Mar 17 21:24 lost+found
drwxr-xr-x. 8 swift swift 4096 Apr 4 18:53 objects
drwxr-xr-x. 2 swift swift 4096 Apr 1 19:42 tmp
[root@localhost swiftloopback]# cd accounts/
[root@localhost accounts]# ll
total 8
drwxr-xr-x. 3 swift swift 4096 Mar 18 15:31 253910
drwxr-xr-x. 3 swift swift 4096 Apr 1 19:30 55860
[root@localhost accounts]# pwd
/srv/node/swiftloopback/accounts
[root@localhost accounts]# cd 253910/
[root@localhost 253910]# ll
total 4
drwxr-xr-x. 3 swift swift 4096 Mar 18 15:31 33a
[root@localhost 253910]# cd 33a
[root@localhost 33a]# ll
total 4
drwxr-xr-x. 2 swift swift 4096 Apr 1 19:31 f7f5b0d5494ca6dc001d96161cd8c33a
[root@localhost 33a]# cd f7f5b0d5494ca6dc001d96161cd8c33a/
[root@localhost f7f5b0d5494ca6dc001d96161cd8c33a]# ll
total 20
-rw-------. 1 swift swift 17408 Apr 1 19:31 f7f5b0d5494ca6dc001d96161cd8c33a.db
-rw-r--r--. 1 swift swift 0 Apr 1 19:31 f7f5b0d5494ca6dc001d96161cd8c33a.db.pending
3 具体的复制逻辑由_replicate_object()完成:
_replicate_object()首先获取该Partition所在的所有存储节点,并依次向这些目标节点发送HTTP REPLICATE复制请求,实现本地文件到远程指定节点的同步操作(采用push模式,而不是pull模式)。
收到响应后,通过比较Hash值和同步点来判断复制后的副本是否一致,即复制操作是否成功。如果判断不成功,则需要比较两个副本的差异程度,如果超过50%,即意味着差异比较大,则通过命令rsync实现全部数据同步,否则,只是发送自上一次同步以来的所有数据变化来实现两个副本的一致。