1.导入模块
import os
from flask import Flask
from flask_migrate import Migrate, MigrateCommand
from flask_script import Manager
from flask_sqlalchemy import SQLAlchemy
2.基本配置:
# 数据库路径
dburl = os.path.join(os.path.dirname(__file__), 'models.db')
# 配置数据库类型及路径
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + dburl
# 数据改变后自动提交
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
# 数据变化的自动警告关闭
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# 创建迁移对象,参数为app,db
migrate = Migrate(app, db)
'''
在命令中中新增db命令,在db命令下捆绑所有的数据迁移操作
之后就可以python manage.py db init/migrate/upgrade/downgrade/show/history...
init初始化数据迁移环境,全局只会执行一次
migrate 生成数据迁移文件(对应着django里的makemigrations)
upgrade 升级,执行数据迁移(对应着django里的migrate
'''
manager.add_command('db', MigrateCommand)
3.常用方法:
# 创建数据模型
class User(db.Model):
__tablename__ = 't_user'
id = db.Column(db.Integer(), primary_key=True, autoincrement=True)
name = db.Column(db.String(20), unique=True, nullable=False)
password = db.Column(db.String(20), nullable=False)
email = db.Column('mail_box', db.String(120), unique=True, nullable=False)
gender = db.Column(db.Boolean(), default=None, nullable=False)
def __str__(self):
return 'User{name=%s,email=%s,password=%s,}' % (self.name, self.email,self.password,)
# 创建python manage.py createdb命令,创建数据库
@manager.command
def createdb():
db.create_all()
return '创建数据库成功'
# 创建python manage.py dropdb命令,删除数据库
@manager.command
def dropdb():
db.drop_all()
return '删除数据库成功'
# 往数据库插入数据
@app.route('/insert/')
def insert():
user1 = User(name='张三', password='123456', email='zhangsan@qq.com', gender=True)
user2 = User(name='李四', password='123456', email='lisi@qq.com', gender=True)
user3 = User(name='王五', password='123456', email='wangwu@qq.com', gender=True)
user4 = User(name='莫愁', password='123456', email='mochou@qq.com', gender=False)
user5 = User(name='凤姐', password='123456', email='fengjie@qq.com', gender=False)
db.session.add_all([user1, user2, user3, user4, user5])
return '插入成功'
# 修改数据
@app.route('/update/<uid>')
def update(uid):
user = User.query.get(uid)
if user:
user.password = '654321'
db.session.add(user)
return '修改成功'
else:
return '查无此人'
# 查询数据
@app.route('/find/')
def find():
ulist=User.query.all()
ret=';'.join([str(i) for i in ulist])
return ret
#条件查询
@app.route('/xfind/')
def xfind():
# 查找id大于3的数据
# user=User.query.filter(User.id>3).all()
# 查找男性
user=User.query.filter(User.gender==True).all()
ret=';'.join([str(k) for k in user])
return ret
# 利用sql语句进行查询
@app.route('/dosql/')
def dosql():
ret=db.session.execute('select gender,count(id) from t_user GROUP by gender').fetchall()
return str(ret)
if __name__ == '__main__':
manager.run()