from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
class User(db.Model, UserMixin):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(50))
hash_password = db.Column(db.String(200))
def __repr__(self):
return self.username
@property
def password(self):
raise AttributeError('`password` is not a readable attribute')
@password.setter
def password(self, password):
self.hash_password = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.hash_password, password)
import os
from typing import Optional
from datetime import timedelta
from flask import Flask, jsonify, views, session
from flask_sqlalchemy import SQLAlchemy
from flask_pydantic import validate
from flask_login import login_user, LoginManager, login_required
from flask_login import logout_user, current_user
from pydantic import BaseModel, constr, validator
from models import Role, to_json, User
app = Flask(__name__)
# 通过 app 配置数据库信息
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:149789@127.0.0.1:3306/sqlalchemy_test'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config["SECRET_KEY"] = os.urandom(10)
# 创建数据库实例
db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.session_protection = 'strong'
login_manager.init_app(app)
# 设置session的过期时间
# app.permanent_session_lifetime = timedelta(seconds=10)
# 自定义一个未登录返回的错误页面
@login_manager.unauthorized_handler
def unauthorized():
return jsonify({"msg": "请先登录再来访问"})
@login_manager.user_loader
def load_user(id):
return User.query.get(int(id))
class RolePageModel(BaseModel):
page: Optional[int] = None
size: Optional[int] = None
class LoginModel(BaseModel):
username: str
password: str
class RegisterModel(BaseModel):
username: str
password1: constr(min_length=6)
password2: constr(min_length=6)
# 单个校验
@validator('username')
def username_a(cls, v):
if "a" not in v:
raise ValueError("用户名不包含a")
return v
# 联合验证
@validator('password2')
def passwords_match(cls, v, values, **kwargs):
if 'password1' in values and v != values['password1']:
raise ValueError('两次密码不一致')
return v
class RoleView(views.MethodView):
@validate()
@login_required
def get(self, query: RolePageModel):
page = query.page
size = query.size
if not page:
page = 1
if not size:
size = 2
if size > 2:
size = 2
# 获取角色列表
try:
all_obj = db.session.query(Role).paginate(page, size).items
except Exception:
return jsonify({"status": 404, "msg": "没有内容了"})
ret_json = to_json(all_obj)
return jsonify(ret_json)
class RegisterView(views.MethodView):
@validate()
def post(self, body: RegisterModel):
username = body.username
password = body.password1
user = User(username=username)
user.password = password
db.session.add(user)
db.session.commit()
return jsonify({"msg": "用户%s注册成功" % username})
class LoginView(views.MethodView):
@validate()
def post(self, body: LoginModel):
username = body.username
password = body.password
user = User.query.filter(User.username == username).first()
if user:
if user.verify_password(password):
"""
permanent是“永久”的意思,
如果设置了此项为True,
意味着在permanent_session_lifetime过期时间内即使关闭浏览器,再次打开时session还有效
如果设置了此项为False,
即使在session的有效期内关闭浏览器,也会清空session,从而导致登录失效
还有就是,session.permanent只能在视图内设置,不能和过期时间一起设置
"""
session.permanent = True
login_user(user)
return jsonify({"msg": "登录成功"})
else:
return jsonify({"msg": "用户密码错误"})
else:
return jsonify({"msg": "没有用户, 请先注册"})
class LogoutView(views.MethodView):
def get(self):
logout_user()
return jsonify({"msg": "成功退出登录"})
app.add_url_rule("/role", view_func=RoleView.as_view("role"))
app.add_url_rule("/login", view_func=LoginView.as_view("login"))
app.add_url_rule("/logout", view_func=LogoutView.as_view("logout"))
app.add_url_rule("/register", view_func=RegisterView.as_view("register"))
if __name__ == '__main__':
app.run(debug=True)