感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
接续上篇:
转到3.2,来看方法audit_container的实现:
def audit_container(self, account, name, recurse=False):
    """
    Audit a single container across all of its replicas.

    The object listing is fetched page by page from every node the
    container ring returns for this container.  The listings are merged
    into one dict keyed by object name (when replicas disagree on an
    object's ``last_modified`` the entry with the greater value wins) and
    every inconsistency found is reported on stdout and counted on
    ``self``.

    :param account: name of the account owning the container
    :param name: name of the container to audit
    :param recurse: when true, schedule ``self.audit_object`` on
                    ``self.pool`` for every object in the merged listing
    :returns: dict mapping object name to its listing entry; the same
              dict is cached in ``self.list_cache[(account, name)]``
    """
    key = (account, name)
    # Another audit of the same container is running: wait for it, then
    # fall through to the cache lookup below.
    if key in self.in_progress:
        self.in_progress[key].wait()
    if key in self.list_cache:
        return self.list_cache[key]
    self.in_progress[key] = Event()
    print('Auditing container "%s"' % name)
    # Request path of the container below its account.
    path = '/%s/%s' % (account, name)
    # Container names listed by the account itself.
    account_listing = self.audit_account(account)
    consistent = True
    if name not in account_listing:
        consistent = False
        print(" Container %s not in account listing!" % path)
    # Partition number and node list for this container; each node is
    # indexed below by 'id', 'ip', 'port' and 'device'.
    part, nodes = self.container_ring.get_nodes(account, name.encode('utf-8'))
    rec_d = {}
    responses = {}
    for node in nodes:
        marker = ''
        results = True
        # Page through the listing until a request returns no entries.
        while results:
            try:
                conn = http_connect(node['ip'], node['port'],
                                    node['device'], part, 'GET',
                                    path.encode('utf-8'), {},
                                    'format=json&marker=%s' %
                                    quote(marker.encode('utf-8')))
                # Read the server's response.
                resp = conn.getresponse()
                if resp.status // 100 != 2:
                    self.container_not_found += 1
                    consistent = False
                    print(' Bad status GETting container "%s" on %s/%s' %
                          (path, node['ip'], node['device']))
                    break
                # Only the headers of the first page are kept per node.
                if node['id'] not in responses:
                    responses[node['id']] = dict(resp.getheaders())
                results = simplejson.loads(resp.read())
            except Exception:
                self.container_exceptions += 1
                consistent = False
                print(' Exception GETting container "%s" on %s/%s' %
                      (path, node['ip'], node['device']))
                break
            if results:
                # The next page starts after the last name of this one.
                marker = results[-1]['name']
                for obj in results:
                    obj_name = obj['name']
                    if obj_name not in rec_d:
                        rec_d[obj_name] = obj
                    if (obj['last_modified'] !=
                            rec_d[obj_name]['last_modified']):
                        self.container_obj_mismatch += 1
                        consistent = False
                        print(" Different versions of %s/%s "
                              "in container dbs." % (name, obj['name']))
                        # Keep the entry with the greater last_modified.
                        if (obj['last_modified'] >
                                rec_d[obj_name]['last_modified']):
                            rec_d[obj_name] = obj
    obj_counts = [int(header['x-container-object-count'])
                  for header in responses.values()]
    if not obj_counts:
        consistent = False
        print(" Failed to fetch container %s at all!" % path)
    elif len(set(obj_counts)) != 1:
        self.container_count_mismatch += 1
        consistent = False
        print(" Container databases don't agree on number of objects.")
        print(" Max: %s, Min: %s" % (max(obj_counts), min(obj_counts)))
    self.containers_checked += 1
    # Publish the result before waking up the audits waiting on this one.
    self.list_cache[key] = rec_d
    self.in_progress[key].send(True)
    del self.in_progress[key]
    # Optionally audit every object found in the container.
    if recurse:
        for obj in rec_d.keys():
            self.pool.spawn_n(self.audit_object, account, name, obj)
    if not consistent and self.error_file:
        # Use a context manager so the handle is closed right away
        # instead of being left for the garbage collector.
        with open(self.error_file, 'a') as error_log:
            error_log.write('%s\n' % path)
    return rec_d
