感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
概述部分:
指定账户下到期对象的处理;
指定账户下到期对象的处理主要实现寻找并删除指定的账户下所有到期的对象和容器;
对于一个指定账户的到期对象处理流程主要如下:
1 在指定账户下,遍历查询所有过期的容器;
2 在指定账户下,遍历查询并删除所有过期的对象;
3 在指定账户下,删除所有过期容器;
这里定义的once=True,说明系统默认调用守护进程类Daemon中的run_once方法;
从而最终实现调用ObjectExpirer类中的run_once方法;
源码解析部分:
下面是这部分代码的主要执行流程,代码中较重要的部分已经进行了相关的注释;
from swift.common.daemon import run_daemon
from swift.common.utils import parse_options
from swift.obj.expirer import ObjectExpirer
from optparse import OptionParser
if __name__ == '__main__':
parser = OptionParser("%prog CONFIG [options]")
parser.add_option('--processes', dest='processes',
help="Number of processes to use to do the work, don't "
"use this option to do all the work in one process")
parser.add_option('--process', dest='process',
help="Process number for this process, don't use "
"this option to do all the work in one process, this "
"is used to determine which part of the work this "
"process should do")
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectExpirer, conf_file, **options)
def run_once(self, *args, **kwargs):
"""
运行一次操作,实现寻找并删除指定的账户下所有到期的对象和容器;
1 在指定账户下,遍历查询所有过期的容器;
2 在指定账户下,遍历查询并删除所有过期的对象;
3 在指定账户下,删除所有过期容器;
"""
processes, process = self.get_process_values(kwargs)
# 建立一个线程池;
pool = GreenPool(self.concurrency)
containers_to_delete = []
self.report_first_time = self.report_last_time = time()
self.report_objects = 0
try:
self.logger.debug(_('Run begin'))
# get_account_info:为指定的account返回相关的(container_count, object_count);
containers, objects = self.swift.get_account_info(self.expiring_objects_account)
self.logger.info(_('Pass beginning; %s possible containers; %s '
'possible objects') % (containers, objects))
# 循环遍历账户下的容器列表;
# 实现删除指定账户下过期的对象;
for c in self.swift.iter_containers(self.expiring_objects_account):
# 容器名称;
container = c['name']
timestamp = int(container)
# 满足这个时间戳的条件说明这个容器没有过期,则跳出循环;
# 否则说明这个容器是过期的,把其添加到列表containers_to_delete中;
if timestamp > int(time()):
break
containers_to_delete.append(container)
# 循环遍历容器下的对象列表;
for o in self.swift.iter_objects(self.expiring_objects_account, container):
# 对象名称;
obj = o['name'].encode('utf8')
if processes > 0:
obj_process = int(hashlib.md5('%s/%s' % (container, obj)). hexdigest(), 16)
if obj_process % processes != process:
continue
timestamp, actual_obj = obj.split('-', 1)
timestamp = int(timestamp)
# 满足这个时间戳的条件说明这个对象没有过期,则跳出循环;
# 否则说明这个对象是过期的,需要执行删除操作;
if timestamp > int(time()):
break
# delete_object:删除一个指定的object;
pool.spawn_n(self.delete_object, actual_obj, timestamp, container, obj)
pool.waitall()
# 实现删除上面获取的所有过期的容器;
for container in containers_to_delete:
try:
# 实现删除指定容器的操作;
self.swift.delete_container(
self.expiring_objects_account,
container,
acceptable_statuses=(2, HTTP_NOT_FOUND, HTTP_CONFLICT))
except (Exception, Timeout) as err:
self.logger.exception(_('Exception while deleting container %s %s') % (container, str(err)))
self.logger.debug(_('Run end'))
self.report(final=True)
except (Exception, Timeout):
self.logger.exception(_('Unhandled exception'))
1.获取指定的到期账户下所有的容器和对象列表;
转到3,来看方法delete_object的实现:
def delete_object(self, actual_obj, timestamp, container, obj):
start_time = time()
try:
self.delete_actual_object(actual_obj, timestamp)
# 删除一个指定的object;
self.swift.delete_object(self.expiring_objects_account, container, obj)
self.report_objects += 1
self.logger.increment('objects')
except (Exception, Timeout) as err:
self.logger.increment('errors')
self.logger.exception(_('Exception while deleting object %s %s %s') % (container, obj, str(err)))
self.logger.timing_since('timing', start_time)
self.report()
def delete_object(self, account, container, obj, acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
删除一个指定的object;
"""
path = self.make_path(account, container, obj)
self.make_request('DELETE', path, {}, acceptable_statuses)
def delete_container(self, account, container, acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
实现删除指定容器的操作;
"""
path = self.make_path(account, container)
self.make_request('DELETE', path, {}, acceptable_statuses)