Flask 学习-27.flask_jwt_extended插件学习current_user的使用

吕鹏
2023-12-01

前言

flask_jwt_extended 最基本的使用只需要掌握三个函数:

  • create_access_token() 用来创建 Token 令牌
  • get_jwt_identity() 用来根据令牌取得之前的 identity 信息
  • jwt_required() 这是一个装饰器,用来保护 flask 节点

简单示例

以下是官方文档给的简单示例https://flask-jwt-extended.readthedocs.io/en/latest/basic_usage/

from flask import Flask
from flask import jsonify
from flask import request

from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager

app = Flask(__name__)

# Setup the Flask-JWT-Extended extension
app.config["JWT_SECRET_KEY"] = "super-secret"  # Change this!
jwt = JWTManager(app)


# Create a route to authenticate your users and return JWTs. The
# create_access_token() function is used to actually generate the JWT.
@app.route("/login", methods=["POST"])
def login():
    username = request.json.get("username", None)
    password = request.json.get("password", None)
    if username != "test" or password != "test":
        return jsonify({"msg": "Bad username or password"}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)


# Protect a route with jwt_required, which will kick out requests
# without a valid JWT present.
@app.route("/protected", methods=["GET"])
@jwt_required()
def protected():
    # Access the identity of the current user with get_jwt_identity
    current_user = get_jwt_identity()
    return jsonify(logged_in_as=current_user), 200


if __name__ == "__main__":
    app.run()

需要token验证的接口,在请求头部加如下格式token

Authorization: Bearer <access_token>

user_lookup_loader() 使用

在大多数 Web 应用程序中,重要的是能够访问正在访问受保护路由的用户。我们提供了几个回调函数,可以在使用 JWT 时实现无缝连接。
第一个是user_identity_loader(),它将User用于创建 JWT 的任何对象转换为 JSON 可序列化格式。
另一方面,当请求中存在 JWT 时,您可以使用它user_lookup_loader() 来自动加载您的对象。User加载的用户在您的受保护路由中可用current_user。

以下是官方文档给的示例

from hmac import compare_digest

from flask import Flask
from flask import jsonify
from flask import request
from flask_sqlalchemy import SQLAlchemy

from flask_jwt_extended import create_access_token
from flask_jwt_extended import current_user
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager


app = Flask(__name__)


app.config["JWT_SECRET_KEY"] = "super-secret"  # Change this!
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite://"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False


jwt = JWTManager(app)
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.Text, nullable=False, unique=True)
    full_name = db.Column(db.Text, nullable=False)

    # NOTE: In a real application make sure to properly hash and salt passwords
    def check_password(self, password):
        return compare_digest(password, "password")


# Register a callback function that takes whatever object is passed in as the
# identity when creating JWTs and converts it to a JSON serializable format.
@jwt.user_identity_loader
def user_identity_lookup(user):
    return user.id


# Register a callback function that loads a user from your database whenever
# a protected route is accessed. This should return any python object on a
# successful lookup, or None if the lookup failed for any reason (for example
# if the user has been deleted from the database).
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    identity = jwt_data["sub"]
    return User.query.filter_by(id=identity).one_or_none()


@app.route("/login", methods=["POST"])
def login():
    username = request.json.get("username", None)
    password = request.json.get("password", None)

    user = User.query.filter_by(username=username).one_or_none()
    if not user or not user.check_password(password):
        return jsonify("Wrong username or password"), 401

    # Notice that we are passing in the actual sqlalchemy user object here
    access_token = create_access_token(identity=user)
    return jsonify(access_token=access_token)


@app.route("/who_am_i", methods=["GET"])
@jwt_required()
def protected():
    # We can now access our sqlalchemy User object via `current_user`.
    return jsonify(
        id=current_user.id,
        full_name=current_user.full_name,
        username=current_user.username,
    )


