flask项目和 JWT ( flask_jwt_extended )

有权
2023-12-01
from flask import Flask
from flask import jsonify
from flask import request

from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager

app = Flask(__name__)

app.config["JWT_SECRET_KEY"] = "super-secret"  # Change this!
jwt = JWTManager(app)


@app.route("/login", methods=["POST"])
def login():
    username = request.json.get("username", None)
    password = request.json.get("password", None)
    if username != "test" or password != "test":
        return jsonify({"msg": "Bad username or password"}), 401

    # You can use the additional_claims argument to either add
    # custom claims or override default claims in the JWT.
    additional_claims = {"aud": "some_audience", "foo": "bar"}
    access_token = create_access_token(username, additional_claims=additional_claims)
    return jsonify(access_token=access_token)


# In a protected view, get the claims you added to the jwt with the
# get_jwt() method
@app.route("/protected", methods=["GET"])
@jwt_required()
def protected():
    claims = get_jwt()
    return jsonify(foo=claims["foo"])


if __name__ == "__main__":
    app.run()

flask 项目里flask_jwt_extended的使用。也可以只调用flask_jwt_extended的验证token的方法,使用装饰器来扩展自己的项目需求。我的需求就是算是避免单点登录,就是用户多次登录生成的jwt token在有效时间内token都有效,这是flask_jwt_extended的特点,但是我们只要最新token有效。下面4个步骤:1.JWT ~ 4.JWT

class Config(object):
    SECRET_KEY = "kq3oujee2kTfQUs8yCM6xX9Yjq52v54knA"
    
	# 1.JWT
    JWT_SECRET_KEY = "super-secret"  # Change this!
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=6)
from flask_jwt_extended import JWTManager
def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(Config)
    app.before_request(func)
    
	# 2.JWT
    jwt = JWTManager(app)
from flask_jwt_extended import create_access_token,jwt_required
from flask_jwt_extended import get_jwt_identity

@user_blue.route('/login', methods=['POST'])
def login():
    userName = data.get("userName")
    password = data.get("password")
    if not all([userName,password]):
        return jsonify({'code':-1, 'msg': '参数不足.', 'data': None})
    user = ...
        
    # 3.JWT , additional_claims可以是username字符串,可以是字典其他结构
    additional_claims = {"username":user['username'],"age": user['age']}
    jwt_token = create_access_token(additional_claims)
    try:
        session_redis.set(userName, jwt_token, ex=21600)
    except Exception as e:
        current_app.logger.error(e)
        return jsonify('redis数据库异常.')
    return jsonify({'username':user['username'],'token': jwt_token})
# 4.JWT
@user_blue.route('/read', methods=['POST'])
@jwt_required()
def read():
    current_user = get_jwt_identity()
    return jsonify(logged_in_as=current_user,vip='svip'), 200

utils创建登录认证装饰器decorators.py
自定义登录装饰器,验证JWT token,并保存至redis,始终保持用户唯一token,即用户再次登录旧的token失效。避免单点登录

from flask_jwt_extended import verify_jwt_in_request
from functools import wraps
from app import engine, session_redis
from flask import request
from flask_jwt_extended import get_jwt, get_jwt_identity, jwt_required

# 使用flask_jwt_extended的verify_jwt_in_request方法来验证前端传来的jwt token。
def login_required(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
    	# 4. JWT
        verify_jwt_in_request()
        authorization= (request.headers.get('Authorization')).split()
        request_token = authorization[1]
        user_info = get_jwt_identity()
        session_token = session_redis.get(user_info['username'])
        if request_token == session_token:
            return func(*args, *kwargs)
        else:
            return "重定向到登录页面login.html"
    return wrapper

 类似资料: