flask-tutorial
├── flaskr/
│ ├── __init__.py
│ ├── db.py
│ ├── schema.sql
│ ├── auth.py
│ ├── blog.py
│ ├── templates/
│ │ ├── base.html
│ │ ├── auth/
│ │ │ ├── login.html
│ │ │ └── register.html
│ │ └── blog/
│ │ ├── create.html
│ │ ├── index.html
│ │ └── update.html
│ └── static/
│ └── style.css
├── tests/
│ ├── conftest.py
│ ├── data.sql
│ ├── test_factory.py
│ ├── test_db.py
│ ├── test_auth.py
│ └── test_blog.py
├── venv/
├── setup.py
└── MANIFEST.in
flaskr/templates/base.html:添加链接描述
flaskr/templates/auth/login.html:添加链接描述
flaskr/templates/auth/register.html:添加链接描述
flaskr/templates/blog/create.html:添加链接描述
flaskr/templates/blog/index.html:添加链接描述
flaskr/templates/blog/update.html:添加链接描述
flaskr/static/style.css:添加链接描述
__init__.py
# coding=utf-8
# 作用:一是包含应用工厂;二是 告诉 Python flaskr 文件夹应当视作为一个包。
# https://dormousehole.readthedocs.io/en/latest/tutorial/factory.html
import os
from flask import Flask
# Application factory.
def create_app(test_config=None):
    """Build, configure and return the Flask application.

    Args:
        test_config: Optional mapping of configuration values. When it is
            not ``None`` it is applied on top of the defaults and the
            instance ``config.py`` file is not read.

    Returns:
        The configured Flask application object.
    """
    app = Flask(__name__, instance_relative_config=True)

    # Baseline settings; either config.py or test_config may override them.
    defaults = {
        'SECRET_KEY': 'dev',
        'DATABASE': os.path.join(app.instance_path, 'flaskr.sqlite'),
    }
    app.config.from_mapping(defaults)

    if test_config is not None:
        app.config.from_mapping(test_config)
    else:
        app.config.from_pyfile('config.py', silent=True)

    # Best effort: create the instance folder, ignoring any OSError
    # (for example when the folder is already there).
    try:
        os.makedirs(app.instance_path)
    except OSError:
        pass

    # Registers the teardown hook and the ``init-db`` CLI command.
    from . import db
    db.init_app(app)

    # Attach the blueprints in the same order as before: auth, then blog.
    from . import auth, blog
    for module in (auth, blog):
        app.register_blueprint(module.bp)

    # Expose '/' under the bare endpoint name 'index', which the auth views
    # pass to url_for().
    app.add_url_rule('/', endpoint='index')
    return app
