FlaskRestful

范霄
2023-12-01

Flask RESTFul

Restful官方文档

  • 软件架构设计思想,主要用在客户端和服务器交互类的软件设计中

  • 可以帮助开发者快速实现API接口开发

  • CS,客户端和服务端这种架构模型中

  • 主要研究网络和软件的交叉点,得到一个功能强,性能好,适宜通信的网络框架

  • 表现层状态转换

    • 主语 (资源)任何一个网络实体,都是一个资源
    • URI 每个URI代表一种资源
    • 资源展现给我们的形式就叫做表现层
    • 通过HTTP的请求谓词来实现表现层转换
  • 重要概念

    • URI
    • HTTP请求谓词
    • RESTful API中的最大难点:对象转换为JSON,过程叫做序列化

基本写法

route.py

from flask_restful import Api

from Book.views import BooksResource
#实例化Api对象(添加前缀去分路由)
books_api = Api(prefix='/books')
#① 代表资源     ② 代表路由
books_api.add_resource(BooksResource, "/books/")

views.py

#将返回的结果进行模板格式化
#用模板映射数据,模板中声明的格式,如果数据中存在,直接获取,不存在使用默认值填充,模板中声明的数据格式,如果数据中数据过多,会自动优化
book_fields = {
    "b_name": fields.String,
    "b_price": fields.Float,
    "id": fields.Integer,
    "b_author": fields.String
}

#模板级联写法
return_fields = {
    "status": fields.Integer,
    "msg": fields.String,
    # "data": fields.Nested(student_fields)
    "data": fields.List(fields.Nested(student_fields))
    #Nested级联别的字段
}

class BooksResource(Resource):

    def get(self):
        books = Book.query.all()
        data = {
            "status": 200,
            "msg": "ok",
            #调用对象的to_dict方法将对象转换为字典
            "data": [book.to_dict() for book in books]
        }
        return data
    
    ##序列化输出的三种方式:1。 @marshal_with(book_fields)
    2."data": marshal(book, book_fields)
    3.return marshal(book, book_fields)


    # @marshal_with(book_fields)
    def post(self):

        b_name = request.form.get("b_name")
        b_price = request.form.get("b_price")

        book = Book(b_name=b_name, b_price=b_price)

        if not book.save():

            data = {
                "msg": "save fail",
                "status": 400
            }

            return data

        data = {
            "msg": "save success",
            "status": 201,
            # 格式化数据     ①数据 ②模板
            # "data": marshal(book, book_fields)
            "data": book
        }

        # return data
        return marshal(book, book_fields)

项目工程里的views

from Book.route import books_api

def init_api(app):
    books_api.init_app(app)
    

__ init__

'''  # 初始化路由
    init_api(app)'''

中级写法

输入

views.py(token的用法)

# 先声明请求转换器
parse = reqparse.RequestParser()
# 添加请求参数规则
#要求一个值传递的参数,只需要添加 required=True 来调用 add_argument()。
#如果你要接受一个键有多个值的话,你可以传入 action='append'
#如果由于某种原因,你想要以不同的名称存储你的参数一旦它被解析的时候,你可以使用 dest kwarg。
#默认下,RequestParser 试着从 flask.Request.values,以及 flask.Request.json 解析值。在 add_argument() 中使用 location 参数可以指定解析参数的位置。flask.Request 中任何变量都能被使用。('form','args','headers','cookies')
#通过传入一个列表到 location 中可以指定 多个 参数位置:

parse.add_argument("username", required=True, type=str, help="username can't be blank")
parse.add_argument("password", required=True, type=str, help="password can't be blank")
parse.add_argument("action", required=True, type=str, help="please supply action")
#
parse.add_argument("hobby", dest="aihao",action="append")
parse.add_argument("csrftoken", location=["cookies", "args"], action="append")




class UsersResource(Resource):
    def post(self):
        # 转换参数    根据规则去过滤
        args = parse.parse_args()
        # form 获取html中form表单中的数据,如果客户端不是html  也同样可以提交表单数据
        action =args.get("action")

        if action == "register":
            return self.do_register()
        elif action == "login":
            return self.do_login()

    def do_register(self):
        args = parse.parse_args()
        username = args.get("username")
        password = args.get("password")

        # if not username:
        #     # return {"msg": "username can't be blank"}
        #     abort(400, message="username can't be blank")

        user = User(username=username, password=password)
        if not user.save():
            data = {
                "msg": "register fail",
                "status": 400
            }

            return data

        data = {
            "msg": "register success",
            "status": 201
        }

        return data

    def do_login(self):

        username = request.form.get("username")
        password = request.form.get("password")

        users = User.query.filter(User.username.__eq__(username)).all()

        if not users:

            data = {
                "msg": "user doesn't exist",
                "status": 401
            }

            return data

        user = users[0]

        if not user.check_password(password):

            data = {
                "msg": "password error",
                "status": 401
            }

            return data
        # 生成令牌    唯一
        token = uuid.uuid4().hex
        # 服务器记录令牌
        cache.set(token, user.id, timeout=60*60*24*7)

        # 令牌返回给客户端
        data = {
            "msg": "login sucess",
            "status": 200,
            "data": {
                "token": token
            }
        }
        return data

装饰器实现用户验证

def login_required(fun):
    def wrapper(*args, **kwargs):
        token = request.form.get("token")
        #获取token
        if not token:
            abort(400, message="autn fail")
        user_id = cache.get(token)
      #才缓存里用token获取u_id
        if not user_id:
            abort(400, message="token error")
        user = User.query.get(user_id)
        #获取用户
        if not user:
            abort(401, message="user doesn't exist")
        g.user = user
        return fun(*args, **kwargs)
    return wrapper

综合用法

blog_fields = {
    "id": fields.Integer,
    "title": fields.String,
    "content": fields.String
}

return_fields = {
    "msg": fields.String,
    "status": fields.Integer,
    "data": fields.Nested(blog_fields)
}

parse = reqparse.RequestParser()
parse.add_argument("title", required=True, help="请填写标题")
parse.add_argument("content", required=True, help="请填写内容")


class BlogsResource(Resource):

    @marshal_with(return_fields)
    def get(self):
        blogs = Blog.query.all()

        data = {
            "msg": "ok",
            "status": 200,
            "data": blogs
        }

        return data

    @login_required
    def post(self):
        user = g.user

        args = parse.parse_args()

        title = args.get("title")

        content = args.get("content")

        blog = Blog(title=title, content=content, b_owner=user.id)

        if not blog.save():
            data = {
                "msg": "create fail",
                "status": 400
            }
            return data

        data = {
            "msg": "ok",
            "status": 200,
            "data": blog
        }

        return marshal(data, return_fields)


class BlogResource(Resource):

    def get(self, pk):
        blog = Blog.query.get(pk)

        if not blog:
            abort(404, message="blog has been deteled")

        data = {
            "msg": "ok",
            "status": 200,
            "data": blog
        }

        return marshal(data, return_fields)

    @login_required
    def patch(self, pk):

        blog = Blog.query.get(pk)

        if not blog:
            abort(404, message="blog has been deteled")

        user = g.user

        if blog.b_owner != user.id:
            abort(403, message="can't modify others")

        title = request.form.get("title")

        content = request.form.get("content")

        # if title:
        #     blog.title = title
        #
        # if content:
        #     blog.content = content

        blog.title = title or blog.title

        blog.content = content or blog.content

        if not blog.save():
            data = {
                "msg": "modify fail",
                "status": 400
            }
            return data

        data = {
            "msg": "ok",
            "status": 200,
            "data": blog
        }

        return marshal(data, return_fields)

拓展:自带密码加密和校验

class User(db.Model):

id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(32), unique=True, nullable=False)
_password = db.Column(db.String(256))

@property
def password(self):
    return self._password

@password.setter
def password(self, password):
    # self._password = hashlib.new("md5", password.encode("utf-8")).hexdigest()
    self._password = generate_password_hash(password)

def check_password(self, password):
    # return self._password == hashlib.new("md5", password.encode("utf-8")).hexdigest()
    return check_password_hash(self._password, password)

 类似资料:

相关阅读

相关文章

相关问答