if __name__ == "__main__":
    db.create_all()
    db.session.add(User(full_name="Bruce Wayne", username="batman"))
    db.session.add(User(full_name="Ann Takamaki", username="panther"))
    db.session.add(User(full_name="Jester Lavore", username="little_sapphire"))
    db.session.commit()

    app.run()

详细讲解

前面简单示例中使用get_jwt_identity()方法获取identity 信息,也就是username。在上面示例中使用current_user 也可以获取到当前用户对象
current_user 必须要在@jwt_required()装饰器中使用。

from flask_jwt_extended import (
    create_access_token, create_refresh_token, jwt_required, get_jwt_identity, current_user
)


class UserInfo(Resource):

    @jwt_required()
    def get(self):
        """根据token 解析用户username"""
        print(f"xxxxxxxxxxxxxx: {current_user}")
        return {
            "msg": 'success'
        }

如果直接使用会出现报错RuntimeError: You must provide a @jwt.user_lookup_loader callback to use this method
提示我们需先使用@jwt.user_lookup_loader 提供callback 回调函数

@jwt.user_identity_loader
def user_identity_lookup(user):
    """注册一个回调函数,该函数在创建JWT时将传入的任何对象作为标识,
    并将其转换为JSON可序列化格式。"""
    return user.id


@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    """注册一个回调函数,每当访问受保护的路由时,该函数将从数据库中加载用户。
    如果查找成功,则应返回任何python对象,如果查找因任何原因失败
    (例如,如果用户已从数据库中删除),则应返回None
    """
    identity = jwt_data["sub"]
    return Users.query.filter_by(id=identity).one_or_none()

之前生成token是根据username 传一个字符串得到token,现在可以改成传user对象了

access_token = create_access_token(identity=user)

完整代码示例

class Login(Resource):

    def post(self):
        args = reqparse.RequestParser() \
            .add_argument('username', type=str, location='json', required=True, help="用户名不能为空") \
            .add_argument("password", type=str, location='json', required=True, help="密码不能为空") \
            .parse_args()
        print(f"args: {args}")
        user = Users.query.filter_by(username=args.get('username')).first()
        if not user:
            return {"code": 222, "msg": f"用户名或密码不正确"}
        else:
            if not user.is_active:
                return {"code": 333, "msg": f"{user.username} not active"}
            else:
                # 验证密码
                if user.verify_password(args.get('password')):
                    access_token = create_access_token(identity=user)
                    return jsonify({
                        "code": "0",
                        "message": "success",
                        "data": {

                            "access_token": access_token,
                            "userid": user.id
                        }
                    })
                else:
                    return {"code": 222, "msg": f"用户名或密码不正确"}

# 注册
api.add_resource(Login, '/api/v1/login')

完成上面代码后current_user 返回的就是一个user对象了

class UserInfo(Resource):

    @jwt_required()
    def get(self):
        """根据token 解析用户username"""
        print(f"xxxxxxxxxxxxxx: {current_user}")
        return {
            "msg": 'success',
            "data": {
                "id": current_user.id,
                "username": current_user.username
            }
        }


# 注册
api.add_resource(UserInfo, '/api/v1/userinfo')

接口测试

GET http://127.0.0.1:5000/api/v1/userinfo HTTP/1.1
User-Agent: Fiddler
Host: 127.0.0.1:5000
Content-Type: application/json
Content-Length: 0
Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY2MTk5NTAyNiwianRpIjoiNzA4MWMyOTYtZDZlOC00YThmLWIwOGMtZjg0YzhiNzIyM2JhIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6NSwibmJmIjoxNjYxOTk1MDI2LCJleHAiOjE2NjE5OTg2MjZ9.GHqjOjfC1RCoNe0fCq-mxIP0_mE4ojiUMwd1RzSY8C8



HTTP/1.1 200 OK
Server: Werkzeug/2.2.2 Python/3.8.5
Date: Thu, 01 Sep 2022 01:30:31 GMT
Content-Type: application/json
Content-Length: 91
Connection: close

{
    "msg": "success",
    "data": {
        "id": 5,
        "username": "test5"
    }
}
 类似资料: