感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
概述:
这篇博客主要关注swift-proxy与swift-container服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;
源码解析部分(代码中较重要的部分已经进行了相关的注释):
GETorHEAD
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def GETorHEAD
def GETorHEAD(self, req):
    """Proxy-side handler shared by container GET and HEAD requests.

    Returns 404 when the owning account has no nodes, otherwise forwards
    the request through ``GETorHEAD_base`` on the container ring, runs the
    optional ``swift.authorize`` callback against the container read ACL,
    and hides owner-only headers from callers that are not the owner.

    :param req: the incoming request object
    :returns: the backend response, an authorization-failure response, or
              ``HTTPNotFound``
    """
    # Element [1] of account_info() is the list of account replica nodes;
    # an empty value means the account is unknown.
    account_nodes = self.account_info(self.account_name, req)[1]
    if not account_nodes:
        return HTTPNotFound(request=req)

    # Look up the partition number that holds the requested container.
    partition = self.app.container_ring.get_part(
        self.account_name, self.container_name)
    resp = self.GETorHEAD_base(
        req, _('Container'), self.app.container_ring, partition,
        req.swift_entity_path)

    if 'swift.authorize' in req.environ:
        # The authorize callback reads the ACL from the request, so the
        # container's read ACL has to be attached before invoking it.
        req.acl = resp.headers.get('x-container-read')
        denial = req.environ['swift.authorize'](req)
        if denial:
            return denial

    if req.environ.get('swift_owner', False):
        return resp

    # Callers that are not the owner must not see owner-only headers.
    for owner_header in self.app.swift_owner_headers:
        if owner_header in resp.headers:
            del resp.headers[owner_header]
    return resp
