要说flask-sqlalchemy就先要知道SQLAchemy,因为flask-sqlalchemy就是对SQLAlchemy的封装。
什么是SQLAlchemy?
SQLAlchemy是python中的一个orm框架,就是将对对象的操作(增,删改等等),转化成sql语句,然后再执行sql语句对数据库进行操作。使得对数据库的操作非常容易。
SQLAlchemy的简单使用
连接数据库:
from sqlalchemy import create_engine
engine = create_engine(
# 连接的数据库
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 获取连接
con = engine.raw_connection()
# 获取cursor
cur = con.cursor()
# 简单使用(与pymysql中cursor一样使用)
cursor.execute("select * from t1")
result = cursor.fetchall()
# 关闭连接
cursor.close()
con.close()
根据类创建表
# declarative_base是类的父类
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, String, Integer, ForeignKey
Base = declarative_base()
# 创建单表
# 要继承Base
class user(Base):
# 首先要设置表名
__tablename__ = "user"
# 字段uid
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
email = Column(String(32), unique=True)
# 一对多的表结构
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 多对多示例
class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Server(Base):
__tablename__ = 'server'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
# 创建数据库连接池
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 创建表
Base.metadata.create_all(engine)
# 删除表
Base.metadata.drop_all(engine)
from flask_sqlalchemy import *
from flask-sqlalchemy.orm import sessionmaker
# 创建连接池
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6",
max_overflow=0,
pool_size=5,
...
)
# 获取连接
Session = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
# 获取session的方式有两种
# 第一种
session = Session()
# 第二种
# 这是对第一种方式进行了封装,使用threading.local,给每一个线程都创建一个
session = scoped_session(Session)
# 假设有一个对象User,并由user表
session.add(obj1) # 添加一条数据
session.add_all([obj1, obj2, ...]) # 添加多条数
session.query(Uesr).all() # 查询表中所有数据,返回一个对象列表
session.query(Uesr).filter(User.id > 2) # 根据条件查询表中数据,返回一个对象列表
session.query(Uesr).filter(User.id > 2).delete() # 根据条件查询表中数据,删除
session.query(Uesr).filter(User.id = 2).update("id": "5") # 根据条件查询表中数据,更新
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
增删改都要commit
r1 = session.query(Users).all()
# 查询字段并重命名
r2 = session.query(Users.name.label('xx'), Users.age).all()
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import *
from sqlalchemy.orm import relationship
Base = declarative_base()
# ##################### 一对多示例 #########################
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 与生成表结构无关,仅用于查询方便
hobby = relationship("Hobby", backref='pers')
# ##################### 多对多示例 #########################
class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便
servers = relationship('Server', secondary='server2group', backref='groups')
class Server(Base):
__tablename__ = 'server'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""
# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""
relationship第一个参数是关联的类名,backref:是反向类名,secondary:多对多时的第三张表名
安装
pip install flask-sqlalchemy
配置连接数据库的参数
将配置写到flask的配置中
SQLALCHEMY_DATABASE_URI 连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS 一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO 调试设置为true
SQLALCHEMY_POOL_SIZE 数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT 连接超时时间
SQLALCHEMY_POOL_RECYCLE 自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。
使用
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# db中有上文中sqlalchemy所做操作的所有属性,类等等,比如db.Model(Base = declarative_base())
db = SQLAlchemy()
# 绑定flask
db.init_app(app)
# 获取session
session = db.session
# 操做就与上文SQL alchemy的session操作一致