db.py
# coding=utf-8
# https://dormousehole.readthedocs.io/en/latest/tutorial/database.html
import sqlite3
import click
from flask import current_app, g
from flask.cli import with_appcontext
def get_db():
    """Return the SQLite connection stored on ``g``, opening it on first use.

    The connection is opened against ``current_app.config['DATABASE']`` and
    cached as ``g.db`` so later calls return the same object.

    Returns:
        The ``sqlite3`` connection held in ``g.db``.
    """
    if 'db' in g:
        return g.db

    connection = sqlite3.connect(
        current_app.config['DATABASE'],
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    # Return dict-like rows so columns can be accessed by name.
    connection.row_factory = sqlite3.Row
    g.db = connection
    return connection
def close_db(e=None):
    """Close the connection stored on ``g``, if one was opened.

    Args:
        e: Not used by this function; accepted so it can be registered as a
            teardown callback.
    """
    connection = g.pop('db', None)
    if connection is None:
        # Nothing was opened, so there is nothing to close.
        return
    connection.close()
# Initialize the database.
def init_db():
    """Execute the ``schema.sql`` resource against the database connection."""
    # Read the whole script first, then hand it to SQLite in one call.
    with current_app.open_resource('schema.sql') as schema_file:
        script = schema_file.read().decode('utf8')
    get_db().executescript(script)
@click.command('init-db')
@with_appcontext
def init_db_command():
    # CLI entry point: run init_db() and then print a status message.
    # NOTE(review): documented with comments rather than a docstring because
    # click uses a command function's docstring as its help text, which would
    # change the CLI output.
    init_db()
    message = '初始化数据库...'
    click.echo(message)
def init_app(app):
    """Hook the database helpers into the given application.

    Args:
        app: The Flask application to register the helpers on.
    """
    # Add the ``init-db`` command to the app's command-line interface.
    app.cli.add_command(init_db_command)
    # Have Flask call close_db when it tears down the application context.
    app.teardown_appcontext(close_db)
schema.sql
-- Start from a clean slate: remove both tables if they already exist.
DROP TABLE IF EXISTS user;
DROP TABLE IF EXISTS post;
-- Accounts: one row per user; usernames must be unique.
CREATE TABLE user (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL
);
-- Blog posts: each row references its author in the user table.
CREATE TABLE post (
id INTEGER PRIMARY KEY AUTOINCREMENT,
author_id INTEGER NOT NULL,
-- Defaults to the time the row is inserted.
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
title TEXT NOT NULL,
body TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES user (id)
);
auth.py
# coding=utf-8
import functools
from flask import (Blueprint, flash, g, redirect, render_template, request, session, url_for)
from werkzeug.security import check_password_hash, generate_password_hash
from .db import get_db
# Blueprint holding the authentication views; its routes live under '/auth'.
bp = Blueprint('auth', __name__, url_prefix = '/auth')
@bp.route('/register', methods=('GET', 'POST'))
def register():
    """Show the registration form and create a user on a valid POST.

    On POST the submitted ``username`` and ``password`` are validated. If
    both are present the user is inserted with a hashed password and the
    client is redirected to the login page. Otherwise an error message is
    flashed and the form is rendered again.

    Returns:
        A redirect to ``auth.login`` after a successful registration, or
        the rendered ``auth/register.html`` template.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        error = None

        # Validate the form before touching the database.
        if not username:
            error = '用户名为必填项.'
        elif not password:
            error = '密码是必需的.'
        else:
            db = get_db()
            try:
                db.execute(
                    'INSERT INTO user (username, password) VALUES (?, ?)',
                    (username, generate_password_hash(password)),
                )
                db.commit()
            except db.IntegrityError:
                # The UNIQUE constraint on username is the source of truth.
                # Relying on it, instead of a separate SELECT, also covers
                # two requests registering the same name at the same time.
                db.rollback()
                error = '用户 {} 已注册.'.format(username)
            else:
                # Redirect to the URL generated for the login view.
                return redirect(url_for('auth.login'))

        # Store the message so the template can display it.
        flash(error)

    return render_template('auth/register.html')
@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Show the login form and sign the user in on a valid POST.

    Returns:
        A redirect to the ``index`` endpoint after a successful login, or
        the rendered ``auth/login.html`` template.
    """
    if request.method == 'POST':
        form = request.form
        username, password = form['username'], form['password']
        row = get_db().execute(
            'SELECT * FROM user WHERE username = ?', (username,)
        ).fetchone()

        if row is None:
            problem = '用户名不正确.'
        elif not check_password_hash(row['password'], password):
            # The submitted password is checked against the stored hash.
            problem = '密码错误.'
        else:
            # Start from an empty session, then remember who logged in.
            session.clear()
            session['user_id'] = row['id']
            return redirect(url_for('index'))

        # Store the message so the template can display it.
        flash(problem)

    return render_template('auth/login.html')
@bp.before_app_request  # register a function that runs before the view functions
def load_logged_in_user():
    """Populate ``g.user`` from the user id stored in the session.

    ``g.user`` is set to ``None`` when the session holds no ``user_id``;
    otherwise it is set to the matching row of the ``user`` table.
    """
    stored_id = session.get('user_id')
    if stored_id is not None:
        lookup = get_db().execute(
            'SELECT * FROM user WHERE id = ?', (stored_id,)
        )
        g.user = lookup.fetchone()
    else:
        g.user = None
@bp.route('/logout')
def logout():
    """Clear the session and redirect to the ``index`` endpoint."""
    session.clear()
    home_url = url_for('index')
    return redirect(home_url)
def login_required(view):
    """Decorate a view so that it is only run for a logged-in user.

    Args:
        view: The view function to protect.

    Returns:
        A wrapper that redirects to ``auth.login`` when ``g.user`` is
        ``None`` and otherwise calls ``view`` with the arguments it
        received.
    """
    @functools.wraps(view)
    def wrapped_view(*args, **kwargs):
        # Positional arguments are forwarded as well, so a decorated view
        # can also be called directly, e.g. update(1).
        if g.user is None:
            return redirect(url_for('auth.login'))
        return view(*args, **kwargs)
    return wrapped_view
blog.py
# coding=utf-8
from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for
)
from werkzeug.exceptions import abort
from .auth import login_required
from .db import get_db
# Blueprint holding the blog views; created without a url_prefix.
bp = Blueprint('blog', __name__)
@bp.route('/')
def index():
    """Render the list of all posts, ordered by ``created`` descending.

    Returns:
        The rendered ``blog/index.html`` template, given the rows as
        ``posts``.
    """
    query = (
        'SELECT p.id, title, body, created, author_id, username'
        ' FROM post p JOIN user u ON p.author_id = u.id'
        ' ORDER BY created DESC'
    )
    rows = get_db().execute(query).fetchall()
    return render_template('blog/index.html', posts=rows)
