SQL alchemy使用

楚志强
2023-12-01

目录

SQLAlchemy介绍

创建操作数据表

scoped_session线程安全

基本增删查改

新增数据

删除数据

修改数据

查询数据

一对多关系

多对多关系

flask-sqlalchemy与flask-migrate


SQLAlchemy介绍

SQLAlchemy是一个基于Python实现的ORM框架。独立的orm框架,可以轻松的集成到任意项目中去。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

组成部分:

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

安装:

pip install sqlalchemy

SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件。

mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]   

原生快速使用

import threading
from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)


def task():
    # 从连接池中拿一个链接
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from table_name"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()


for i in range(20):
    t = threading.Thread(target=task)
    t.start()

创建操作数据表

SQLAlchemy只能创建表、删除表、不能修改表。

models.py

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, DateTime, UniqueConstraint, Index

Base = declarative_base()

class Users(Base):
    id = Column(Integer, primary_key=True)  # id 主键
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    email = Column(String(32), unique=True)
    create_time = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)

    __tablename__ = 'users'  # 数据库表名称
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        Index('ix_id_name', 'name', 'email'),  # 索引
    )

# 创建表,同步到数据库
def init_db():
    """
    根据类创建数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:332525@127.0.0.1:3306/test",  # 数据库需要自行创建,无法自动创建
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 创建被Base管理的所有表
    Base.metadata.create_all(engine)

def drop_db():
    """
    根据类删除数据库表
    """
    engine = create_engine(
        "mysql+pymysql://root:332525@127.0.0.1:3306/test",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 删除被Base管理的所有表
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    init_db()  # 创建表
    # drop_db()  # 删除表

操作数据

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users

engine = create_engine("mysql+pymysql://root:332525@127.0.0.1:3306/test", max_overflow=0, pool_size=5)

Connection = sessionmaker(bind=engine)

# 每次执行数据库操作时,都需要创建一个Connection
conn = Connection()

# orm操作
obj = Users(name="tom")
conn.add(obj)
# 提交事务
conn.commit()
# 关闭session,其实是将连接放回连接池
conn.close()

scoped_session线程安全

在上述操作数据的方式中,如果多个线程同时操作,可能会出现数据错乱的问题。所以要用到scoped_session来保证数据安全。

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users
from threading import Thread

engine = create_engine(
    "mysql+pymysql://root:332525@127.0.0.1:3306/test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Session = sessionmaker(bind=engine)
# 保证数据安全
session = scoped_session(Session)

def task(i):
    # 数据操作
    user = Users(name='user%s' % i)
    session.add(user)
    # 提交
    session.commit()
    # 关闭连接
    session.close()

for i in range(10):
    t = Thread(target=task, args=[i, ])
    t.start()

基本增删查改

创建连接:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine(
    "mysql+pymysql://root:332525@127.0.0.1:3306/test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

Session = sessionmaker(bind=engine)
# 保证数据安全
session = scoped_session(Session)

执行原生SQL

# 纯自定义sql
res = session.query(Users).from_statement(text("SELECT * FROM users where id>:value")).params(value=5).all()

新增数据

session.add(Users('tom'))  # 新增一条数据
session.add([Users('tom'), Users('jim')])  # 新增多条数据

删除数据

配合过滤

session.query(Users).filter(Users.id > 5).delete()  # 删除Users表中id大于5的

修改数据

配合过滤

# id为1,name字段值改为abc
session.query(Users).filter(Users.id == 1).update({"name": "abc"})
# id为1,name字段值改为拼接上'123'
session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + "123"})

查询数据

  • all():获取所有结果
  • first():获取第一个结果
# 查所有,返回结果为列表,里面有一个个Users对象
res = session.query(Users).all()

# 指定查询的字段:select name as xx,age from users;
res = session.query(Users.id, Users.name.label('xx')).all()

# 通过filter过滤,filter()中写条件 > < ==
res = session.query(Users).filter(Users.id > 5).all()

# filter_by 过滤,name字段值为'user2'并且id为2的数据
res = session.query(Users).filter_by(name='user2', id=2).first()

# filter和filter_by的其他使用
res = session.query(Users).filter(Users.id > 1, Users.name == 'user2').all()  # 条件and关系
res = session.query(Users).filter(Users.id.between(7, 9)).all()  # 在..之间
res = session.query(Users).filter(Users.id.in_([7, 8, 9])).all()  # 在..之中
res = session.query(Users).filter(~Users.id.between(2, 9)).all()  # 取反
res = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='user2'))).all()  # 二次筛选

# 自定制where部分查询sql,:value相当于占位符
from sqlalchemy.sql import text
res = session.query(Users).filter(text("id>:value or name=:name")).params(value=2, name='user2').all()

# 与and_、或or_、非~
from sqlalchemy import and_, or_
res = session.query(Users).filter(or_(Users.id > 5, Users.name == 'user2')).all()
res = session.query(Users).filter(
    or_(
        Users.id > 5,
        and_(Users.name == 'user2', Users.id > 3)
    )
)

# 模糊匹配,%代表多个字符
res = session.query(Users).filter(Users.name.like('user%')).all()

# 限制,用于分页,切割
res = session.query(Users)[0:2]

# 排序,desc()降序,asc()升序
res = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func
# 分组之后取最大id,id之和,最小id
res = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)
).group_by(Users.name).all()

# haviing筛选
res = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)
).group_by(Users.name).having(func.min(Users.id) > 2).all()

# 连表(默认用forinkey关联)
res = session.query(Users, Favor).filter(Users.id == Favor.id).all()
# join表,默认是inner join
res = session.query(Users).join(Favor).all()
# isouter=True 外连,表示Favor left join Favor,两表反过来既是右连接
res = session.query(Users).join(Favor, isouter=True).all()

一对多关系

models.py

from sqlalchemy.orm import relationship
from sqlalchemy import Column, Integer, String, ForeignKey

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')

class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是表名而不是类名
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')

新增数据

# 方式一:
session.add(Hobby(caption='dance'))
session.add(Person(name='tom', hobby_id=1))

# # 方式二:
session.add(Person(name='tony', hobby=Hobby(caption='sing')))

一对多查询

"""基于对象的跨表查询"""
# 正向查询
res = session.query(Person).filter(Person.name == 'tom').first()
print(res.hobby.caption)
# 反向查询
res = session.query(Hobby).filter_by(caption='dance').first()
print(res.pers)

"""基于连表的跨表查询"""
res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id, Person.name == 'tom').all()
res = session.query(Person).join(Hobby).filter(Person.name == 'tom').all()

多对多关系

class Person2Hobby(Base):
    __tablename__ = 'person2bobby'
    id = Column(Integer, primary_key=True, autoincrement=True)
    hobby_id = Column(Integer, ForeignKey('hobby.id'))
    person_id = Column(Integer, ForeignKey('person.id'))

class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    hobbies = relationship('Hobby', secondary='person2bobby', backref='persons')

新增数据

# 方式一:所有表都有,一个个增加
session.add_all([
    Person(name='tom'), Person(name='tony'),
    Hobby(caption='sing'), Hobby(caption='dance')
])
session.add_all([
    Person2Hobby(person_id=1, hobby_id=1),
    Person2Hobby(person_id=1, hobby_id=2),
    Person2Hobby(person_id=2, hobby_id=1),
])

# 方式二:
session.add(Person(name='tom', hobbies=[Hobby(caption='sing'), Hobby(caption='dance')]))

多对多查询

"""基于对象的跨表查询"""
# 正向查询
res = session.query(Person).filter_by(name='tom').first()
print(res.hobbies)
# 反向查询
res = session.query(Hobby).filter_by(caption='sing').first()
print(res.persons)

flask-sqlalchemy与flask-migrate

flask-sqlalchemy:帮助我们快速把sqlalchemy集成到flask中。

flask-migrate:把表同步到数据库中

pip install flask-migrate==2.7.0

settings.py:

# SESSION_TYPE = 'redis'  # session类型为redis
# SESSION_KEY_PREFIX = 'session:'  # 保存到session中的值的前缀
# SESSION_PERMANENT = True  # 如果设置为False,则关闭浏览器session就失效。
# SESSION_USE_SIGNER = False  # 是否对发送到浏览器上 session:cookie值进行加密

SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:332525@127.0.0.1:3306/test"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1
# 追踪对象的修改并且发送信号
SQLALCHEMY_TRACK_MODIFICATIONS = False

app.py:

from flask_sqlalchemy import SQLAlchemy
from flask import Flask
from models import Users

app = Flask(__name__)
# 导入配置文件
app.config.from_pyfile('settings.py')
# 实例化得到对象
db = SQLAlchemy()
# 注册
db.init_app(app)
# 创建表模型,继承db.Model
class Users(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80), unique=True, nullable=False)

@app.route('/')
def index():
    # 使用session
    db.session.add(Users(name='tom'))
    db.session.commit()
    return 'index'

manage.py

from flask_script import Manager
from flask_migrate import MigrateCommand, Migrate

from app import app, db

# 第一步:初始化出flask_script的manage
manager = Manager(app)
# 第二步:使用flask_migrate的Migrate  包裹一下app和db(sqlalchemy对象)
Migrate(app, db)

# 第三步:把命令增加到flask-script中去
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
    manager.run()

打开终端输入迁移命令:

1.初始化,项目使用的时候,只敲一次,生成migrations文件夹

python manage.py db init

2.记录变化,增加表,删除表,增加字段,删除字段都需要记录

python manage.py db migrate

3.同步到数据库中

python manage.py db upgrade

4.运行flask

python manage.py runserver
 类似资料: