当前位置: 首页 > 工具软件 > Swift OAuth2 > 使用案例 >

Swift源码分析----swift-proxy与swift-container

束研
2023-12-01

感谢朋友支持本博客,欢迎共同探讨交流,由于能力和时间有限,错误之处在所难免,欢迎指正!

如果转载,请保留作者信息。
博客地址:http://blog.csdn.net/gaoxingnengjisuan
邮箱地址:dong.liu@siat.ac.cn

PS:最近没有登录博客,很多朋友的留言没有看见,这里道歉!还有就是本人较少上QQ,可以邮件交流。


概述:

这篇博客主要关注swift-proxy与swift-container服务中PUT,POST,DELETE,GET,HEAD等方法的对应调用实现;


源码解析部分(代码中较重要的部分已经进行了相关的注释):


GETorHEAD

/swift/proxy/controllers/container.py----class ContainerController(Controller)----def GETorHEAD

def GETorHEAD(self, req):
    """Handle an HTTP GET or HEAD request for a container.

    :param req: the incoming request object
    :returns: ``HTTPNotFound`` when element 1 of ``account_info`` is empty;
              the truthy result of the ``swift.authorize`` callback when
              one is installed in the environ and it returns one; otherwise
              the response produced by ``GETorHEAD_base`` with the headers
              named in ``self.app.swift_owner_headers`` removed unless the
              environ marks the request as ``swift_owner``.
    """
    # Element 1 of account_info() is presumably the account's node list;
    # an empty value is treated as "account not found".
    if not self.account_info(self.account_name, req)[1]:
        return HTTPNotFound(request=req)

    # Look up the container-ring partition for this account/container.
    part = self.app.container_ring.get_part(
        self.account_name, self.container_name)
    resp = self.GETorHEAD_base(
        req, _('Container'), self.app.container_ring, part,
        req.swift_entity_path)

    if 'swift.authorize' in req.environ:
        # Expose the container read ACL to the authorize callback.
        req.acl = resp.headers.get('x-container-read')
        aresp = req.environ['swift.authorize'](req)
        if aresp:
            return aresp

    if not req.environ.get('swift_owner', False):
        # Non-owners must not see owner-only headers.
        for key in self.app.swift_owner_headers:
            if key in resp.headers:
                del resp.headers[key]
    return resp

/swift/container/server.py----class ContainerController(object)----def HEAD

def HEAD(self, req):
    """
    Handle an HTTP HEAD request for a container.

    The container statistics reported by the broker and the stored metadata
    entries that qualify are returned as key/value response headers; the
    response carries no body.
    """
    # Split the request path into its components.
    drive, part, account, container, _obj = split_and_validate_path(
        req, 4, 5, True)
    listing_type = get_listing_content_type(req)

    # When mount checking is enabled and the check fails for this drive,
    # answer with HTTPInsufficientStorage.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # Broker object through which the container database is accessed.
    broker = self._get_container_broker(
        drive, part, account, container,
        pending_timeout=0.1, stale_reads_ok=True)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Container-wide statistics; only the four keys read below are used.
    stats = broker.get_info()
    resp_headers = {
        'X-Container-Object-Count': stats['object_count'],
        'X-Container-Bytes-Used': stats['bytes_used'],
        'X-Timestamp': stats['created_at'],
        'X-PUT-Timestamp': stats['put_timestamp'],
    }

    # Copy every non-empty metadata value whose key is either a saved
    # header or a container sys/user metadata key.
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value == '':
            continue
        if key.lower() in self.save_headers or \
                is_sys_or_user_meta('container', key):
            resp_headers[key] = value

    resp_headers['Content-Type'] = listing_type
    return HTTPNoContent(request=req, headers=resp_headers, charset='utf-8')


/swift/container/server.py----class ContainerController(object)----def GET

def GET(self, req):
    """
    Handle an HTTP GET request for a container.

    Like HEAD, the container statistics and qualifying metadata are
    returned as response headers. In addition, the object listing selected
    by the ``path``, ``prefix``, ``delimiter``, ``marker``, ``end_marker``
    and ``limit`` query parameters is returned in the body, rendered as
    JSON, XML or plain text (one name per line) depending on the listing
    content type chosen for the request.

    :param req: the incoming request object
    :returns: ``HTTPPreconditionFailed`` for a bad delimiter or a limit
              above ``CONTAINER_LISTING_LIMIT``;
              ``HTTPInsufficientStorage`` when the mount check fails;
              ``HTTPNotFound`` when the broker reports the container as
              deleted; ``HTTPNoContent`` for an empty plain-text listing;
              otherwise a ``Response`` carrying the listing.
    """
    # Split the request path into its components.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)
    path = get_param(req, 'path')
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')

    # Only a single character with an ordinal of at most 254 is accepted.
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        return HTTPPreconditionFailed(body='Bad delimiter')

    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    limit = CONTAINER_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > CONTAINER_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' % CONTAINER_LISTING_LIMIT)

    out_content_type = get_listing_content_type(req)

    # When mount checking is enabled and the check fails for this drive,
    # answer with HTTPInsufficientStorage.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # Broker object through which the container database is accessed.
    broker = self._get_container_broker(drive, part, account, container,
                                        pending_timeout=0.1,
                                        stale_reads_ok=True)

    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Container-wide statistics; only the four keys read below are used.
    info = broker.get_info()
    resp_headers = {
        'X-Container-Object-Count': info['object_count'],
        'X-Container-Bytes-Used': info['bytes_used'],
        'X-Timestamp': info['created_at'],
        'X-PUT-Timestamp': info['put_timestamp'],
    }
    for key, (value, timestamp) in broker.metadata.iteritems():
        if value and (key.lower() in self.save_headers or
                      is_sys_or_user_meta('container', key)):
            resp_headers[key] = value
    ret = Response(request=req, headers=resp_headers,
                   content_type=out_content_type, charset='utf-8')

    # Fetch the listing records for the requested window.
    container_list = broker.list_objects_iter(
        limit, marker, end_marker, prefix, delimiter, path)

    if out_content_type == 'application/json':
        # The body must be set on ``ret`` (the response built above).
        ret.body = json.dumps([self.update_data_record(record)
                               for record in container_list])
    elif out_content_type.endswith('/xml'):
        doc = Element('container', name=container.decode('utf-8'))
        # ``entry`` rather than ``obj`` so the path component unpacked at
        # the top of the method is not shadowed.
        for entry in container_list:
            record = self.update_data_record(entry)
            if 'subdir' in record:
                name = record['subdir'].decode('utf-8')
                sub = SubElement(doc, 'subdir', name=name)
                SubElement(sub, 'name').text = name
            else:
                obj_element = SubElement(doc, 'object')
                # Emit the well-known fields first, in a fixed order ...
                for field in ["name", "hash", "bytes", "content_type",
                              "last_modified"]:
                    SubElement(obj_element, field).text = \
                        str(record.pop(field)).decode('utf-8')
                # ... then whatever else the record carries, sorted by key.
                for field in sorted(record):
                    SubElement(obj_element, field).text = \
                        str(record[field]).decode('utf-8')
        # Normalise the XML declaration to use double quotes.
        ret.body = tostring(doc, encoding='UTF-8').replace(
            "<?xml version='1.0' encoding='UTF-8'?>",
            '<?xml version="1.0" encoding="UTF-8"?>', 1)
    else:
        if not container_list:
            return HTTPNoContent(request=req, headers=resp_headers)
        # Plain text: the first element of each record, one per line.
        ret.body = '\n'.join(rec[0] for rec in container_list) + '\n'
    return ret

PUT

/swift/proxy/controllers/container.py----class ContainerController(Controller)----def PUT

def PUT(self, req):
    """Handle an HTTP PUT request that creates or updates a container.

    :param req: the incoming request object
    :returns: the error response from ``clean_acls``/``check_metadata`` if
              either yields one; ``HTTPBadRequest`` when the container name
              exceeds ``MAX_CONTAINER_NAME_LENGTH``; ``HTTPNotFound`` when
              no account nodes are available (after an optional
              auto-create); ``HTTPForbidden`` when the per-account
              container limit is reached; otherwise the response returned
              by ``make_requests`` for the backend PUT.
    """
    error_response = self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response

    if not req.environ.get('swift_owner'):
        # Non-owners may not set owner-only headers.
        for key in self.app.swift_owner_headers:
            req.headers.pop(key, None)

    if len(self.container_name) > MAX_CONTAINER_NAME_LENGTH:
        resp = HTTPBadRequest(request=req)
        resp.body = 'Container name length of %d longer than %d' % (
            len(self.container_name), MAX_CONTAINER_NAME_LENGTH)
        return resp

    # Partition, nodes and container count of the owning account.
    account_partition, accounts, container_count = self.account_info(
        self.account_name, req)

    if not accounts and self.app.account_autocreate:
        self.autocreate_account(req.environ, self.account_name)
        account_partition, accounts, container_count = self.account_info(
            self.account_name, req)

    if not accounts:
        return HTTPNotFound(request=req)

    # Enforce the per-account container limit, except for whitelisted
    # accounts; a limit of zero or less disables the check.
    if (self.app.max_containers_per_account > 0 and
            container_count >= self.app.max_containers_per_account and
            self.account_name not in self.app.max_containers_whitelist):
        resp = HTTPForbidden(request=req)
        resp.body = 'Reached container limit of %s' % \
            self.app.max_containers_per_account
        return resp

    # Partition and nodes of the container itself.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)

    headers = self._backend_requests(
        req, len(containers), account_partition, accounts)

    # Drop cached info for this container before changing it.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    resp = self.make_requests(
        req, self.app.container_ring,
        container_partition, 'PUT', req.swift_entity_path, headers)

    return resp

