感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!
如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn
PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。
概述:
这篇博客主要关注swift-proxy与swift-object服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;
源码解析部分(代码中较重要的部分已经进行了相关的注释):
GETorHEAD
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def GETorHEAD
def GETorHEAD(self, req):
    """
    Handle an HTTP GET or HEAD request for an object.

    Looks up the info of the container that owns the object, installs the
    container's read ACL on the request and, when an authorization callback
    is present in the WSGI environment, lets it reject the request. The
    request is then dispatched through ``GETorHEAD_base`` for the ring
    partition the object maps to.

    :param req: the incoming request
    :returns: the authorization callback's response when it is truthy,
              otherwise the response produced by ``GETorHEAD_base`` (with a
              trailing ``swift_bytes=`` content-type parameter removed)
    """
    # Fetch the info of the container that the requested object belongs to.
    info = self.container_info(self.account_name, self.container_name, req)
    req.acl = info['read_acl']
    if 'swift.authorize' in req.environ:
        denial = req.environ['swift.authorize'](req)
        if denial:
            return denial

    # Ring partition that the requested object maps to.
    part = self.app.object_ring.get_part(
        self.account_name, self.container_name, self.object_name)
    resp = self.GETorHEAD_base(
        req, _('Object'), self.app.object_ring, part,
        req.swift_entity_path)

    if ';' not in resp.headers.get('content-type', ''):
        return resp

    # Strip a trailing swift_bytes parameter off the content type.
    base_type, last_param = resp.headers['content-type'].rsplit(';', 1)
    if last_param.lstrip().startswith('swift_bytes='):
        resp.content_type = base_type
    return resp
/swift/obj/server.py----class ObjectController(object)----def HEAD
def HEAD(self, request):
    """
    Return an object's metadata as response headers, without a body.

    The flow mirrors GET, except that no file content is attached to the
    response.

    :param request: the incoming request; its path is split into device,
                    partition, account, container and object
    :returns: HTTPInsufficientStorage when the device is unavailable,
              HTTPNotFound when the object does not exist or is
              quarantined, otherwise a response carrying the metadata
    """
    device, partition, account, container, obj = split_and_validate_path(
        request, 5, 5, True)
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        meta = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request, conditional_response=True)

    resp = Response(request=request, conditional_response=True)
    # Default content type first; the loop below may copy further headers.
    resp.headers['Content-Type'] = meta.get(
        'Content-Type', 'application/octet-stream')
    for name, value in meta.iteritems():
        # Only user metadata and explicitly allowed headers are exposed.
        if not (is_user_meta('object', name) or
                name.lower() in self.allowed_headers):
            continue
        resp.headers[name] = value

    resp.etag = meta['ETag']
    timestamp = meta['X-Timestamp']
    resp.last_modified = math.ceil(float(timestamp))
    # Needed for container sync feature
    resp.headers['X-Timestamp'] = timestamp
    resp.content_length = int(meta['Content-Length'])
    try:
        resp.content_encoding = meta['Content-Encoding']
    except KeyError:
        # No stored content encoding: leave the response attribute unset.
        pass
    return resp
def GET(self, request):
    """
    Retrieve an object: metadata in the headers, object data in the body.

    Steps performed:
    1. Split the request path into device, partition, account, container
       and object, and obtain the matching disk file.
    2. Open the disk file and read its metadata.
    3. Compare the object's X-Timestamp with the request's
       If-Unmodified-Since / If-Modified-Since values.
    4. Build a response whose ``app_iter`` is the disk file's reader and
       whose headers are filled from the metadata.

    :param request: the incoming request
    :returns: HTTPInsufficientStorage when the device is unavailable;
              HTTPPreconditionFailed when the object is newer than
              If-Unmodified-Since; HTTPNotModified when the object is not
              newer than If-Modified-Since; HTTPNotFound when the object
              does not exist or is quarantined; otherwise the result of
              ``request.get_response`` on the built response
    """
    device, partition, account, container, obj = split_and_validate_path(
        request, 5, 5, True)
    # Computed once: true when keep_cache_private is set, or when the
    # request carries neither an X-Auth-Token nor an X-Storage-Token header.
    keep_cache = self.keep_cache_private or (
        'X-Auth-Token' not in request.headers and
        'X-Storage-Token' not in request.headers)
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        with disk_file.open():
            metadata = disk_file.get_metadata()
            obj_size = int(metadata['Content-Length'])
            file_x_ts = metadata['X-Timestamp']
            file_x_ts_flt = float(file_x_ts)
            file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)

            if_unmodified_since = request.if_unmodified_since
            if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
                return HTTPPreconditionFailed(request=request)

            if_modified_since = request.if_modified_since
            if if_modified_since and file_x_ts_utc <= if_modified_since:
                return HTTPNotModified(request=request)

            response = Response(
                app_iter=disk_file.reader(keep_cache=keep_cache),
                request=request, conditional_response=True)
            # Default content type first; the loop below may copy further
            # headers from the metadata.
            response.headers['Content-Type'] = metadata.get(
                'Content-Type', 'application/octet-stream')
            for key, value in metadata.iteritems():
                # Only user metadata and explicitly allowed headers are
                # exposed to the client.
                if is_user_meta('object', key) or \
                        key.lower() in self.allowed_headers:
                    response.headers[key] = value
            response.etag = metadata['ETag']
            response.last_modified = math.ceil(file_x_ts_flt)
            response.content_length = obj_size
            try:
                response.content_encoding = metadata['Content-Encoding']
            except KeyError:
                # No stored content encoding: leave the attribute unset.
                pass
            response.headers['X-Timestamp'] = file_x_ts
            resp = request.get_response(response)
    except (DiskFileNotExist, DiskFileQuarantined):
        resp = HTTPNotFound(request=request, conditional_response=True)
    return resp
/swift/proxy/controllers/obj.py----class ObjectController(Controller)----def POST
def POST(self, req):
    """
    Handle an HTTP POST request for an object.

    An ``x-delete-after`` header is first converted into an absolute
    ``x-delete-at`` header. Then one of two paths is taken:

    * ``self.app.object_post_as_copy`` is true: the request is rewritten
      into a PUT that copies the object onto itself with fresh metadata,
      and a 201 Created result is reported as 202 Accepted.
    * otherwise: metadata is validated, the container's write ACL and the
      authorization callback are applied, an optional ``x-delete-at`` is
      validated, and the POST is sent to the object's ring partition.

    :param req: the incoming request
    :returns: HTTPBadRequest for a non-integer ``x-delete-after`` /
              ``x-delete-at`` or an ``x-delete-at`` in the past; the
              metadata check's or authorization callback's response when
              they reject the request; HTTPNotFound when the container has
              no nodes; otherwise the PUT-based or ``make_requests`` result
    """
    # Turn a relative expiry (seconds from now) into an absolute timestamp.
    if 'x-delete-after' in req.headers:
        try:
            x_delete_after = int(req.headers['x-delete-after'])
        except ValueError:
            return HTTPBadRequest(request=req,
                                  content_type='text/plain',
                                  body='Non-integer X-Delete-After')
        req.headers['x-delete-at'] = normalize_delete_at_timestamp(
            time.time() + x_delete_after)

    # POST implemented as a PUT that copies the object onto itself.
    if self.app.object_post_as_copy:
        req.method = 'PUT'
        req.path_info = '/v1/%s/%s/%s' % (
            self.account_name, self.container_name, self.object_name)
        req.headers['Content-Length'] = 0
        req.headers['X-Copy-From'] = quote(
            '/%s/%s' % (self.container_name, self.object_name))
        req.headers['X-Fresh-Metadata'] = 'true'
        req.environ['swift_versioned_copy'] = True
        if req.environ.get('QUERY_STRING'):
            req.environ['QUERY_STRING'] += '&multipart-manifest=get'
        else:
            req.environ['QUERY_STRING'] = 'multipart-manifest=get'
        resp = self.PUT(req)
        # Anything other than 201 Created is passed through unchanged;
        # a 201 is reported to the client as 202 Accepted.
        if resp.status_int != HTTP_CREATED:
            return resp
        return HTTPAccepted(request=req)

    error_response = check_metadata(req, 'object')
    if error_response:
        return error_response
    container_info = self.container_info(
        self.account_name, self.container_name, req)
    container_partition = container_info['partition']
    containers = container_info['nodes']
    req.acl = container_info['write_acl']
    if 'swift.authorize' in req.environ:
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp
    if not containers:
        return HTTPNotFound(request=req)

    if 'x-delete-at' in req.headers:
        try:
            x_delete_at = normalize_delete_at_timestamp(
                int(req.headers['x-delete-at']))
            if int(x_delete_at) < time.time():
                return HTTPBadRequest(
                    body='X-Delete-At in past', request=req,
                    content_type='text/plain')
        except ValueError:
            return HTTPBadRequest(request=req,
                                  content_type='text/plain',
                                  body='Non-integer X-Delete-At')
        req.environ.setdefault('swift.log_info', []).append(
            'x-delete-at:%s' % x_delete_at)
        # Round the expiry time down to a multiple of the divisor. Floor
        # division is spelled explicitly so the rounding does not depend on
        # "/" performing integer division.
        divisor = self.app.expiring_objects_container_divisor
        delete_at_container = normalize_delete_at_timestamp(
            int(x_delete_at) // divisor * divisor)
        delete_at_part, delete_at_nodes = \
            self.app.container_ring.get_nodes(
                self.app.expiring_objects_account, delete_at_container)
    else:
        delete_at_container = delete_at_part = delete_at_nodes = None

    partition, nodes = self.app.object_ring.get_nodes(
        self.account_name, self.container_name, self.object_name)
    req.headers['X-Timestamp'] = normalize_timestamp(time.time())
    headers = self._backend_requests(
        req, len(nodes), container_partition, containers,
        delete_at_container, delete_at_part, delete_at_nodes)
    resp = self.make_requests(req, self.app.object_ring, partition,
                              'POST', req.swift_entity_path, headers)
    return resp
def POST(self, request):
    """
    Update an object's metadata.

    Steps performed:
    1. Split the request path into device, partition, account, container
       and object; require a float-parsable ``x-timestamp`` header.
    2. Validate the optional ``X-Delete-At`` header (integer, not in the
       past).
    3. Obtain the disk file and read its current metadata.
    4. Reject the request when the stored timestamp is not older than the
       request's timestamp.
    5. Build the new metadata from the request's user-metadata headers and
       the allowed headers.
    6. When the delete-at value changed, call ``delete_at_update`` with
       'PUT' for the new value and 'DELETE' for the old one.
    7. Write the new metadata through the disk file.

    :param request: the incoming request
    :returns: HTTPBadRequest for a missing/invalid timestamp or an invalid
              ``X-Delete-At``; HTTPInsufficientStorage when the device is
              unavailable; HTTPNotFound when the object does not exist or
              is quarantined; HTTPConflict when the stored timestamp is not
              older than the request's; HTTPAccepted on success
    """
    # Extract device, partition, account, container and obj from the path.
    device, partition, account, container, obj = split_and_validate_path(
        request, 5, 5, True)
    if 'x-timestamp' not in request.headers or \
            not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request,
                              content_type='text/plain')
    # A malformed header is answered with 400 instead of letting the
    # ValueError escape; a missing or empty header counts as 0 (no expiry).
    try:
        new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    except ValueError:
        return HTTPBadRequest(body='Non-integer X-Delete-At',
                              request=request, content_type='text/plain')
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request,
                              content_type='text/plain')
    try:
        disk_file = self.get_diskfile(
            device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request)

    # NOTE(review): the default '0' and the header value are strings, so
    # this looks like a lexicographic comparison -- confirm that both
    # timestamps are always normalized to the same fixed-width format.
    orig_timestamp = orig_metadata.get('X-Timestamp', '0')
    if orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)

    metadata = {'X-Timestamp': request.headers['x-timestamp']}
    # Carry over the user-metadata headers of the request.
    metadata.update(val for val in request.headers.iteritems()
                    if is_user_meta('object', val[0]))
    # Carry over the allowed headers, stored under their title-cased names.
    for header_key in self.allowed_headers:
        if header_key in request.headers:
            header_caps = header_key.title()
            metadata[header_caps] = request.headers[header_key]

    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account, container,
                                  obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account,
                                  container, obj, request, device)
    disk_file.write_metadata(metadata)
    return HTTPAccepted(request=request)
下一篇博客将继续swift-proxy与swift-object的分析工作。