感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
概述部分:
对象更新守护进程适用于这种情况:
在系统故障或者是服务器高负荷的情况下,账户或容器的数据可能不会立即被更新或者出现数据更新失败的情况;
如果出现这种情况,该次更新将会在本地文件系统上被加入队列,然后更新器将会继续处理这些队列中的更新工作;
对象更新守护进程就是用于处理队列中对象更新任务操作;
注:
1 这些更新任务存储在本地目录/srv/node/device1(设备)/async_pending/下面,这里存储了每一个设备下所有属于这个设备的要执行的更新操作;
2 当然这个目录下面每个对象的更新任务都是独立的一个文件,如/srv/node/device1/async_pending/prefix****/update****/****;
这个守护进程的总体流程如下:
1 遍历本地节点上的所有设备,获取每一个设备;
2 针对每一个设备,嵌套遍历到目录/srv/node/device1/async_pending/prefix****/update****层次,需要执行的更新任务就存储在此;
调用方法process_object_update执行以下的操作:
2.1 获取一个对象的副本节点和分区号;
2.2 遍历一个对象的所有副本节点,针对每一个节点调用object_update方法,通过POST或DELETE方法实现对象的更新操作;
3 当执行完/srv/node/device1/async_pending/prefix****/update****指定的对象更新任务,删除这个文件;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.obj.updater import ObjectUpdater
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(ObjectUpdater, conf_file, **options)
def run_once(self, *args, **kwargs):
"""
运行一次更新;
"""
self.logger.info(_('Begin object update single threaded sweep'))
begin = time.time()
self.successes = 0
self.failures = 0
# self.devices = /srv/node
# ls /srv/node/
# account.builder backups container.ring.gz object.builder
# account.ring.gz container.builder device1 object.ring.gz
for device in os.listdir(self.devices):
if self.mount_check and not ismount(os.path.join(self.devices, device)):
self.logger.increment('errors')
self.logger.warn(_('Skipping %s as it is not mounted'), device)
continue
# object_sweep:扫描device上的async pendings目录,遍历每一个prefix目录并执行升级;
# self.devices = /srv/node
# /srv/node/device1
# /srv/node/device2
self.object_sweep(os.path.join(self.devices, device))
elapsed = time.time() - begin
self.logger.info(_('Object update single threaded sweep completed: '
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
{'elapsed': elapsed, 'success': self.successes, 'fail': self.failures})
dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger)
遍历每一个设备,调用方法object_sweep对其目录下的对象实现更新操作;
来看方法object_sweep的具体实现:
def object_sweep(self, device):
"""
扫描device上的async pendings目录,遍历每一个prefix目录并执行更新;
"""
start_time = time.time()
# async_pending = /srv/node/device1/async_pending
async_pending = os.path.join(device, ASYNCDIR)
if not os.path.isdir(async_pending):
return
for prefix in os.listdir(async_pending):
# 获取/srv/node/device1/async_pending/每一个目录的路径;
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
# 按照从大到小的顺序遍历prefix_path下的文件夹;
for update in sorted(os.listdir(prefix_path), reverse=True):
# update_path = prefix_path/update
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
continue
try:
obj_hash, timestamp = update.split('-')
except ValueError:
self.logger.increment('errors')
self.logger.error(_('ERROR async pending file with unexpected name %s') % (update_path))
continue
# 如果obj_hash = last_obj_hash,则删除update_path;
if obj_hash == last_obj_hash:
self.logger.increment("unlinks")
os.unlink(update_path)
else:
# process_object_update:执行更新object;
# device = /srv/node/device1
# update_path = /srv/node/device1/async_pending/prefix****/update****
self.process_object_update(update_path, device)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
os.rmdir(prefix_path)
except OSError:
pass
self.logger.timing_since('timing', start_time)
update_path = /srv/node/device1/async_pending/prefix/update 来看方法process_object_update的实现:
def process_object_update(self, update_path, device):
"""
执行更新object;
# device = /srv/node/device1
# update_path = /srv/node/device1/async_pending/prefix****/update****
"""
try:
update = pickle.load(open(update_path, 'rb'))
except Exception:
self.logger.exception(_('ERROR Pickle problem, quarantining %s'), update_path)
self.logger.increment('quarantines')
renamer(update_path, os.path.join(device, 'quarantined', 'objects', os.path.basename(update_path)))
return
successes = update.get('successes', [])
# get_container_ring:获取或建立container的环,且加载它;
# get_nodes:获取account/container/object所对应的分区号和节点(可能是多个,因为分区副本有多个,可能位于不同的节点上);
# 返回元组(分区,节点信息列表);
# 在节点信息列表中至少包含id、weight、zone、ip、port、device、meta;
part, nodes = self.get_container_ring().get_nodes(update['account'], update['container'])
# 明确当前对象的层次关系;
obj = '/%s/%s/%s' % (update['account'], update['container'], update['obj'])
success = True
new_successes = False
# 遍历对象的所有副本节点;
for node in nodes:
# 如果某个节点上的对象没有执行更新操作,则调用方法object_update执行更新操作;
if node['id'] not in successes:
# object_update:通过POST或DELETE方法执行container中的object的更新;
# update['op']:对象更新操作具体调用的方法(如POST或DELETE);
# 更新操作的相关信息保存在update['headers']中;
status = self.object_update(node, part, update['op'], obj, update['headers'])
if not is_success(status) and status != HTTP_NOT_FOUND:
success = False
else:
successes.append(node['id'])
new_successes = True
if success:
self.successes += 1
self.logger.increment('successes')
self.logger.debug(_('Update sent for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
self.logger.increment("unlinks")
os.unlink(update_path)
else:
self.failures += 1
self.logger.increment('failures')
self.logger.debug(_('Update failed for %(obj)s %(path)s'), {'obj': obj, 'path': update_path})
if new_successes:
update['successes'] = successes
write_pickle(update, update_path, os.path.join(device, 'tmp'))
1.读取指定的对象数据更新文件update_path = /srv/node/device1/async_pending/prefix****/update****;
转到3,来看方法object_update的具体实现:
def object_update(self, node, part, op, obj, headers):
"""
执行container中的object的更新;
"""
headers_out = headers.copy()
headers_out['user-agent'] = 'obj-updater %s' % os.getpid()
try:
# 通过POST或DELETE方法实现对象的更新操作;
with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part, op, obj, headers_out)
with Timeout(self.node_timeout):
resp = conn.getresponse()
resp.read()
return resp.status
except (Exception, Timeout):
self.logger.exception(_('ERROR with remote server %(ip)s:%(port)s/%(device)s'), node)
return HTTP_INTERNAL_SERVER_ERROR