SQLAlchemy 使用 query() 方法查询数据
接着前面一篇创建的模型
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String
# 拼接配置dialect + driver://username:passwor@host:port/database
DB_URI = 'mysql+pymysql://root:123456@localhost:3306/web'
Base = declarative_base()
class Students(Base):
__tablename__ = 'students' # 数据库表名
id = Column(Integer, primary_key=True)
name = Column(String(20))
fullname = Column(String(30))
nickname = Column(String(30))
def __repr__(self):
return "<Students(name='%s', fullname='%s', nickname='%s')>" % (
self.name, self.fullname, self.nickname)
创建session实例
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from xx.xx import Students
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/web')
# 把当前的引擎绑定给这个会话
Session = sessionmaker(bind=engine)
# 实例化
session = Session()
query() 查询会转换成对应的SQL 语句
# query()
r1 = session.query(Students)
print(r1)
等价于
SELECT students.id AS students_id, students.name AS students_name, students.fullname AS students_fullname, students.nickname AS students_nickname
FROM students
query()可以只查询某个字段,多个字段逗号隔开
r2 = session.query(Students.name)
print(r2)
等价于
SELECT students.name AS students_name
FROM students
all()查询全部数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from xx.xx import Students
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/web')
# 把当前的引擎绑定给这个会话
Session = sessionmaker(bind=engine)
# 实例化
session = Session()
all = session.query(Students).all()
print(all)
for item in all:
print(item.name, item.fullname)
运行结果
[<Students(name='yoyo', fullname='yoyoketang', nickname='yy')>,
<Students(name='yoyo1', fullname='yoyoketang1', nickname='yy1')>,
<Students(name='yoyo2', fullname='yoyoketang2', nickname='yy2')>]
yoyo yoyoketang
yoyo1 yoyoketang1
yoyo2 yoyoketang2
all 查询的结果是一个list of Students对象
first = session.query(Students).first()
print(first)
运行结果
<Students(name='yoyo', fullname='yoyoketang', nickname='yy')>
filter()方法加筛选条件
# filter() 过滤
r1 = session.query(Students).filter(Students.name == 'yoyo')
print(r1)
等价于以下sql
SELECT students.id AS students_id, students.name AS students_name, students.fullname AS students_fullname, students.nickname AS students_nickname
FROM students
WHERE students.name = %(name_1)s
查询结果需加 all() 方法得到全部结果,或者 first() 方法得到第一个结果
# filter() 过滤
r1 = session.query(Students).filter(Students.name == 'yoyo').all()
print(r1) # [<Students(name='yoyo', fullname='yoyoketang', nickname='yy')>]
以上查询类似于 SELECT * FROM ...
,如果查询结果只需要某个字段,可以在query()方法指定字段名称
# filter() 过滤
r1 = session.query(Students.name).filter(Students.name == 'yoyo').all()
print(r1) # [('yoyo',)]
r2 = session.query(Students.name, Students.fullname).filter(Students.name == 'yoyo').all()
print(r2) # [('yoyo', 'yoyoketang')]
字符串可以判断相等,数字类型的可以判断 >=
和 <=
r3 = session.query(Students.name).filter(Students.id >= 1).all()
print(r3) # [('yoyo',), ('yoyo1',), ('yoyo2',)]
在filter()中用,分隔多个条件表示and
r = session.query(Students.name).filter(Students.id >= 1, Students.name == 'yoyo').all()
print(r) # [('yoyo',)]
or_
方法实现 or 条件查询
from sqlalchemy import or_
r = session.query(Students.name).filter(or_(Students.id >= 1, Students.name == 'yoyo')).all()
print(r) # [('yoyo',), ('yoyo1',), ('yoyo2',)]
根据id倒叙
r4 = session.query(Students).order_by(Students.id.desc()).all() # desc()表示倒序
print(r4)
运行结果
[<Students(name='yoyo2', fullname='yoyoketang2', nickname='yy2')>,
<Students(name='yoyo1', fullname='yoyoketang1', nickname='yy1')>,
<Students(name='yoyo', fullname='yoyoketang', nickname='yy')>]
判断等于和不等于可以直接用 ==
和 !=
like()
模糊匹配
r = session.query(Students.name).filter(Students.name.like('%yoyo%')).all()
print(r) # [('yoyo',), ('yoyo1',), ('yoyo2',)]
in_()
包含
r = session.query(Students.name).filter(Students.name.in_(['yoyo', 'yoyo1'])).all()
print(r) # [('yoyo',), ('yoyo1',)]
.count()方法统计查询个数
r = session.query(Students).count()
print(r) # 3
r1 = session.query(Students).filter(Students.name == 'yoyo').count()
print(r1) # 1
all()方法返回的是一个list,那就可以通过切片取一部分数据
# 返回前2条数据
r = session.query(Students.name).all()[:2]
print(r) # [('yoyo',), ('yoyo1',)]
根据查询结果,调用delete()方法删除对应数据,需要执行session.commit()提交事务。
session.query(Students).filter(Students.name == 'yoyo').delete()
session.commit()
update()方法,需要执行session.commit()提交事务
# 修改fullname
session.query(Students).filter(Students.name == 'yoyo1').update({'fullname': 'zhangsan'})
session.commit()
# 修改后查询
r = session.query(Students).filter(Students.name == 'yoyo1').all()
print(r) # [<Students(name='yoyo1', fullname='zhangsan', nickname='yy1')>]
从运行结果可以看到已经更新成功。