@bp.route('/create', methods=('GET', 'POST'))
@login_required
def create():
    """Show the new-post form and insert the post on a valid POST.

    Returns:
        A redirect to ``blog.index`` after the post is stored, or the
        rendered ``blog/create.html`` template.
    """
    form_page = 'blog/create.html'
    if request.method != 'POST':
        return render_template(form_page)

    title = request.form['title']
    body = request.form['body']

    # A title is mandatory; report the problem and show the form again.
    if not title:
        flash('Title is required.')
        return render_template(form_page)

    connection = get_db()
    connection.execute(
        'INSERT INTO post (title, body, author_id)'
        ' VALUES (?, ?, ?)',
        (title, body, g.user['id']),
    )
    connection.commit()
    return redirect(url_for('blog.index'))
def get_post(id, check_author=True):
    """Fetch one post, joined with its author's username, by id.

    Args:
        id: Id of the post to load.
        check_author: When true, additionally require that the post's
            ``author_id`` equals the id of ``g.user``.

    Returns:
        The matching row.

    Aborts with 404 when no such post exists, and with 403 when
    ``check_author`` is true and the post belongs to someone else.
    """
    query = (
        'SELECT p.id, title, body, created, author_id, username'
        ' FROM post p JOIN user u ON p.author_id = u.id'
        ' WHERE p.id = ?'
    )
    row = get_db().execute(query, (id,)).fetchone()

    if row is None:
        abort(404, "Post id {0} doesn't exist.".format(id))

    # The ownership comparison is only evaluated when check_author is true.
    owned_by_other = check_author and row['author_id'] != g.user['id']
    if owned_by_other:
        abort(403)

    return row
@bp.route('/<int:id>/update', methods=('GET', 'POST'))
@login_required
def update(id):
    """Show the edit form for a post and save changes on a valid POST.

    Args:
        id: Id of the post to edit, taken from the URL.

    Returns:
        A redirect to ``blog.index`` after the post is updated, or the
        rendered ``blog/update.html`` template, given the row as ``post``.
    """
    # Loaded first on every request; get_post aborts on a missing post or
    # a post owned by another user.
    current = get_post(id)

    if request.method == 'POST':
        title = request.form['title']
        body = request.form['body']

        if title:
            connection = get_db()
            connection.execute(
                'UPDATE post SET title = ?, body = ?'
                ' WHERE id = ?',
                (title, body, id),
            )
            connection.commit()
            return redirect(url_for('blog.index'))

        # An empty title is rejected; fall through and show the form again.
        flash('Title is required.')

    return render_template('blog/update.html', post=current)
@bp.route('/<int:id>/delete', methods=('POST',))
@login_required
def delete(id):
    """Delete a post and redirect to ``blog.index``.

    Args:
        id: Id of the post to delete, taken from the URL.
    """
    # Called only for its checks: get_post aborts on a missing post or a
    # post owned by another user. The returned row is not needed.
    get_post(id)

    connection = get_db()
    connection.execute('DELETE FROM post WHERE id = ?', (id,))
    connection.commit()

    return redirect(url_for('blog.index'))