/swift/container/server.py----class ContainerController(object)----def PUT

def PUT(self, req):
    """
    Handle an HTTP PUT for a container, or for an object row inside it.

    When the path names an object, the object is recorded in the container
    database through ``broker.put_object``. Otherwise the container is
    created or updated, qualifying request headers are stored as container
    metadata, and ``account_update`` is invoked so the owning account can
    be informed.
    """
    # Split the request path into its components.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)

    if not ('x-timestamp' in req.headers and
            check_float(req.headers['x-timestamp'])):
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')
    if 'x-container-sync-to' in req.headers:
        err, sync_to, realm, realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)

    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    timestamp = normalize_timestamp(req.headers['x-timestamp'])

    # Broker object through which the container database is accessed.
    broker = self._get_container_broker(drive, part, account, container)

    if obj:
        # --- put container object ---
        # For auto-create accounts the database is initialised on demand.
        if account.startswith(self.auto_create_account_prefix):
            if not os.path.exists(broker.db_file):
                try:
                    broker.initialize(timestamp)
                except DatabaseAlreadyExists:
                    pass
        if not os.path.exists(broker.db_file):
            return HTTPNotFound()

        # Record the object using the metadata supplied in the headers.
        size = int(req.headers['x-size'])
        content_type = req.headers['x-content-type']
        etag = req.headers['x-etag']
        broker.put_object(obj, timestamp, size, content_type, etag)
        return HTTPCreated(request=req)

    # --- put container ---
    was_created = self._update_or_create(req, broker, timestamp)

    # Collect the request headers that should be persisted as metadata.
    metadata = dict(
        (key, (value, timestamp))
        for key, value in req.headers.iteritems()
        if key.lower() in self.save_headers or
        is_sys_or_user_meta('container', key))
    if metadata:
        if 'X-Container-Sync-To' in metadata:
            # A new or changed sync target resets the sync points.
            if ('X-Container-Sync-To' not in broker.metadata or
                    metadata['X-Container-Sync-To'][0] !=
                    broker.metadata['X-Container-Sync-To'][0]):
                broker.set_x_container_sync_points(-1, -1)
        broker.update_metadata(metadata)

    # Let the account layer know about the current container state; a
    # truthy return value is sent back to the caller as the response.
    update_resp = self.account_update(req, account, container, broker)
    if update_resp:
        return update_resp

    return HTTPCreated(request=req) if was_created \
        else HTTPAccepted(request=req)

POST

/swift/proxy/controllers/container.py----class ContainerController(Controller)----def POST

def POST(self, req):
    """Handle an HTTP POST request that updates container metadata.

    :param req: the incoming request object
    :returns: the error response from ``clean_acls``/``check_metadata`` if
              either yields one; ``HTTPNotFound`` when no account nodes
              are available; otherwise the response returned by
              ``make_requests`` for the backend POST.
    """
    error_response = self.clean_acls(req) or check_metadata(req, 'container')
    if error_response:
        return error_response

    if not req.environ.get('swift_owner'):
        # Non-owners may not set owner-only headers.
        for key in self.app.swift_owner_headers:
            req.headers.pop(key, None)

    # Partition, nodes and container count of the owning account; only
    # the node list is needed here.
    account_partition, accounts, container_count = self.account_info(
        self.account_name, req)

    if not accounts:
        return HTTPNotFound(request=req)

    # Partition and nodes of the container itself.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)

    headers = self.generate_request_headers(req, transfer=True)

    # Drop cached info for this container before changing it.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    # The same header set is sent to every container node.
    resp = self.make_requests(
        req, self.app.container_ring, container_partition, 'POST',
        req.swift_entity_path, [headers] * len(containers))

    return resp

/swift/container/server.py----class ContainerController(object)----def POST

def POST(self, req):
    """
    Handle an HTTP POST request: update the metadata of a container.

    Request headers that are saved headers or container sys/user metadata
    are written, stamped with the request timestamp, into the container
    database through the broker.
    """
    # Split the request path into drive, partition, account and container.
    drive, part, account, container = split_and_validate_path(req, 4)

    if not ('x-timestamp' in req.headers and
            check_float(req.headers['x-timestamp'])):
        return HTTPBadRequest(body='Missing or bad timestamp', request=req,
                              content_type='text/plain')
    if 'x-container-sync-to' in req.headers:
        err, sync_to, realm, realm_key = validate_sync_to(
            req.headers['x-container-sync-to'], self.allowed_sync_hosts,
            self.realms_conf)
        if err:
            return HTTPBadRequest(err)

    # When mount checking is enabled and the check fails for this drive,
    # answer with HTTPInsufficientStorage.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # Broker object through which the container database is accessed.
    broker = self._get_container_broker(drive, part, account, container)
    if broker.is_deleted():
        return HTTPNotFound(request=req)

    # Normalise the request timestamp before storing it.
    timestamp = normalize_timestamp(req.headers['x-timestamp'])

    # Collect the request headers that should be persisted as metadata.
    metadata = dict(
        (key, (value, timestamp))
        for key, value in req.headers.iteritems()
        if key.lower() in self.save_headers or
        is_sys_or_user_meta('container', key))

    if metadata:
        if 'X-Container-Sync-To' in metadata:
            # A new or changed sync target resets the sync points.
            if ('X-Container-Sync-To' not in broker.metadata or
                    metadata['X-Container-Sync-To'][0] !=
                    broker.metadata['X-Container-Sync-To'][0]):
                broker.set_x_container_sync_points(-1, -1)
        broker.update_metadata(metadata)

    return HTTPNoContent(request=req)

DELETE

/swift/proxy/controllers/container.py----class ContainerController(Controller)----def DELETE

def DELETE(self, req):
    """Handle an HTTP DELETE request for a container.

    :param req: the incoming request object
    :returns: ``HTTPNotFound`` when no account nodes are available or when
              the aggregated backend status is ``HTTP_ACCEPTED``;
              otherwise the response returned by ``make_requests`` for the
              backend DELETE.
    """
    # Partition, nodes and container count of the owning account.
    account_partition, accounts, container_count = self.account_info(
        self.account_name, req)

    if not accounts:
        return HTTPNotFound(request=req)

    # Partition and nodes of the container itself.
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)

    headers = self._backend_requests(
        req, len(containers), account_partition, accounts)

    # Drop cached info for this container before changing it.
    clear_info_cache(self.app, req.environ,
                     self.account_name, self.container_name)

    resp = self.make_requests(
        req, self.app.container_ring, container_partition, 'DELETE',
        req.swift_entity_path, headers)

    # Indicates no server had the container
    if resp.status_int == HTTP_ACCEPTED:
        return HTTPNotFound(request=req)
    return resp

/swift/container/server.py----class ContainerController(object)----def DELETE

def DELETE(self, req):
    """
    Handle an HTTP DELETE for a container, or for an object row inside it.

    The request path has the form
    ``/device/partition/account/container[/object]``.

    * With an object component, ``broker.delete_object`` is called for
      that object and the request timestamp. (The original author notes
      this is queued in a pending file and merged into the database later;
      that is not visible in this method.)
    * Without one, the container itself is deleted: the request is refused
      with ``HTTPConflict`` unless the broker reports the container empty,
      ``broker.delete_db`` is called with the request timestamp, and
      ``account_update`` is invoked so the owning account can be informed.
    """
    # Split the request path into its components.
    drive, part, account, container, obj = split_and_validate_path(
        req, 4, 5, True)

    if not ('x-timestamp' in req.headers and
            check_float(req.headers['x-timestamp'])):
        return HTTPBadRequest(body='Missing timestamp', request=req,
                              content_type='text/plain')

    # When mount checking is enabled and the check fails for this drive,
    # answer with HTTPInsufficientStorage.
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)

    # Broker object through which the container database is accessed.
    broker = self._get_container_broker(drive, part, account, container)

    # For auto-create accounts, an object delete initialises a missing
    # database on demand.
    if (account.startswith(self.auto_create_account_prefix) and obj and
            not os.path.exists(broker.db_file)):
        try:
            broker.initialize(normalize_timestamp(
                req.headers.get('x-timestamp') or time.time()))
        except DatabaseAlreadyExists:
            pass
    if not os.path.exists(broker.db_file):
        return HTTPNotFound()

    if obj:
        # --- delete object ---
        broker.delete_object(obj, req.headers.get('x-timestamp'))
        return HTTPNoContent(request=req)

    # --- delete container ---
    # Refuse while the broker does not report the container as empty.
    if not broker.empty():
        return HTTPConflict(request=req)

    # Remember, before deleting, whether the container was live: it has a
    # non-zero put timestamp and is not already reported as deleted.
    was_live = float(broker.get_info()['put_timestamp']) and \
        not broker.is_deleted()

    # Ask the broker to delete the database as of the request timestamp;
    # if it still does not report deleted afterwards, signal a conflict.
    broker.delete_db(req.headers['X-Timestamp'])
    if not broker.is_deleted():
        return HTTPConflict(request=req)

    # Let the account layer know about the current container state; a
    # truthy return value is sent back to the caller as the response.
    update_resp = self.account_update(req, account, container, broker)
    if update_resp:
        return update_resp

    if not was_live:
        return HTTPNotFound()
    return HTTPNoContent(request=req)

 类似资料: