当前位置: 首页 > 工具软件 > Swift-K > 使用案例 >

Swift源码分析----swift-proxy与swift-object(1)

华安民
2023-12-01

感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。


概述:

这篇博客主要关注swift-proxy与swift-object服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;


源码解析部分(代码中较重要的部分已经进行了相关的注释):


GETorHEAD

/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def GETorHEAD

def GETorHEAD(self, req):
    """
    处理HTTP协议GET或者HEAD请求;
    """
    # 获取指定object所属的container的信息;
    container_info = self.container_info(self.account_name, self.container_name, req)
        
    req.acl = container_info['read_acl']
    if 'swift.authorize' in req.environ:
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp

    # 获取指定object所对应的分区号;
    partition = self.app.object_ring.get_part(self.account_name, self.container_name, self.object_name)
        
    resp = self.GETorHEAD_base(
            req, _('Object'), self.app.object_ring, partition,
            req.swift_entity_path)

    if ';' in resp.headers.get('content-type', ''):
        # strip off swift_bytes from content-type
        content_type, check_extra_meta = resp.headers['content-type'].rsplit(';', 1)
        if check_extra_meta.lstrip().startswith('swift_bytes='):
            resp.content_type = content_type
    return resp
/swift/obj/server.py----class ContainerController(object)----def HEAD

def HEAD(self, request):
    """
    检索返回一个object的metadata,同GET请求的处理方法几乎一致,唯一不同的是不在body中返回file;
    """
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request, conditional_response=True)
    response = Response(request=request, conditional_response=True)
    response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
    for key, value in metadata.iteritems():
        if is_user_meta('object', key) or key.lower() in self.allowed_headers:
            response.headers[key] = value
    response.etag = metadata['ETag']
    ts = metadata['X-Timestamp']
    response.last_modified = math.ceil(float(ts))
    # Needed for container sync feature
    response.headers['X-Timestamp'] = ts
    response.content_length = int(metadata['Content-Length'])
    try:
        response.content_encoding = metadata['Content-Encoding']
    except KeyError:
        pass
    return response

/swift/obj/server.py----class ContainerController(object)----def GET

def GET(self, request):
    """
    检索一个object对象,在response.heads中返回metadata,在response.body中返回objectdata,流程如下:
    1.根据url中的信息新建DiskFile对象file,检查request.heads中的必要K-V,检查mount情况;
    2.如果file#is_deleted或者file.metadata中'X-Delete-At'小于当前时间(表示已标记为准备删除)
      或者通过file#get_data_file_size查看文件是否异常,如果已经删除或存在异常,返回404HTTPNotFound;
    3.检查request.heads里的'If-match'和'If-none-match',前者检查file.metadata中的'ETag'是否与其一致确定所检索的文件,后者确定如果没有匹配的是否返回file的etag信息;
    4.确定了需要操作的file,利用file的iterator,将其绑定response的构造函数参数app_iter,
      并且将file.metadata写入response.heads中,并返回response;
    """
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)
        
    keep_cache = self.keep_cache_private or (
        'X-Auth-Token' not in request.headers and
        'X-Storage-Token' not in request.headers)
        
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
        
    try:
        with disk_file.open():
            metadata = disk_file.get_metadata()
            obj_size = int(metadata['Content-Length'])
            file_x_ts = metadata['X-Timestamp']
            file_x_ts_flt = float(file_x_ts)
            file_x_ts_utc = datetime.fromtimestamp(file_x_ts_flt, UTC)

            if_unmodified_since = request.if_unmodified_since
                
            if if_unmodified_since and file_x_ts_utc > if_unmodified_since:
                return HTTPPreconditionFailed(request=request)

            if_modified_since = request.if_modified_since
            if if_modified_since and file_x_ts_utc <= if_modified_since:
                return HTTPNotModified(request=request)

            keep_cache = (self.keep_cache_private or
                          ('X-Auth-Token' not in request.headers and
                           'X-Storage-Token' not in request.headers))
            response = Response(
                app_iter=disk_file.reader(keep_cache=keep_cache),
                request=request, conditional_response=True)
            response.headers['Content-Type'] = metadata.get('Content-Type', 'application/octet-stream')
            for key, value in metadata.iteritems():
                if is_user_meta('object', key) or key.lower() in self.allowed_headers:
                    response.headers[key] = value
            response.etag = metadata['ETag']
            response.last_modified = math.ceil(file_x_ts_flt)
            response.content_length = obj_size
            try:
                response.content_encoding = metadata['Content-Encoding']
            except KeyError:
                pass
            response.headers['X-Timestamp'] = file_x_ts
            resp = request.get_response(response)
                
    except (DiskFileNotExist, DiskFileQuarantined):
        resp = HTTPNotFound(request=request, conditional_response=True)
    return resp

POST

/swift/proxy/controllers/obj.py----class ContainerController(Controller)----def POST