3.2.1 获取指定account下的容器具体路径;
转到3.3,来看方法audit_account的实现:
def audit_account(self, account, recurse=False):
    """
    Audit a single account across all of its replicas.

    The container listing is fetched page by page from every node the
    account ring returns for this account.  The container and object
    counts reported in the response headers are compared between nodes
    and every inconsistency found is reported on stdout and counted on
    ``self``.

    :param account: name of the account to audit
    :param recurse: when true, schedule ``self.audit_container`` (itself
                    recursive) on ``self.pool`` for every container found
    :returns: set of container names gathered from all nodes; the same
              set is cached in ``self.list_cache[account]``
    """
    # Another audit of the same account is running: wait for it, then
    # fall through to the cache lookup below.
    if account in self.in_progress:
        self.in_progress[account].wait()
    if account in self.list_cache:
        return self.list_cache[account]
    self.in_progress[account] = Event()
    print('Auditing account "%s"' % account)
    consistent = True
    path = '/%s' % account
    # Partition number and node list for this account; each node is
    # indexed below by 'id', 'ip', 'port' and 'device'.
    part, nodes = self.account_ring.get_nodes(account)
    responses = {}
    for node in nodes:
        marker = ''
        results = True
        # Page through the listing until a request returns no entries.
        while results:
            node_id = node['id']
            try:
                # Open the connection and read the server's response.
                conn = http_connect(node['ip'], node['port'],
                                    node['device'], part, 'GET', path, {},
                                    'format=json&marker=%s' %
                                    quote(marker.encode('utf-8')))
                resp = conn.getresponse()
                if resp.status // 100 != 2:
                    self.account_not_found += 1
                    consistent = False
                    # The original format was "%ss:%ss", which appended a
                    # stray "s" to both the IP and the device.
                    print(" Bad status GETting account '%s' "
                          " from %s:%s" %
                          (account, node['ip'], node['device']))
                    break
                results = simplejson.loads(resp.read())
            except Exception:
                self.account_exceptions += 1
                consistent = False
                print(" Exception GETting account '%s' on %s:%s" %
                      (account, node['ip'], node['device']))
                break
            # Per node: [headers of the first page, accumulated entries].
            if node_id not in responses:
                responses[node_id] = [dict(resp.getheaders()), []]
            responses[node_id][1].extend(results)
            if results:
                # The next page starts after the last name of this one.
                marker = results[-1]['name']
    headers = [node_resp[0] for node_resp in responses.values()]
    cont_counts = [int(header['x-account-container-count'])
                   for header in headers]
    # An empty list (no node answered) is also counted as a mismatch.
    if len(set(cont_counts)) != 1:
        self.account_container_mismatch += 1
        consistent = False
        print(" Account databases for '%s' don't agree on"
              " number of containers." % account)
        if cont_counts:
            print(" Max: %s, Min: %s" % (max(cont_counts),
                                         min(cont_counts)))
    obj_counts = [int(header['x-account-object-count'])
                  for header in headers]
    if len(set(obj_counts)) != 1:
        self.account_object_mismatch += 1
        consistent = False
        print(" Account databases for '%s' don't agree on"
              " number of objects." % account)
        if obj_counts:
            print(" Max: %s, Min: %s" % (max(obj_counts),
                                         min(obj_counts)))
    # Union of the container names seen on any node.
    containers = set()
    for node_resp in responses.values():
        containers.update(container['name'] for container in node_resp[1])
    # Publish the result before waking up the audits waiting on this one.
    self.list_cache[account] = containers
    self.in_progress[account].send(True)
    del self.in_progress[account]
    self.accounts_checked += 1
    if recurse:
        for container in containers:
            self.pool.spawn_n(self.audit_container, account,
                              container, True)
    if not consistent and self.error_file:
        # Use a context manager so the handle is closed right away
        # instead of being left for the garbage collector.
        with open(self.error_file, 'a') as error_log:
            error_log.write('%s\n' % path)
    return containers
3.3.1 获取指定账户的具体路径;