Flask Authentication
This tutorial takes a test-first approach to implementing token-based authentication in a Flask app using JSON Web Tokens (JWTs).
Updates:
By the end of this tutorial, you will be able to…
JSON Web Tokens (or JWTs) provide a means of transmitting information from the client to the server in a stateless, secure way.
On the server, JWTs are generated by signing user information with a secret key; the resulting tokens are then stored securely on the client. This form of auth works well with modern, single-page applications. For more on this, along with the pros and cons of using JWTs vs. session and cookie-based auth, please review the following articles:
NOTE: Keep in mind that since a JWT is signed rather than encrypted, it should never contain sensitive information like a user’s password.
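To see why, the sketch below reads the claims out of a token without knowing the secret key. It uses only the standard library; the helper names (b64url_decode, read_payload_without_key) and the sample claims are assumptions made for illustration, and the signature segment is a placeholder rather than a real signature.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip off."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def read_payload_without_key(token: str) -> dict:
    """Return the claims of a JWT without verifying its signature."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


# Illustrative token assembled by hand; the last segment is NOT a valid signature.
claims = {"sub": 1, "email": "joe@gmail.com"}
payload_segment = (
    base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode()
)
token = "eyJhbGciOiJIUzI1NiJ9." + payload_segment + ".placeholder-signature"

# Anyone holding the token can recover the claims, key or no key.
print(read_payload_without_key(token))
```

Because the payload is only encoded, not encrypted, anything placed in it should be treated as visible to whoever holds the token.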
Enough theory, let’s start implementing some code!
Start by cloning the project boilerplate and then create a new branch:
$ git clone https://github.com/realpython/flask-jwt-auth.git
$ cd flask-jwt-auth
$ git checkout tags/1.0.0 -b jwt-auth
Create and activate a virtualenv and install the dependencies:
This is optional, but it’s a good idea to create a new GitHub repository and update the remote:
(env)$ git remote set-url origin <newurl>
Let’s set up Postgres.
NOTE: If you’re on a Mac, check out Postgres app.
Once the local Postgres server is running, create two new databases from psql that share the same name as your project name:
NOTE: The commands for creating a database may vary with your version of Postgres. Check the Postgres documentation for the correct command.
Before applying the database migrations, we need to update the config file found in project/server/config.py. Simply update the database_name:
database_name = 'flask_jwt_auth'
Set the environment variables in the terminal:
Update the following tests in project/tests/test__config.py:
class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
        )


class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )
Run them to ensure they still pass:
You should see:
test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.007s

OK
Add a models.py file to the “server” directory:
In the above snippet, we define a basic user model, which uses the Flask-Bcrypt extension to hash the password.
Install psycopg2 to connect to Postgres:
(env)$ pip install psycopg2==2.6.2
(env)$ pip freeze > requirements.txt
Within manage.py, update the project.server import so that it also loads the new models module:
from project.server import app, db, models
Apply the migration:
Did it work?
(env)$ psql
# \c flask_jwt_auth
You are now connected to database "flask_jwt_auth" as user "michael.herman".
# \d

               List of relations
 Schema |      Name       |   Type   |  Owner
--------+-----------------+----------+----------
 public | alembic_version | table    | postgres
 public | users           | table    | postgres
 public | users_id_seq    | sequence | postgres
(3 rows)
The auth workflow works as follows:
This cycle repeats until the token expires or is revoked. In the latter case, the server issues a new token.
The tokens themselves are divided into three parts:
We’ll dive a bit deeper into the payload, but if you’re curious, you can read more about each part from the Introduction to JSON Web Tokens article.
To work with JSON Web Tokens in our app, install the PyJWT package:
Add the following method to the User() class in project/server/models.py:
def encode_auth_token(self, user_id):
    """
    Generates the Auth Token
    :return: string
    """
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256'
        )
    except Exception as e:
        return e
Don’t forget to add the import:
So, given a user id, this method creates and returns a token from the payload and the secret key set in the config.py file. The payload is where we add metadata about the token and information about the user. This info is often referred to as JWT Claims. We utilize the following “claims”:
exp: expiration date of the token
iat: the time the token is generated
sub: the subject of the token (the user whom it identifies)
The secret key must be random and only accessible server-side. Use the Python interpreter to generate a key:
>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
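Python’s standard library also ships a secrets module intended for generating security-sensitive random values. As an alternative sketch (not what this tutorial’s boilerplate uses), a hex-encoded key can be easier to paste into a shell than raw bytes. The 24-byte length simply mirrors the os.urandom(24) call above, and the variable name is an assumption.

```python
import secrets

# 24 random bytes rendered as 48 hexadecimal characters.
secret_key = secrets.token_hex(24)
print(secret_key)
```

The printed value differs on every run, which is the point: generate it once and keep it out of version control.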
Set the key as an environment variable:
Add this key to the SECRET_KEY within the BaseConfig() class in project/server/config.py:
SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious')
Update the tests within project/tests/test__config.py to ensure the variable is set correctly:
Before moving on, let’s write a quick unit test for the user model. Add the following code to a new file called test_user_model.py in “project/tests”:
# project/tests/test_user_model.py

import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase


class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test'
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))

if __name__ == '__main__':
    unittest.main()
Run the tests. They all should pass.
Similarly, to decode a token, add the following method to the User() class:
We need to decode the auth token with every API request and verify its signature to be sure of the user’s authenticity. To verify the auth_token, we use the same SECRET_KEY that was used to encode the token.
If the auth_token is valid, we get the user id from the sub index of the payload. If invalid, there could be two exceptions:
ExpiredSignatureError exception. This means the time specified in the payload’s exp field has expired.
InvalidTokenError exception. This is raised when the supplied token is malformed or otherwise fails validation.
NOTE: We have used a static method since it does not relate to the class’s instance.
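The decoding rules described above can be sketched without any third-party package. What follows is a standard-library approximation of HS256 signing and verification, not the PyJWT code used by the app; the function names, the two error strings, and the injectable now parameter are assumptions made for illustration. It returns the sub claim on success and a string on failure, so a caller can tell the two apart with an isinstance(resp, str) check.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def encode_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"typ": "JWT", "alg": "HS256"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    return signing_input + "." + _b64url_encode(_sign(signing_input, secret))


def decode_token(token: str, secret: str, now=None):
    """Return the 'sub' claim, or an error string if the token is rejected."""
    now = time.time() if now is None else now
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = _sign(header_b64 + "." + payload_b64, secret)
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return "Invalid token. Please log in again."
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        # Wrong number of segments, bad base64, or bad JSON.
        return "Invalid token. Please log in again."
    if payload.get("exp", 0) < now:
        return "Signature expired. Please log in again."
    return payload["sub"]


# Fixed timestamps keep the example deterministic (a 5-second lifetime).
token = encode_token({"exp": 1005, "iat": 1000, "sub": 1}, "my_precious")
print(decode_token(token, "my_precious", now=1001))  # inside the validity window
print(decode_token(token, "my_precious", now=1010))  # after expiry
print(decode_token(token, "wrong-key", now=1001))    # signature mismatch
```

Passing now explicitly is a testing convenience in this sketch; it avoids sleeping until a short-lived token expires.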
Add a test to test_user_model.py:
def test_decode_auth_token(self):
    user = User(
        email='test@test.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(auth_token) == 1)
Make sure the tests pass before moving on.
NOTE: We will handle invalid tokens by blacklisting them later.
Now we can configure the auth routes using a test-first approach:
/auth/register
/auth/login
/auth/logout
/auth/status
Start by creating a new folder called “auth” in “project/server”. Then, within “auth” add two files, __init__.py and views.py. Finally, add the following code to views.py:
To register the new Blueprint with the app, add the following to the bottom of project/server/__init__.py:
from project.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)
Now, add a new file called test_auth.py within “project/tests” to hold all our tests for this Blueprint:
Start with a test:
def test_registration(self):
    """ Test for user registration """
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully registered.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 201)
Make sure to add the import:
Run the tests. You should see the following error:
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Now, let’s write the code to get the test to pass. Add the following to project/server/auth/views.py:
Here, we register a new user and generate a new auth token for further requests, which we send back to the client.
Run the tests to ensure they all pass:
Ran 6 tests in 0.132s

OK
Next, let’s add one more test to ensure the registration fails if the user already exists:
Run the tests again before moving on to the next route. All should pass.
Again, start with a test. To verify the login API, let’s test for two cases:
def test_registered_user_login(self):
    """ Test for login of registered-user login """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.'
        )
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # registered user login
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged in.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 200)
In this test case, the registered user tries to log in and, as expected, our application should allow this.
Run the tests. They should fail. Now write the code:
Don’t forget to convert the class to a view function:
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)
Run the tests again. Do they pass? They should. Don’t move on until all tests pass.
Add the test:
In this case, a non-registered user attempts to log in and, as expected, our application should not allow this.
Run the tests, and then update the code:
class LoginAPI(MethodView):
    """
    User Login Resource
    """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            if user and bcrypt.check_password_hash(
                user.password, post_data.get('password')
            ):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'auth_token': auth_token.decode()
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': 'User does not exist.'
                }
                return make_response(jsonify(responseObject)), 404
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500
What did we change? Do the tests pass? What if the email is correct but the password is incorrect? What happens? Write a test for this!
In order to get the user details of the currently logged in user, the auth token must be sent with the request within the header.
Start with a test:
The test should fail. Now, in the handler class, we should:
class UserAPI(MethodView):
    """
    User Resource
    """
    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401
So, if the token is valid and not expired, we get the user id from the token’s payload, which is then used to get the user data from the database.
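One detail worth noting in the handler above: auth_header.split(" ")[1] raises an IndexError when the header contains no space (for example, a bare token sent without a scheme). A more defensive parse might look like the following sketch. extract_bearer_token is a hypothetical helper, not part of the tutorial’s code, and it is stricter than the original because it also insists on the Bearer scheme. It returns an empty string on failure so that an `if auth_token:` check like the one in the handler still catches the problem.

```python
def extract_bearer_token(auth_header):
    """Return the token from a 'Bearer TOKEN' header, or '' if it is unusable."""
    if not auth_header:
        return ''
    parts = auth_header.split()
    # Expect exactly two parts: the scheme and the token itself.
    if len(parts) != 2 or parts[0].lower() != 'bearer':
        return ''
    return parts[1]


print(repr(extract_bearer_token('Bearer abc.def.ghi')))  # well-formed header
print(repr(extract_bearer_token('abc.def.ghi')))         # scheme missing
print(repr(extract_bearer_token(None)))                  # header absent
```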
NOTE: We still need to check if a token is blacklisted. We’ll get to this shortly.
Make sure to add:
And:
auth_blueprint.add_url_rule(
    '/auth/status',
    view_func=user_view,
    methods=['GET']
)
The tests should pass:
One more route to go!
Test valid logout:
def test_valid_logout(self):
    """ Test for logout before token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged out.')
        self.assertEqual(response.status_code, 200)
In this first test, we register a new user, log them in, and then attempt to log them out before the token expires.
Test invalid logout:
Like the last test, we register a user, log them in, and then attempt to log them out. In this case, the token is invalid since it has expired.
Add the import:
import time
Now, the code must:
Before writing the route handler, let’s create a new model for blacklisting tokens…
Add the following code to project/server/models.py:
Then create and apply the migrations. Once done, your database should have the following tables:
 Schema |          Name           |   Type   |  Owner
--------+-------------------------+----------+----------
 public | alembic_version         | table    | postgres
 public | blacklist_tokens        | table    | postgres
 public | blacklist_tokens_id_seq | sequence | postgres
 public | users                   | table    | postgres
 public | users_id_seq            | sequence | postgres
(5 rows)
With that, we can add the logout handler…
Update the views:
Update the imports:
from project.server.models import User, BlacklistToken
When a user logs out, the token is no longer valid, so we add it to the blacklist.
NOTE: Often, larger applications have a way to renew blacklisted tokens every now and then so that the system does not run out of valid tokens.
Run the tests:
Finally, we need to ensure that a token has not been blacklisted right after it has been decoded by decode_auth_token() within the logout and user status routes.
First, let’s write a test for the logout route:
def test_valid_blacklisted_token_logout(self):
    """ Test for logout after a valid token gets blacklisted """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_login.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        # blacklisted valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)
In this test, we blacklist the token just before the logout route is hit, which makes our otherwise valid token unusable.
Update the imports so that db and BlacklistToken are available to the test.
The test should fail with the following exception:
psycopg2.IntegrityError: duplicate key value violates unique constraint "blacklist_tokens_token_key"
DETAIL: Key (token)=(eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE0ODUyMDgyOTUsImlhdCI6MTQ4NTIwODI5MCwic3ViIjoxfQ.D9annoyh-VwpI5RY3blaSBX4pzK5UJi1H9dmKg2DeLQ) already exists.
Now update the decode_auth_token function to check for already blacklisted tokens right after the decode and to respond with an appropriate message.
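The order of operations described here (decode first, then check the blacklist) can be sketched roughly as follows. This is a minimal, standard-library-only illustration rather than the tutorial's actual method: decode_payload is a hypothetical stand-in for real JWT verification, BLACKLISTED_TOKENS is a hypothetical in-memory substitute for the blacklist table, and the 'Invalid token' message is an assumed wording.

```python
import base64
import json

# Hypothetical in-memory stand-in for the blacklist table.
BLACKLISTED_TOKENS = set()


def check_blacklist(auth_token):
    """Return True if the token has been blacklisted."""
    return str(auth_token) in BLACKLISTED_TOKENS


def decode_payload(auth_token):
    """Stand-in for real JWT verification (assumption for this sketch).

    It only reads a base64-encoded JSON payload. It does NOT verify a
    signature or an expiry time; it exists so the sketch can run.
    """
    padded = auth_token + '=' * (-len(auth_token) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def decode_auth_token(auth_token):
    """Decode the token, then reject it if it is blacklisted."""
    try:
        payload = decode_payload(auth_token)
    except ValueError:
        # Assumed message wording; covers bad base64 and bad JSON.
        return 'Invalid token. Please log in again.'
    # The blacklist check runs right after a successful decode.
    if check_blacklist(auth_token):
        return 'Token blacklisted. Please log in again.'
    return payload['sub']
```

Returning a message string on failure and the user id on success means callers have to tell the two apart by type, so a real implementation may prefer a different signalling scheme.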
Finally, add the check_blacklist() function to the BlacklistToken class in project/server/models.py:
    @staticmethod
    def check_blacklist(auth_token):
        # check whether auth token has been blacklisted
        res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
        if res:
            return True
        else:
            return False
Before you run the tests, update test_decode_auth_token to convert the bytes object to a string:
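The conversion matters because check_blacklist() calls str() on the token before comparing it with stored values. In Python 3, str() on a bytes object returns its printable representation, not its text. A small sketch with a made-up token value shows the difference:

```python
# Made-up token value for illustration; a real token is generated by the app.
auth_token = b'header.payload.signature'

# str() on bytes yields the repr, including the b'' wrapper.
as_repr = str(auth_token)

# decode() yields the actual text, which is what a string column would hold.
as_text = auth_token.decode('utf-8')

print(as_repr)  # b'header.payload.signature'
print(as_text)  # header.payload.signature
```

Decoding the token once, before it is passed around, keeps later comparisons string-to-string.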
Run the tests:
Ran 13 tests in 9.557s
OK
In a similar fashion, add one more test for the user status route.
As in the last test, we blacklist the token before the user status route gets hit.
Run the tests one final time:
Ran 14 tests in 10.206s
OK
Finally, take a look at test_auth.py. Notice the duplicate code? For example:
There are eight occurrences of this. To fix this, add the following helper at the top of the file:
def register_user(self, email, password):
    return self.client.post(
        '/auth/register',
        data=json.dumps(dict(
            email=email,
            password=password
        )),
        content_type='application/json',
    )
Now, anywhere you need to register a user, you can call the helper:
How about logging in a user? Refactor it on your own. What else can you refactor? Comment below.
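As a starting point for the login refactor, one possible shape is a login_user helper that mirrors register_user. This is only a sketch: login_user is a suggested name, and FakeClient and FakeTestCase are hypothetical stand-ins for the Flask test client and the test case, included so the snippet runs on its own.

```python
import json


def login_user(self, email, password):
    """Post credentials to the login route, mirroring register_user."""
    return self.client.post(
        '/auth/login',
        data=json.dumps(dict(
            email=email,
            password=password
        )),
        content_type='application/json',
    )


class FakeClient:
    """Hypothetical stand-in for the Flask test client; it records calls."""

    def __init__(self):
        self.calls = []

    def post(self, url, data=None, content_type=None):
        self.calls.append((url, json.loads(data), content_type))
        return 'fake response'


class FakeTestCase:
    """Hypothetical stand-in for a test case that owns a client."""

    def __init__(self):
        self.client = FakeClient()


test_case = FakeTestCase()
login_user(test_case, 'joe@gmail.com', '123456')
print(test_case.client.calls[0][0])  # /auth/login
```

In the real test module only the login_user function would be needed, since the test case already provides self.client.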
For the PyBites Challenge, let’s refactor some code to correct an issue raised on the GitHub repo. Start by adding the following test to test_auth.py:
    def test_user_status_malformed_bearer_token(self):
        """ Test for user status with malformed bearer token"""
        with self.client:
            resp_register = register_user(self, 'joe@gmail.com', '123456')
            response = self.client.get(
                '/auth/status',
                headers=dict(
                    Authorization='Bearer' + json.loads(
                        resp_register.data.decode()
                    )['auth_token']
                )
            )
            data = json.loads(response.data.decode())
            self.assertTrue(data['status'] == 'fail')
            self.assertTrue(data['message'] == 'Bearer token malformed.')
            self.assertEqual(response.status_code, 401)
Essentially, an error is thrown if the Authorization header is formatted incorrectly (for example, with no space between Bearer and the token value). Run the tests to ensure they fail, and then update the UserAPI class in project/server/auth/views.py:
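The header handling that this test exercises can be sketched as a small standalone function. This is an illustration under assumptions, not the actual UserAPI code: extract_bearer_token is a hypothetical helper name, and the message for a missing header is an assumed wording. Only 'Bearer token malformed.' comes from the test above.

```python
def extract_bearer_token(auth_header):
    """Return (token, error); exactly one of the two is None."""
    if not auth_header:
        # Assumed message wording for a missing header.
        return None, 'Provide a valid auth token.'
    try:
        # 'Bearer <token>' splits into two parts; the token is the second.
        token = auth_header.split(' ')[1]
    except IndexError:
        # No space between the scheme and the token, so no second part exists.
        return None, 'Bearer token malformed.'
    return token, None


print(extract_bearer_token('Bearer abc123'))  # ('abc123', None)
print(extract_bearer_token('Bearerabc123'))   # (None, 'Bearer token malformed.')
```

In the view, the error branch would translate into a response with status 'fail' and a 401 status code, which is what the test asserts.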
Run the tests one final time.
In this tutorial, we went through the process of adding authentication to a Flask app with JSON Web Tokens. Return to the objectives from the beginning of this tutorial. Can you put each one into action? What did you learn?
What’s next? How about the client side? Check out Token-Based Authentication With Angular for adding Angular into the mix.
Feel free to share your comments, questions, or tips in the comments below. The full code can be found in the flask-jwt-auth repository.
Cheers!
Translated from: https://www.pybloggers.com/2017/01/token-based-authentication-with-flask-2/