def POST(self, req):
    """
    处理HTTP协议POST请求;
    """
    # 计算预计删除对象时间???
    if 'x-delete-after' in req.headers:
        try:
            x_delete_after = int(req.headers['x-delete-after'])
        except ValueError:
            return HTTPBadRequest(request=req,
                                  content_type='text/plain',
                                  body='Non-integer X-Delete-After')
        req.headers['x-delete-at'] = normalize_delete_at_timestamp(time.time() + x_delete_after)
        

    # 在object的实现方法中,系统默认以PUT方法来实现POST方法;
    if self.app.object_post_as_copy:
        req.method = 'PUT'
        req.path_info = '/v1/%s/%s/%s' % (self.account_name, self.container_name, self.object_name)
        req.headers['Content-Length'] = 0
        req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name, self.object_name))
        req.headers['X-Fresh-Metadata'] = 'true'
        req.environ['swift_versioned_copy'] = True
        if req.environ.get('QUERY_STRING'):
            req.environ['QUERY_STRING'] += '&multipart-manifest=get'
        else:
            req.environ['QUERY_STRING'] = 'multipart-manifest=get'
        resp = self.PUT(req)

        if resp.status_int != HTTP_CREATED:
            return resp
        return HTTPAccepted(request=req)
        
    else:
        error_response = check_metadata(req, 'object')
        if error_response:
            return error_response
        container_info = self.container_info(
            self.account_name, self.container_name, req)
        container_partition = container_info['partition']
        containers = container_info['nodes']
        req.acl = container_info['write_acl']
        if 'swift.authorize' in req.environ:
            aresp = req.environ['swift.authorize'](req)
            if aresp:
                return aresp
        if not containers:
            return HTTPNotFound(request=req)
        if 'x-delete-at' in req.headers:
            try:
                x_delete_at = normalize_delete_at_timestamp(int(req.headers['x-delete-at']))
                if int(x_delete_at) < time.time():
                    return HTTPBadRequest(
                        body='X-Delete-At in past', request=req,
                        content_type='text/plain')
            except ValueError:
                return HTTPBadRequest(request=req,
                                      content_type='text/plain',
                                      body='Non-integer X-Delete-At')
            req.environ.setdefault('swift.log_info', []).append('x-delete-at:%s' % x_delete_at)
            delete_at_container = normalize_delete_at_timestamp(
                int(x_delete_at) /self.app.expiring_objects_container_divisor *self.app.expiring_objects_container_divisor)
            delete_at_part, delete_at_nodes = \
                    self.app.container_ring.get_nodes(self.app.expiring_objects_account, delete_at_container)
        else:
            delete_at_container = delete_at_part = delete_at_nodes = None
        partition, nodes = self.app.object_ring.get_nodes(self.account_name, self.container_name, self.object_name)
        req.headers['X-Timestamp'] = normalize_timestamp(time.time())

        headers = self._backend_requests(
            req, len(nodes), container_partition, containers,
            delete_at_container, delete_at_part, delete_at_nodes)

        resp = self.make_requests(req, self.app.object_ring, partition, 'POST', req.swift_entity_path, headers)
            
        return resp

/swift/obj/server.py----class ContainerController(object)----def POST

def POST(self, request):
    """     
    更新object的元数据信息,流程如下:
    1.从requesturl中提取device,partition, account, container, obj;
      检查requestheads中的'x-timestamp'是否存在,检查mount情况;
    2.根据请求信息新建DiskFile对象file,检查是否存在;
      (包括检查metadata中的'X-Delete-At',调用file#is_deleted()和检查file.data_size)
    3.如果检查都通过,则根据request.heads中的元素更新metadata;
    4.从request.heads中提取'X-Delete-At'并与file.metadata中的相同字段比较;
      根据较新的值调用file#delete_at_update(),通知更新container的信息;
    5.调用file#put()方法将metadata写入到.meta文件和data_file的扩展属性中;
        
    实现更新object的元数据信息;
    并通知object的更新到container;
    """
    # 根据request.path获取device、partition、account、container、obj等参数;
    device, partition, account, container, obj = split_and_validate_path(request, 5, 5, True)

    if 'x-timestamp' not in request.headers or not check_float(request.headers['x-timestamp']):
        return HTTPBadRequest(body='Missing timestamp', request=request, content_type='text/plain')

    new_delete_at = int(request.headers.get('X-Delete-At') or 0)
    if new_delete_at and new_delete_at < time.time():
        return HTTPBadRequest(body='X-Delete-At in past', request=request, content_type='text/plain')
        
    try:
        disk_file = self.get_diskfile(device, partition, account, container, obj)
    except DiskFileDeviceUnavailable:
        return HTTPInsufficientStorage(drive=device, request=request)
    try:
        orig_metadata = disk_file.read_metadata()
    except (DiskFileNotExist, DiskFileQuarantined):
        return HTTPNotFound(request=request)
    orig_timestamp = orig_metadata.get('X-Timestamp', '0')
    if orig_timestamp >= request.headers['x-timestamp']:
        return HTTPConflict(request=request)
        
    metadata = {'X-Timestamp': request.headers['x-timestamp']}
    metadata.update(val for val in request.headers.iteritems() if is_user_meta('object', val[0]))
    for header_key in self.allowed_headers:
       if header_key in request.headers:
            header_caps = header_key.title()
            metadata[header_caps] = request.headers[header_key]
        
    orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
        
    if orig_delete_at != new_delete_at:
        if new_delete_at:
            self.delete_at_update('PUT', new_delete_at, account, container, obj, request, device)
        if orig_delete_at:
            self.delete_at_update('DELETE', orig_delete_at, account, container, obj, request, device)
    disk_file.write_metadata(metadata)
    return HTTPAccepted(request=request)

下一篇博客将继续swift-proxy与swift-object的分析工作。

 类似资料: