软件架构设计思想,主要用在客户端和服务器交互类的软件设计中
可以帮助开发者快速实现API接口开发
CS,客户端和服务端这种架构模型中
主要研究网络和软件的交叉点,得到一个功能强,性能好,适宜通信的网络框架
表现层状态转换
重要概念
route.py
from flask_restful import Api
from Book.views import BooksResource
#实例化Api对象(添加前缀去分路由)
books_api = Api(prefix='/books')
#① 代表资源 ② 代表路由
books_api.add_resource(BooksResource, "/books/")
views.py
#将返回的结果进行模板格式化
#用模板映射数据,模板中声明的格式,如果数据中存在,直接获取,不存在使用默认值填充,模板中声明的数据格式,如果数据中数据过多,会自动优化
book_fields = {
"b_name": fields.String,
"b_price": fields.Float,
"id": fields.Integer,
"b_author": fields.String
}
#模板级联写法
return_fields = {
"status": fields.Integer,
"msg": fields.String,
# "data": fields.Nested(student_fields)
"data": fields.List(fields.Nested(student_fields))
#Nested级联别的字段
}
class BooksResource(Resource):
def get(self):
books = Book.query.all()
data = {
"status": 200,
"msg": "ok",
#调用对象的to_dict方法将对象转换为字典
"data": [book.to_dict() for book in books]
}
return data
##序列化输出的三种方式:1。 @marshal_with(book_fields)
2."data": marshal(book, book_fields)
3.return marshal(book, book_fields)
# @marshal_with(book_fields)
def post(self):
b_name = request.form.get("b_name")
b_price = request.form.get("b_price")
book = Book(b_name=b_name, b_price=b_price)
if not book.save():
data = {
"msg": "save fail",
"status": 400
}
return data
data = {
"msg": "save success",
"status": 201,
# 格式化数据 ①数据 ②模板
# "data": marshal(book, book_fields)
"data": book
}
# return data
return marshal(book, book_fields)
项目工程里的views
from Book.route import books_api
def init_api(app):
books_api.init_app(app)
__ init__
''' # 初始化路由
init_api(app)'''
# 先声明请求转换器
parse = reqparse.RequestParser()
# 添加请求参数规则
#要求一个值传递的参数,只需要添加 required=True 来调用 add_argument()。
#如果你要接受一个键有多个值的话,你可以传入 action='append'
#如果由于某种原因,你想要以不同的名称存储你的参数一旦它被解析的时候,你可以使用 dest kwarg。
#默认下,RequestParser 试着从 flask.Request.values,以及 flask.Request.json 解析值。在 add_argument() 中使用 location 参数可以指定解析参数的位置。flask.Request 中任何变量都能被使用。('form','args','headers','cookies')
#通过传入一个列表到 location 中可以指定 多个 参数位置:
parse.add_argument("username", required=True, type=str, help="username can't be blank")
parse.add_argument("password", required=True, type=str, help="password can't be blank")
parse.add_argument("action", required=True, type=str, help="please supply action")
#
parse.add_argument("hobby", dest="aihao",action="append")
parse.add_argument("csrftoken", location=["cookies", "args"], action="append")
class UsersResource(Resource):
def post(self):
# 转换参数 根据规则去过滤
args = parse.parse_args()
# form 获取html中form表单中的数据,如果客户端不是html 也同样可以提交表单数据
action =args.get("action")
if action == "register":
return self.do_register()
elif action == "login":
return self.do_login()
def do_register(self):
args = parse.parse_args()
username = args.get("username")
password = args.get("password")
# if not username:
# # return {"msg": "username can't be blank"}
# abort(400, message="username can't be blank")
user = User(username=username, password=password)
if not user.save():
data = {
"msg": "register fail",
"status": 400
}
return data
data = {
"msg": "register success",
"status": 201
}
return data
def do_login(self):
username = request.form.get("username")
password = request.form.get("password")
users = User.query.filter(User.username.__eq__(username)).all()
if not users:
data = {
"msg": "user doesn't exist",
"status": 401
}
return data
user = users[0]
if not user.check_password(password):
data = {
"msg": "password error",
"status": 401
}
return data
# 生成令牌 唯一
token = uuid.uuid4().hex
# 服务器记录令牌
cache.set(token, user.id, timeout=60*60*24*7)
# 令牌返回给客户端
data = {
"msg": "login sucess",
"status": 200,
"data": {
"token": token
}
}
return data
def login_required(fun):
def wrapper(*args, **kwargs):
token = request.form.get("token")
#获取token
if not token:
abort(400, message="autn fail")
user_id = cache.get(token)
#才缓存里用token获取u_id
if not user_id:
abort(400, message="token error")
user = User.query.get(user_id)
#获取用户
if not user:
abort(401, message="user doesn't exist")
g.user = user
return fun(*args, **kwargs)
return wrapper
blog_fields = {
"id": fields.Integer,
"title": fields.String,
"content": fields.String
}
return_fields = {
"msg": fields.String,
"status": fields.Integer,
"data": fields.Nested(blog_fields)
}
parse = reqparse.RequestParser()
parse.add_argument("title", required=True, help="请填写标题")
parse.add_argument("content", required=True, help="请填写内容")
class BlogsResource(Resource):
@marshal_with(return_fields)
def get(self):
blogs = Blog.query.all()
data = {
"msg": "ok",
"status": 200,
"data": blogs
}
return data
@login_required
def post(self):
user = g.user
args = parse.parse_args()
title = args.get("title")
content = args.get("content")
blog = Blog(title=title, content=content, b_owner=user.id)
if not blog.save():
data = {
"msg": "create fail",
"status": 400
}
return data
data = {
"msg": "ok",
"status": 200,
"data": blog
}
return marshal(data, return_fields)
class BlogResource(Resource):
def get(self, pk):
blog = Blog.query.get(pk)
if not blog:
abort(404, message="blog has been deteled")
data = {
"msg": "ok",
"status": 200,
"data": blog
}
return marshal(data, return_fields)
@login_required
def patch(self, pk):
blog = Blog.query.get(pk)
if not blog:
abort(404, message="blog has been deteled")
user = g.user
if blog.b_owner != user.id:
abort(403, message="can't modify others")
title = request.form.get("title")
content = request.form.get("content")
# if title:
# blog.title = title
#
# if content:
# blog.content = content
blog.title = title or blog.title
blog.content = content or blog.content
if not blog.save():
data = {
"msg": "modify fail",
"status": 400
}
return data
data = {
"msg": "ok",
"status": 200,
"data": blog
}
return marshal(data, return_fields)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(32), unique=True, nullable=False)
_password = db.Column(db.String(256))
@property
def password(self):
return self._password
@password.setter
def password(self, password):
# self._password = hashlib.new("md5", password.encode("utf-8")).hexdigest()
self._password = generate_password_hash(password)
def check_password(self, password):
# return self._password == hashlib.new("md5", password.encode("utf-8")).hexdigest()
return check_password_hash(self._password, password)