def HEAD(self, req):
    """Container-server handler for HEAD: report container metadata.

    The container's basic information (object count, bytes used,
    timestamps and stored metadata) is returned as key/value pairs in the
    response headers; the response carries no body.

    :param req: the incoming request object
    :returns: ``HTTPNoContent`` with the metadata headers, ``HTTPNotFound``
              if the container is deleted, or ``HTTPInsufficientStorage``
              if the mount check fails
    """
    # Pull drive, partition, account, container and (optional) object out
    # of the request path.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    out_content_type = get_listing_content_type(req)

    # When mount checking is enabled and the drive is not mounted, answer
    # with HTTP 507 (insufficient storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker is the object through which the container database is
    # accessed.
    broker = self._get_container_broker(
        drive, part, account, container,
        pending_timeout=0.1, stale_reads_ok=True)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Container-wide statistics (object_count, bytes_used, created_at,
    # put_timestamp, ...).
    info = broker.get_info()
    headers = {}
    headers['X-Container-Object-Count'] = info['object_count']
    headers['X-Container-Bytes-Used'] = info['bytes_used']
    headers['X-Timestamp'] = info['created_at']
    headers['X-PUT-Timestamp'] = info['put_timestamp']

    # Expose every non-empty stored metadata item that is either an
    # explicitly saved header or container system/user metadata.
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value == '':
            continue
        if key.lower() in self.save_headers or \
                is_sys_or_user_meta('container', key):
            headers[key] = value

    headers['Content-Type'] = out_content_type
    return HTTPNoContent(request=req, headers=headers, charset='utf-8')
/swift/container/server.py----class ContainerController(object)----def GET
def GET(self, req):
    """Container-server handler for GET: metadata plus object listing.

    Like HEAD, the container's basic information is returned as key/value
    pairs in the response headers. In addition, the list of objects in the
    container is fetched and returned in the response body, rendered as
    JSON, XML or plain text depending on the negotiated content type.

    :param req: the incoming request object
    :returns: a ``Response`` carrying the listing; ``HTTPNoContent`` for an
              empty plain-text listing; ``HTTPPreconditionFailed`` for a
              bad delimiter or an over-large limit; ``HTTPNotFound`` if the
              container is deleted; ``HTTPInsufficientStorage`` if the
              mount check fails
    """
    # Pull drive, partition, account, container and (optional) object out
    # of the request path.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    path = get_param(req, 'path')
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        return HTTPPreconditionFailed(body='Bad delimiter')
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')

    # Default to the maximum listing size; a numeric client-supplied limit
    # may lower it but is rejected if it exceeds the maximum.
    limit = CONTAINER_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > CONTAINER_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)
    out_content_type = get_listing_content_type(req)

    # When mount checking is enabled and the drive is not mounted, answer
    # with HTTP 507 (insufficient storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker is the object through which the container database is
    # accessed.
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Container-wide statistics (object_count, bytes_used, created_at,
    # put_timestamp, ...).
    info = broker.get_info()
    resp_headers = {
        'X-Container-Object-Count': info['object_count'],
        'X-Container-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value and (key.lower() in self.save_headers or
                      is_sys_or_user_meta('container', key)):
            resp_headers[key] = value
    ret = Response(request=req, headers=resp_headers,
                   content_type=out_content_type, charset='utf-8')

    # Fetch the sorted object listing for this container.
    container_list = broker.list_objects_iter(limit, marker, end_marker,
                                              prefix, delimiter, path)
    if out_content_type == 'application/json':
        # Fixed: the original assigned to the undefined name ``et``, which
        # raised NameError for every JSON listing.
        ret.body = json.dumps([self.update_data_record(record)
                               for record in container_list])
    elif out_content_type.endswith('/xml'):
        doc = Element('container', name=container.decode('utf-8'))
        for entry in container_list:
            record = self.update_data_record(entry)
            if 'subdir' in record:
                name = record['subdir'].decode('utf-8')
                sub = SubElement(doc, 'subdir', name=name)
                SubElement(sub, 'name').text = name
            else:
                obj_element = SubElement(doc, 'object')
                # Emit the well-known fields first, in a fixed order, and
                # remove them so only extra fields remain in the record.
                for field in ["name", "hash", "bytes", "content_type",
                              "last_modified"]:
                    SubElement(obj_element, field).text = str(
                        record.pop(field)).decode('utf-8')
                for field in sorted(record):
                    SubElement(obj_element, field).text = str(
                        record[field]).decode('utf-8')
        # Rewrite the XML declaration to use double quotes.
        ret.body = tostring(doc, encoding='UTF-8').replace(
            "<?xml version='1.0' encoding='UTF-8'?>",
            '<?xml version="1.0" encoding="UTF-8"?>', 1)
    else:
        # Plain-text listing: one object name per line, or 204 if empty.
        if not container_list:
            return HTTPNoContent(request=req, headers=resp_headers)
        ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'
    return ret
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def PUT
def PUT(self, req):
    """Proxy-side handler for container PUT (create/update a container).

    Validates ACLs, metadata and the container name length, resolves (and
    optionally auto-creates) the owning account, enforces the per-account
    container limit, then fans the PUT out to the container replica nodes.

    :param req: the incoming request object
    :returns: the aggregated backend response, or an error response
              (``HTTPBadRequest``, ``HTTPNotFound``, ``HTTPForbidden`` or
              whatever ``clean_acls`` / ``check_metadata`` produced)
    """
    error_response = \
        self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response

    # Owner-only headers may only be set by the owner; drop them otherwise.
    if not req.environ.get('swift_owner'):
        for key in self.app.swift_owner_headers:
            req.headers.pop(key, None)

    if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        # Fixed: the original assigned to the undefined name ``esp``, which
        # raised NameError instead of returning the 400 response.
        resp.body = 'Container name length of %d longer than %d' % \
                    (len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
        return resp

    # Look up the owning account: its partition, its replica nodes and the
    # number of containers it currently holds.
    account_partition, accounts, container_count = \
        self.account_info(self.account_name, req)
    if not accounts and self.app.account_autocreate:
        self.autocreate_account(req.environ, self.account_name)
        account_partition, accounts, container_count = \
            self.account_info(self.account_name, req)
    if not accounts:
        return HTTPNotFound(request=req)

    if self.app.max_containers_per_account > 0 and \
            container_count >= self.app.max_containers_per_account and \
            self.account_name not in self.app.max_containers_whitelist:
        resp = HTTPForbidden(request=req)
        resp.body = 'Reached container limit of %s' % \
                    self.app.max_containers_per_account
        return resp

    # Partition number and all replica nodes for the target container.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    headers = self._backend_requests(req, len(containers),
                                     account_partition, accounts)
    # Drop cached info about this container from memcache and the env.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)
    resp = self.make_requests(
        req, self.app.container_ring,
        container_partition, 'PUT', req.swift_entity_path, headers)
    return resp
def PUT(self, req):
    """Container-server handler for PUT.

    If the path names an object, a row for that object is recorded in the
    container database from the request metadata. Otherwise the container
    itself is created/updated: its stored metadata is refreshed from the
    request headers and the owning account is notified via
    ``account_update``.

    :param req: the incoming request object
    :returns: ``HTTPCreated`` / ``HTTPAccepted`` on success, or an error
              response (``HTTPBadRequest``, ``HTTPInsufficientStorage``,
              ``HTTPNotFound``, or the response from ``account_update``)
    """
    # Pull drive, partition, account, container and (optional) object out
    # of the request path.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)

    has_valid_timestamp = ('x-timestamp' in req.headers and
                           check_float(req.headers['x-timestamp']))
    if not has_valid_timestamp:
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')

    if 'x-container-sync-to' in req.headers:
        err, sync_to, realm, realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)

    # When mount checking is enabled and the drive is not mounted, answer
    # with HTTP 507 (insufficient storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    timestamp = normalize_timestamp(req.headers['x-timestamp'])
    # The broker is the object through which the container database is
    # accessed.
    broker = self._get_container_broker(drive, part, account, container)

    if obj:
        # --- put container object ---
        db_missing = not os.path.exists(broker.db_file)
        if account.startswith(self.auto_create_account_prefix) and \
                db_missing:
            # Auto-create accounts get their container DB initialised on
            # demand; losing a creation race is harmless.
            try:
                broker.initialize(timestamp)
            except DatabaseAlreadyExists:
                pass
        if not os.path.exists(broker.db_file):
            return HTTPNotFound()
        # Record the object in the container database.
        broker.put_object(obj, timestamp, int(req.headers['x-size']),
                          req.headers['x-content-type'],
                          req.headers['x-etag'])
        return HTTPCreated(request=req)

    # --- put container ---
    created = self._update_or_create(req, broker, timestamp)

    # Collect the headers that should be persisted as container metadata,
    # each stamped with the request timestamp.
    metadata = {}
    for key, value in req.headers.iteritems():
        if key.lower() in self.save_headers or \
                is_sys_or_user_meta('container', key):
            metadata[key] = (value, timestamp)

    if metadata:
        # A new or changed sync target resets the sync points.
        if 'X-Container-Sync-To' in metadata and (
                'X-Container-Sync-To' not in broker.metadata or
                metadata['X-Container-Sync-To'][0] !=
                broker.metadata['X-Container-Sync-To'][0]):
            broker.set_x_container_sync_points(-1, -1)
        # Persist the metadata into the database.
        broker.update_metadata(metadata)

    # Tell the owning account about the latest container state. Per the
    # original notes this sends a PUT to the account server at
    # http://{account_ip}:{account_port}/{account_device}/
    # {account_partition}/{account}/{container}.
    resp = self.account_update(req, account, container, broker)
    if resp:
        return resp

    response_class = HTTPCreated if created else HTTPAccepted
    return response_class(request=req)
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def POST
def POST(self, req):
    """Proxy-side handler for container POST (metadata update).

    Validates ACLs and metadata, confirms the owning account exists, then
    sends the POST to the container replica nodes.

    :param req: the incoming request object
    :returns: the aggregated backend response, ``HTTPNotFound`` if the
              account has no nodes, or the error produced by
              ``clean_acls`` / ``check_metadata``
    """
    problem = self.clean_acls(req) or check_metadata(req, 'container')
    if problem:
        return problem

    # Owner-only headers may only be set by the owner; drop them otherwise.
    if not req.environ.get('swift_owner'):
        for owner_header in self.app.swift_owner_headers:
            req.headers.pop(owner_header, None)

    # Look up the owning account: its partition, its replica nodes and the
    # number of containers it currently holds.
    account_partition, accounts, container_count = \
        self.account_info(self.account_name, req)
    if not accounts:
        return HTTPNotFound(request=req)

    # Partition number and all replica nodes for the target container.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    headers = self.generate_request_headers(req, transfer=True)
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    # One (shared) header dict per container replica.
    per_node_headers = [headers] * len(containers)
    return self.make_requests(
        req, self.app.container_ring, container_partition, 'POST',
        req.swift_entity_path, per_node_headers)
def POST(self, req):
    """Container-server handler for POST: update container metadata.

    Selected headers from the request are stored as metadata in the
    container's database.

    :param req: the incoming request object
    :returns: ``HTTPNoContent`` on success, or ``HTTPBadRequest``,
              ``HTTPInsufficientStorage`` or ``HTTPNotFound``
    """
    # Pull drive, partition, account and container out of the request
    # path.
    drive, part, account, container = split_and_validate_path(req, 4)

    has_valid_timestamp = ('x-timestamp' in req.headers and
                           check_float(req.headers['x-timestamp']))
    if not has_valid_timestamp:
        return HTTPBadRequest(body='Missing or bad timestamp',
                              request=req, content_type='text/plain')

    if 'x-container-sync-to' in req.headers:
        err, sync_to, realm, realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)

    # When mount checking is enabled and the drive is not mounted, answer
    # with HTTP 507 (insufficient storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker is the object through which the container database is
    # accessed.
    broker = self._get_container_broker(drive, part, account, container)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Convert the timestamp to its normalized form.
    timestamp = normalize_timestamp(req.headers['x-timestamp'])

    # Collect the headers that should be persisted as container metadata,
    # each stamped with the request timestamp.
    metadata = {}
    for key, value in req.headers.iteritems():
        if key.lower() in self.save_headers or \
                is_sys_or_user_meta('container', key):
            metadata[key] = (value, timestamp)

    if metadata:
        # A new or changed sync target resets the sync points.
        if 'X-Container-Sync-To' in metadata and (
                'X-Container-Sync-To' not in broker.metadata or
                metadata['X-Container-Sync-To'][0] !=
                broker.metadata['X-Container-Sync-To'][0]):
            broker.set_x_container_sync_points(-1, -1)
        broker.update_metadata(metadata)

    return HTTPNoContent(request=req)
/swift/proxy/controllers/container.py----class ContainerController(Controller)----def DELETE
def DELETE(self, req):
    """Proxy-side handler for container DELETE.

    Confirms the owning account exists, then sends the DELETE to the
    container replica nodes. An aggregated 202 is translated to 404.

    :param req: the incoming request object
    :returns: the aggregated backend response, or ``HTTPNotFound``
    """
    # Look up the owning account: its partition, its replica nodes and the
    # number of containers it currently holds.
    account_partition, accounts, container_count = \
        self.account_info(self.account_name, req)
    if not accounts:
        return HTTPNotFound(request=req)

    # Partition number and all replica nodes for the target container.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)
    backend_headers = self._backend_requests(
        req, len(containers), account_partition, accounts)
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)
    resp = self.make_requests(
        req, self.app.container_ring, container_partition, 'DELETE',
        req.swift_entity_path, backend_headers)

    # Indicates no server had the container
    no_server_had_it = resp.status_int == HTTP_ACCEPTED
    return HTTPNotFound(request=req) if no_server_had_it else resp
def DELETE(self, req):
    """Container-server handler for DELETE.

    The request URL has the form
    ``host:port/device/partition/account/container/<object>``.

    * With an object component, the object is marked as deleted in the
      container database (per the original notes, the deletion record is
      serialized to the ``db_file.pending`` file and merged into the
      database the next time the container is operated on). This keeps the
      container in sync after an operation on one of its objects.
    * Without an object component, the container itself is deleted. The
      container must be empty; it is then marked deleted in the database
      (no files are removed), and ``account_update`` notifies the owning
      account of the new state.

    :param req: the incoming request object
    :returns: ``HTTPNoContent`` on success; ``HTTPNotFound`` if the
              database or container did not exist; ``HTTPConflict`` if the
              container is not empty or could not be marked deleted;
              ``HTTPBadRequest`` / ``HTTPInsufficientStorage`` on
              validation failures; or the response from ``account_update``
    """
    # Pull drive, partition, account, container and (optional) object out
    # of the request path.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)

    has_valid_timestamp = ('x-timestamp' in req.headers and
                           check_float(req.headers['x-timestamp']))
    if not has_valid_timestamp:
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')

    # When mount checking is enabled and the drive is not mounted, answer
    # with HTTP 507 (insufficient storage).
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # The broker is the object through which the container database is
    # accessed.
    broker = self._get_container_broker(drive, part, account, container)

    # For auto-create accounts, an object delete against a missing
    # container DB initialises the DB first; losing a creation race is
    # harmless.
    needs_autocreate = (
        account.startswith(self.auto_create_account_prefix) and obj and
        not os.path.exists(broker.db_file))
    if needs_autocreate:
        init_timestamp = normalize_timestamp(
            req.headers.get('x-timestamp') or time.time())
        try:
            broker.initialize(init_timestamp)
        except DatabaseAlreadyExists:
            pass
    if not os.path.exists(broker.db_file):
        return HTTPNotFound()

    if obj:
        # --- delete object: mark the object as deleted ---
        broker.delete_object(obj, req.headers.get('x-timestamp'))
        return HTTPNoContent(request=req)

    # --- delete container ---
    # Only an empty container may be deleted.
    if not broker.empty():
        return HTTPConflict(request=req)

    # Remember whether the container really existed before this request.
    existed = float(broker.get_info()['put_timestamp']) and \
        not broker.is_deleted()

    # Mark the database as deleted; this does not remove any files.
    broker.delete_db(req.headers['X-Timestamp'])
    if not broker.is_deleted():
        return HTTPConflict(request=req)

    # Tell the owning account about the latest container state. Per the
    # original notes this sends a PUT to the account server at
    # http://{account_ip}:{account_port}/{account_device}/
    # {account_partition}/{account}/{container}.
    resp = self.account_update(req, account, container, broker)
    if resp:
        return resp

    if existed:
        return HTTPNoContent(request=req)
    return HTTPNotFound()