flask基础网站--编号222

越宣
2023-12-01
flask-tutorial
├── flaskr/
│   ├── __init__.py
│   ├── db.py
│   ├── schema.sql
│   ├── auth.py
│   ├── blog.py
│   ├── templates/
│   │   ├── base.html
│   │   ├── auth/
│   │   │   ├── login.html
│   │   │   └── register.html
│   │   └── blog/
│   │       ├── create.html
│   │       ├── index.html
│   │       └── update.html
│   └── static/
│       └── style.css
├── tests/
│   ├── conftest.py
│   ├── data.sql
│   ├── test_factory.py
│   ├── test_db.py
│   ├── test_auth.py
│   └── test_blog.py
├── venv/
├── setup.py
└── MANIFEST.in

flaskr/templates/base.html:添加链接描述

flaskr/templates/auth/login.html:添加链接描述
flaskr/templates/auth/register.html:添加链接描述

flaskr/templates/blog/create.html:添加链接描述
flaskr/templates/blog/index.html:添加链接描述
flaskr/templates/blog/update.html:添加链接描述

flaskr/static/style.css:添加链接描述

flaskr

__init__.py

# coding=utf-8
# 作用:一是包含应用工厂;二是 告诉 Python flaskr 文件夹应当视作为一个包。
# https://dormousehole.readthedocs.io/en/latest/tutorial/factory.html
import os
from flask import Flask


# 一个应用工厂函数
def create_app(test_config=None):
    """Application factory: build, configure and return the Flask app.

    :param test_config: optional mapping of settings. When given it is
        applied on top of the defaults instead of the instance ``config.py``.
    :return: the configured ``Flask`` application.
    """
    app = Flask(__name__, instance_relative_config=True)

    # Built-in defaults; either configuration source below may override them.
    default_database = os.path.join(app.instance_path, 'flaskr.sqlite')
    app.config.from_mapping(SECRET_KEY='dev', DATABASE=default_database)

    if test_config is not None:
        app.config.from_mapping(test_config)
    else:
        app.config.from_pyfile('config.py', silent=True)

    # Best effort: create the instance folder and ignore any OSError
    # (for example when the folder already exists).
    try:
        os.makedirs(app.instance_path)
    except OSError:
        pass

    from . import db, auth, blog

    # Registers the database teardown handler and the ``init-db`` CLI command.
    db.init_app(app)

    app.register_blueprint(auth.bp)
    app.register_blueprint(blog.bp)
    # Expose '/' under the bare endpoint name 'index' as well, so that
    # url_for('index') resolves alongside url_for('blog.index').
    app.add_url_rule('/', endpoint='index')

    return app

db.py

# coding=utf-8
# https://dormousehole.readthedocs.io/en/latest/tutorial/database.html
import sqlite3
import click
from flask import current_app, g
from flask.cli import with_appcontext


