keystonemiddleware中的token 认证

冀永寿
2023-12-01

        在系统访问api时,都要经过auth_token认证,只有认证成功才能继续访问api,所以弄清api认证的流程很有必要。

token认证包括了三个认证过程,即:cache认证,本地认证和远程认证;

1、根据token信息从token cache获取token id 和具体的token信息(json字符串,cached);返回cached;

2、如果token cache中没有,则使用cms对token字符串进行解析认证;返回解析后的token信息,然后将计算是否超时;最后将没超时的token信息保存在 token cache中;返回解析后的字符串;

3、如果cms解析失败,则进行远程token认证,即访问keystone server进行token 认证;

4、如果远程认证成功,将计算是否超时;最后将没超时的token信息保存在 token cache中;返回解析后的字符串;

5、如果远程认证失败,则抛出认证失败异常;


每个api应用都有个api-paste.ini文件,里面定义了访问应用的路径以及访问的过滤器filter。下面以neutronapi-paste.init文件为例:

[composite:neutron]

use = egg:Paste#urlmap

/: neutronversions

/v2.0: neutronapi_v2_0

 

[composite:neutronapi_v2_0]

use = call:neutron.auth:pipeline_factory

noauth = request_id catch_errors extensions neutronapiapp_v2_0

keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

......

[filter:authtoken]

paste.filter_factory = keystonemiddleware.auth_token:filter_factory

在文件中,可以看到访问应用会先经过authtoken过滤器,过滤器定义为:keystonemiddleware.auth_token:filter_factory.

def filter_factory(global_conf, **local_conf):

    conf = global_conf.copy()

    conf.update(local_conf)

    def auth_filter(app):

        return AuthProtocol(app, conf)

    return auth_filter

Filter_factory返回一个AuthProtocol对象。Filter在处理请求时会调用__call__函数。

class AuthProtocol(object): 

  def __call__(self, env, start_response):

        def _fmt_msg(env):

            msg = ('user: user_id %s, project_id %s, roles %s '

                   'service: user_id %s, project_id %s, roles %s' % (

                       env.get('HTTP_X_USER_ID'), env.get('HTTP_X_PROJECT_ID'),

                       env.get('HTTP_X_ROLES'),

                       env.get('HTTP_X_SERVICE_USER_ID'),

                       env.get('HTTP_X_SERVICE_PROJECT_ID'),

                       env.get('HTTP_X_SERVICE_ROLES')))

            return msg

        self._token_cache.initialize(env)  #初始化一个cache pool

        self._remove_auth_headers(env)  #移除请求的header信息,防止用户伪造token

        try:

            user_auth_ref = None

            serv_auth_ref = None

            try:

                 #从请求中获取token信息

                user_token = self._get_user_token_from_header(env)

                #验证token,返回token信息

                user_token_info = self._validate_token(user_token, env)

                #构造user_auth_ref 

                user_auth_ref = access.AccessInfo.factory(

                    body=user_token_info,

                    auth_token=user_token)

                env['keystone.token_info'] = user_token_info

                #构造头部信息

                user_headers = self._build_user_headers(user_auth_ref,user_token_info)

                self._add_headers(env, user_headers)

            except exc.InvalidToken:

                if self._delay_auth_decision:                    

                    self._add_headers(env, {'X-Identity-Status': 'Invalid'})

                else:

                    return self._reject_request(env, start_response)

 

            try:

                 #获取服务请求的token

                serv_token = self._get_service_token_from_header(env)

                if serv_token is not None:

                   #验证token

                    serv_token_info = self._validate_token(serv_token, env)

                    serv_auth_ref = access.AccessInfo.factory(

                        body=serv_token_info,

                        auth_token=serv_token)

                    serv_headers = self._build_service_headers(serv_token_info)

                    self._add_headers(env, serv_headers)

            except exc.InvalidToken:

                if self._delay_auth_decision:

                    self._LOG.info(

                        _LI('Invalid service token - deferring reject '

                            'downstream'))

                    self._add_headers(env,{'X-Service-Identity-Status': 'Invalid'})

                else:

                    self._LOG.info(

                        _LI('Invalid service token - rejecting request'))

                    return self._reject_request(env, start_response)

            env['keystone.token_auth'] = _user_plugin.UserAuthPlugin(

                user_auth_ref, serv_auth_ref)

        except exc.ServiceError as e:

            self._LOG.critical(_LC('Unable to obtain admin token: %s'), e)

            return self._do_503_error(env, start_response)

        return self._call_app(env, start_response)

__call__函数中主要实现从header中获取token,再验证token,最后将token信息放入header中。

_validate_token代码如下:

    def _validate_token(self, token, env, retry=True):

        token_id = None

        try:

            token_ids, cached = self._token_cache.get(token)          

            token_id = token_ids[0]

            if cached:

                data = cached

                if self._check_revocations_for_cached:                   

                    self._revocations.check(token_ids)

                self._confirm_token_bind(data, env)

            else:

                verified = None               

                try:

                    if cms.is_pkiz(token):

                        verified = self._verify_pkiz_token(token, token_ids)

                    elif cms.is_asn1_token(token):

                        verified = self._verify_signed_token(token, token_ids)

                except exceptions.CertificateConfigError:

                    ....

                if verified is not None:

                    data = jsonutils.loads(verified)

                    expires = _get_token_expiration(data)

                    _confirm_token_not_expired(expires)

                else:

                    data = self._identity_server.verify_token(token, retry)                    

                    expires = _get_token_expiration(data)

                self._confirm_token_bind(data, env)

                self._token_cache.store(token_id, data, expires)

            return data

        except (exceptions.ConnectionRefused, exceptions.RequestTimeout):

           ....

        except Exception:

            self._LOG.debug('Token validation failure.', exc_info=True)

            if token_id:

                self._token_cache.store_invalid(token_id)

            self._LOG.warn(_LW('Authorization failed for token'))

            raise exc.InvalidToken(_('Token authorization failed'))

1、根据tokentoken cache里面获取token id cache(实际是未超期token,超期的会被token cache过滤掉,抛出InvalidToken异常),这个cache是系统自带的cache,可以使用memcache进行代替;

2、如果cache不为空,则检查注销列表中是否有token,如果有,则认证失败;

3、坚持tokenbind,这里默认为:permissive,不加限制

4、如果cache为空,这是个新的token,在缓存中没有,则使用cmstoken信息进行验证。返回verified,token;检查token是否超时,如果超时,则抛出认证失败异常;

5、如果cms解析失败,则向keystone server 进行验证;验证成功,再次计算token是否过期,最后将保存的token保存在token cache中。

 

在返回token信息后,还需要将token信息转换到header中,代码如下:

    def _build_user_headers(self, auth_ref, token_info):       

        roles = ','.join(auth_ref.role_names)

        if _token_is_v2(token_info) and not auth_ref.project_id:

            raise exc.InvalidToken(_('Unable to determine tenancy.'))

 

        rval = {

            'X-Identity-Status': 'Confirmed',

            'X-Roles': roles,

        }

        for header_tmplt, attr in six.iteritems(_HEADER_TEMPLATE):

            rval[header_tmplt % ''] = getattr(auth_ref, attr)        

        rval['X-Role'] = roles

        for header_tmplt, attr in six.iteritems(_DEPRECATED_HEADER_TEMPLATE):

            rval[header_tmplt] = getattr(auth_ref, attr)

        if self._include_service_catalog and auth_ref.has_service_catalog():

            catalog = auth_ref.service_catalog.get_data()

            if _token_is_v3(token_info):

                catalog = _v3_to_v2_catalog(catalog)

            rval['X-Service-Catalog'] = jsonutils.dumps(catalog)

 

        return rval

1、获取token信息中的role信息;

2、根据header template保存到header中;

如果有服务目录(server catalog),则将服务目录保存到header中。



 类似资料: