在系统访问api时,都要经过auth_token认证,只有认证成功才能继续访问api,所以弄清api认证的流程很有必要。
token认证包括了三个认证过程,即:cache认证,本地认证和远程认证;
1、根据token信息从token cache获取token id 和具体的token信息(json字符串,cached);返回cached;
2、如果token cache中没有,则使用cms对token字符串进行解析认证;返回解析后的token信息,然后将计算是否超时;最后将没超时的token信息保存在 token cache中;返回解析后的字符串;
3、如果cms解析失败,则进行远程token认证,即访问keystone server进行token 认证;
4、如果远程认证成功,将计算是否超时;最后将没超时的token信息保存在 token cache中;返回解析后的字符串;
5、如果远程认证失败,则抛出认证失败异常;
每个api应用都有个api-paste.ini文件,里面定义了访问应用的路径以及访问的过滤器filter。下面以neutron的api-paste.init文件为例:
[composite:neutron] use = egg:Paste#urlmap /: neutronversions /v2.0: neutronapi_v2_0
[composite:neutronapi_v2_0] use = call:neutron.auth:pipeline_factory noauth = request_id catch_errors extensions neutronapiapp_v2_0 keystone = request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0 ...... [filter:authtoken] paste.filter_factory = keystonemiddleware.auth_token:filter_factory |
在文件中,可以看到访问应用会先经过authtoken过滤器,过滤器定义为:keystonemiddleware.auth_token:filter_factory.
def filter_factory(global_conf, **local_conf): conf = global_conf.copy() conf.update(local_conf) def auth_filter(app): return AuthProtocol(app, conf) return auth_filter |
Filter_factory返回一个AuthProtocol对象。Filter在处理请求时会调用__call__函数。
class AuthProtocol(object): def __call__(self, env, start_response): def _fmt_msg(env): msg = ('user: user_id %s, project_id %s, roles %s ' 'service: user_id %s, project_id %s, roles %s' % ( env.get('HTTP_X_USER_ID'), env.get('HTTP_X_PROJECT_ID'), env.get('HTTP_X_ROLES'), env.get('HTTP_X_SERVICE_USER_ID'), env.get('HTTP_X_SERVICE_PROJECT_ID'), env.get('HTTP_X_SERVICE_ROLES'))) return msg self._token_cache.initialize(env) #初始化一个cache pool self._remove_auth_headers(env) #移除请求的header信息,防止用户伪造token try: user_auth_ref = None serv_auth_ref = None try: #从请求中获取token信息 user_token = self._get_user_token_from_header(env) #验证token,返回token信息 user_token_info = self._validate_token(user_token, env) #构造user_auth_ref user_auth_ref = access.AccessInfo.factory( body=user_token_info, auth_token=user_token) env['keystone.token_info'] = user_token_info #构造头部信息 user_headers = self._build_user_headers(user_auth_ref,user_token_info) self._add_headers(env, user_headers) except exc.InvalidToken: if self._delay_auth_decision: self._add_headers(env, {'X-Identity-Status': 'Invalid'}) else: return self._reject_request(env, start_response)
try: #获取服务请求的token serv_token = self._get_service_token_from_header(env) if serv_token is not None: #验证token serv_token_info = self._validate_token(serv_token, env) serv_auth_ref = access.AccessInfo.factory( body=serv_token_info, auth_token=serv_token) serv_headers = self._build_service_headers(serv_token_info) self._add_headers(env, serv_headers) except exc.InvalidToken: if self._delay_auth_decision: self._LOG.info( _LI('Invalid service token - deferring reject ' 'downstream')) self._add_headers(env,{'X-Service-Identity-Status': 'Invalid'}) else: self._LOG.info( _LI('Invalid service token - rejecting request')) return self._reject_request(env, start_response) env['keystone.token_auth'] = _user_plugin.UserAuthPlugin( user_auth_ref, serv_auth_ref) except exc.ServiceError as e: self._LOG.critical(_LC('Unable to obtain admin token: %s'), e) return self._do_503_error(env, start_response) return self._call_app(env, start_response) |
__call__函数中主要实现从header中获取token,再验证token,最后将token信息放入header中。
_validate_token代码如下:
def _validate_token(self, token, env, retry=True): token_id = None try: token_ids, cached = self._token_cache.get(token) token_id = token_ids[0] if cached: data = cached if self._check_revocations_for_cached: self._revocations.check(token_ids) self._confirm_token_bind(data, env) else: verified = None try: if cms.is_pkiz(token): verified = self._verify_pkiz_token(token, token_ids) elif cms.is_asn1_token(token): verified = self._verify_signed_token(token, token_ids) except exceptions.CertificateConfigError: .... if verified is not None: data = jsonutils.loads(verified) expires = _get_token_expiration(data) _confirm_token_not_expired(expires) else: data = self._identity_server.verify_token(token, retry) expires = _get_token_expiration(data) self._confirm_token_bind(data, env) self._token_cache.store(token_id, data, expires) return data except (exceptions.ConnectionRefused, exceptions.RequestTimeout): .... except Exception: self._LOG.debug('Token validation failure.', exc_info=True) if token_id: self._token_cache.store_invalid(token_id) self._LOG.warn(_LW('Authorization failed for token')) raise exc.InvalidToken(_('Token authorization failed')) |
1、根据token从token cache里面获取token id 和cache(实际是未超期token,超期的会被token cache过滤掉,抛出InvalidToken异常),这个cache是系统自带的cache,可以使用memcache进行代替;
2、如果cache不为空,则检查注销列表中是否有token,如果有,则认证失败;
3、坚持token的bind,这里默认为:permissive,不加限制
4、如果cache为空,这是个新的token,在缓存中没有,则使用cms对token信息进行验证。返回verified,即token;检查token是否超时,如果超时,则抛出认证失败异常;
5、如果cms解析失败,则向keystone server 进行验证;验证成功,再次计算token是否过期,最后将保存的token保存在token cache中。
在返回token信息后,还需要将token信息转换到header中,代码如下:
def _build_user_headers(self, auth_ref, token_info): roles = ','.join(auth_ref.role_names) if _token_is_v2(token_info) and not auth_ref.project_id: raise exc.InvalidToken(_('Unable to determine tenancy.'))
rval = { 'X-Identity-Status': 'Confirmed', 'X-Roles': roles, } for header_tmplt, attr in six.iteritems(_HEADER_TEMPLATE): rval[header_tmplt % ''] = getattr(auth_ref, attr) rval['X-Role'] = roles for header_tmplt, attr in six.iteritems(_DEPRECATED_HEADER_TEMPLATE): rval[header_tmplt] = getattr(auth_ref, attr) if self._include_service_catalog and auth_ref.has_service_catalog(): catalog = auth_ref.service_catalog.get_data() if _token_is_v3(token_info): catalog = _v3_to_v2_catalog(catalog) rval['X-Service-Catalog'] = jsonutils.dumps(catalog)
return rval |
1、获取token信息中的role信息;
2、根据header template保存到header中;
如果有服务目录(server catalog),则将服务目录保存到header中。