def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use.

    The database path is read from ``current_app.config['DATABASE']``.
    """
    if 'db' in g:
        return g.db

    connection = sqlite3.connect(
        current_app.config['DATABASE'],
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    # Return rows as sqlite3.Row so columns can be accessed by name.
    connection.row_factory = sqlite3.Row
    g.db = connection
    return g.db


def close_db(e=None):
    """Close the connection stored on ``g``, if one was opened.

    :param e: unused; accepted so the function can be registered as a
        teardown callback.
    """
    connection = g.pop('db', None)
    if connection is None:
        return
    connection.close()


# 初始化数据库
def init_db():
    """Run the ``schema.sql`` resource against the current database."""
    connection = get_db()
    with current_app.open_resource('schema.sql') as schema_file:
        script = schema_file.read().decode('utf8')
    connection.executescript(script)


# CLI command ``init-db``: calls init_db() and then prints a confirmation.
# NOTE(review): documented with comments rather than a docstring because a
# docstring on a click command is presumably shown as its help text — confirm.
@click.command('init-db')
@with_appcontext
def init_db_command( ):
    init_db( )
    # User-facing confirmation message, printed after the schema has been run.
    click.echo('初始化数据库...')


def init_app(app):
    """Hook the database helpers into the given Flask application.

    :param app: application on which the teardown handler and the
        ``init-db`` CLI command are registered.
    """
    # Make the ``init-db`` command available through the application's CLI.
    app.cli.add_command(init_db_command)
    # Have close_db called when the application context is torn down.
    app.teardown_appcontext(close_db)

schema.sql

-- Schema for the flaskr application.
-- Running this script drops any existing tables first, so all stored data is lost.
DROP TABLE IF EXISTS user;
DROP TABLE IF EXISTS post;

-- Registered accounts; usernames must be unique.
CREATE TABLE user (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  username TEXT UNIQUE NOT NULL,
  password TEXT NOT NULL
);

-- Blog posts; each post references its author in the user table.
CREATE TABLE post (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  author_id INTEGER NOT NULL,
  -- Defaults to the time the row is inserted.
  created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  title TEXT NOT NULL,
  body TEXT NOT NULL,
  FOREIGN KEY (author_id) REFERENCES user (id)
);

auth.py

# coding=utf-8
import functools

from flask import (Blueprint, flash, g, redirect, render_template, request, session, url_for)
from werkzeug.security import check_password_hash, generate_password_hash
from .db import get_db

# Blueprint named 'auth'; every route registered on it is prefixed with /auth.
bp = Blueprint('auth', __name__, url_prefix = '/auth')


@bp.route('/register', methods=('GET', 'POST'))
def register():
    """Show the registration form (GET) or create a new account (POST).

    On POST the submitted ``username`` and ``password`` form fields are
    validated. When both are present the user is inserted with a hashed
    password and the client is redirected to the login page. Any validation
    failure, or a username that is already taken, is flashed and the form is
    rendered again.

    :return: a redirect to ``auth.login`` on success, otherwise the rendered
        ``auth/register.html`` template.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        error = None

        # Validate before touching the database at all.
        if not username:
            error = '用户名为必填项.'
        elif not password:
            error = '密码是必需的.'
        else:
            db = get_db()
            try:
                # Rely on the UNIQUE constraint on user.username rather than a
                # separate SELECT: a check-then-insert sequence can race with
                # a concurrent registration and fail with an unhandled error.
                db.execute(
                    'INSERT INTO user (username, password) VALUES (?, ?)',
                    (username, generate_password_hash(password)),
                )
                db.commit()
            except db.IntegrityError:
                # Discard the failed write before reporting the duplicate.
                db.rollback()
                error = '用户 {} 已注册.'.format(username)
            else:
                # Registration succeeded: send the user to the login form.
                return redirect(url_for('auth.login'))

        # Store the message so the template can display it.
        flash(error)

    return render_template('auth/register.html')


@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Show the login form (GET) or authenticate the submitted user (POST).

    On success the session is reset, the user's id is stored under
    ``user_id`` and the client is redirected to the ``index`` endpoint.
    Otherwise an error is flashed and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('auth/login.html')

    username = request.form['username']
    password = request.form['password']
    user = get_db().execute(
        'SELECT * FROM user WHERE username = ?', (username,)
    ).fetchone()

    if user is None:
        error = '用户名不正确.'
    elif not check_password_hash(user['password'], password):
        # The submitted password does not match the stored hash.
        error = '密码错误.'
    else:
        # Start from an empty session, then remember who is logged in.
        session.clear()
        session['user_id'] = user['id']
        return redirect(url_for('index'))

    # Store the message so the template can display it.
    flash(error)
    return render_template('auth/login.html')


@bp.before_app_request  # register a function that runs before the view functions
def load_logged_in_user():
    """Populate ``g.user`` from the ``user_id`` stored in the session.

    ``g.user`` is set to ``None`` when the session holds no ``user_id``;
    otherwise it is the result of looking that id up in the ``user`` table.
    """
    user_id = session.get('user_id')
    g.user = (
        None
        if user_id is None
        else get_db().execute(
            'SELECT * FROM user WHERE id = ?', (user_id,)
        ).fetchone()
    )


@bp.route('/logout')
def logout():
    """Clear the session and redirect to the ``index`` endpoint."""
    session.clear()
    home_url = url_for('index')
    return redirect(home_url)


def login_required(view):
    """Decorate *view* so that it only runs when ``g.user`` is set.

    :param view: the view function to protect.
    :return: a wrapper that calls *view* with its keyword arguments when
        ``g.user`` is not ``None`` and otherwise redirects to ``auth.login``.
    """
    @functools.wraps(view)
    def wrapped_view(**kwargs):
        if g.user is not None:
            return view(**kwargs)
        return redirect(url_for('auth.login'))

    return wrapped_view

blog.py

# coding=utf-8
from flask import (
    Blueprint, flash, g, redirect, render_template, request, url_for
)
from werkzeug.exceptions import abort

from .auth import login_required
from .db import get_db

# Blueprint named 'blog'; created without a URL prefix.
bp = Blueprint('blog', __name__)


@bp.route('/')
def index():
    """Render ``blog/index.html`` with every post, newest first."""
    # Join each post with its author so the username is available per row.
    query = (
        'SELECT p.id, title, body, created, author_id, username'
        ' FROM post p JOIN user u ON p.author_id = u.id'
        ' ORDER BY created DESC'
    )
    posts = get_db().execute(query).fetchall()
    return render_template('blog/index.html', posts=posts)


@bp.route('/create', methods=('GET', 'POST'))
@login_required
def create():
    """Show the new-post form (GET) or store a new post (POST).

    A POST without a title flashes an error and renders the form again;
    otherwise the post is inserted with ``g.user`` as its author and the
    client is redirected to ``blog.index``.
    """
    if request.method != 'POST':
        return render_template('blog/create.html')

    title = request.form['title']
    body = request.form['body']

    if not title:
        flash('Title is required.')
        return render_template('blog/create.html')

    connection = get_db()
    connection.execute(
        'INSERT INTO post (title, body, author_id)'
        ' VALUES (?, ?, ?)',
        (title, body, g.user['id']),
    )
    connection.commit()
    return redirect(url_for('blog.index'))


def get_post(id, check_author=True):
    """Fetch one post together with its author's username.

    :param id: id of the post to load.
    :param check_author: when true, additionally require that the post's
        ``author_id`` equals the id of ``g.user``.
    :return: the matching row.
    :raises: aborts with 404 when no such post exists and with 403 when the
        author check fails.
    """
    query = (
        'SELECT p.id, title, body, created, author_id, username'
        ' FROM post p JOIN user u ON p.author_id = u.id'
        ' WHERE p.id = ?'
    )
    post = get_db().execute(query, (id,)).fetchone()

    if post is None:
        abort(404, "Post id {0} doesn't exist.".format(id))

    # g.user is only consulted when the author check was requested.
    if check_author:
        if post['author_id'] != g.user['id']:
            abort(403)

    return post


@bp.route('/<int:id>/update', methods=('GET', 'POST'))
@login_required
def update(id):
    """Show the edit form for a post (GET) or save its changes (POST).

    The post is loaded through ``get_post`` first, which aborts unless it
    exists and belongs to the current user. A POST without a title flashes
    an error and renders the form again.

    :param id: id of the post being edited.
    """
    post = get_post(id)

    if request.method == 'POST':
        title = request.form['title']
        body = request.form['body']

        if title:
            connection = get_db()
            connection.execute(
                'UPDATE post SET title = ?, body = ?'
                ' WHERE id = ?',
                (title, body, id),
            )
            connection.commit()
            return redirect(url_for('blog.index'))

        flash('Title is required.')

    return render_template('blog/update.html', post=post)


@bp.route('/<int:id>/delete', methods=('POST',))
@login_required
def delete(id):
    """Delete a post and redirect to ``blog.index``.

    :param id: id of the post to remove.
    """
    # Called only for its checks: aborts unless the post exists and
    # belongs to the current user.
    get_post(id)

    connection = get_db()
    connection.execute('DELETE FROM post WHERE id = ?', (id,))
    connection.commit()

    listing_url = url_for('blog.index')
    return redirect(listing_url)

 类似资料: