数据库是一个网站的基础。 Flask 可以使用很多种数据库。比
如 MySQL , MongoDB , SQLite , PostgreSQL 等。这里我们以 MySQL 为例进行讲解。而
在 Flask 中,如果想要操作数据库,我们可以使用 ORM 来操作数据库,使用 ORM 操作数据库将 变得非常简单
在Flask操作数据库之前 需要安装这几个操作库
pip install pymysql
pip install SQLAlchemy
SQLAlchemy : SQLAlchemy 是一个数据库的 ORM 框架
from sqlalchemy import create_engine
HOSTNAME = '127.0.0.1'#数据库地址
PORT = '3306'#端口号
DATABASE = 'world'#库名
USERNAME = 'root'#用户名
PASSWORD = '123456'#密码
#把配置信息和以上的连接数据 组合一下 指向DB_URL
DB_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset-utf-8'
#create_engine 创建数据库引擎 create_engine函数需要传递一个DB_url指向的字符串 链接数据库
engine = create_engine(DB_URL)
#判断是否链接成功
conn = engine.connect()
result = conn.execute('select * from user ')
print(result.fetchone())
对象关系映射 是用来描述对象和数据库之间映射的一种方式
更加规范SQL 不需要手写SQL DB工程师
更加直观的数据关系 ---->对象 方便建模
可以屏蔽不同数据库直接的差异
将表映射成class
将表中的数据 映射成Class的实例对象
要使用 ORM 来操作数据库,首先需要创建一个类来与对应的表进行映射。现在以 User表 来做为 例子,它有 自增长的id 、 name 、 fullname 、 password 这些字段,那么对应的类为:
先用一个伪代码 演示一下
class User(object):
"""User类就是一个数据表
下面的 id、name、usernama、password 是类的属性,相当于数据库字段
"""
id=id,
name=name,
username=username,
password=password
# 这个实例化对象 就相当于把User内的参数映射成了一条数据
#SQLAlchemy之后会在 底层执行这条数据库语句
U = User('xxx','xxx')
那么这样 操作数据库 就会像操作类那样 轻松了 上面的是理清思路的伪代码 不作为执行
字段的数据类型 只能是sqlalchemy提供的类型 所以需要导入sqlalchemy下的Column类来指定数据类型
id = Column(Integer, primary_key=True, nullable=False,autoincrement=True)
Column 类的参数:
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.ext.declarative import declarative_base#导入declarative_base用于返回ORM需要的基类
HOSTNAME = '127.0.0.1'#数据库地址
PORT = '3306'#端口号
DATABASE = 'world'#库名
USERNAME = 'root'#用户名
PASSWORD = '123456'#密码
#把配置信息和以上的连接数据 组合一下 指向DB_URL
DB_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset-utf-8'
#create_engine 创建数据库引擎 create_engine函数需要传递一个字符串 链接数据库
engine = create_engine(DB_URL)
"""以上是链接信息"""
"""以下是创建数据表的ORM"""
#将创建好的数据库引擎传入declarative_base方法中,会创建一个对象
base = declarative_base(engine)
#1、创建一个ORM模型 这个ORM模型必须继承自sqlalchemy提供的基类
class Users(base):
"""给表起名字: tablename的属性值是数据表的名字"""
__tablename__ = 'users'
#2、添加类的属性与数据表的字段进行一一映射,这些属性的数据类型 必须是sqlalchemy提供的数据类型
id = Column(Integer, primary_key=True, nullable=False,autoincrement=True)
name = Column(String(50), nullable=False)#String也是一个类 需要传递一个长度参数 这里我设置成50
age = Column(Integer,primary_key=False, nullable=False)
#3、将创建好的ORM模型 映射到数据库中执行 创建数据表
base.metadata.create_all()
创建表的ORM模型就写好了 运行一下
需要注意:使用metadata.create_all() 创建表以后 后续再添加更改表字段 也不会再映射了
Integer :整形。
Float :浮点类型。
Boolean :传递 True/False 进去。
DECIMAL :定点类型。
enum :枚举类型。
Date :传递 datetime.date() 进去。
DateTime :传递 datetime.datetime() 进去。
Time :传递 datetime.time() 进去。
String :字符类型,使用时需要指定长度,区别于 Text 类型。
Text :文本类型。
LONGTEXT :长文本类型
1、需要先构建session对象 所有的数据库的ORM操作都必须通过 叫Session的会话对象来实现 通过以下代码来获取会话对象
from sqlalchemy.orm import sessionmaker#导入sessiond对象 它的作用和declarative_base有些类似
#数据库引擎
engine = create_engine(DB_URL)
#数据库引擎装载这数据库链接信息
#将数据库引擎传入sessionmaker内 返回一个对象
Session = sessionmaker(engine)
session = Session()#将返回的对象需要再次进行实例化
"""上面两行创建Session对象的代码 也可以用下面的一行代码表示"""
session = sessionmaker(engine)()
from sqlalchemy import create_engine,Column,Integer,String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker#导入sessiond对象 它的作用和declarative_base有些类似
HOSTNAME = '127.0.0.1'
PORT = '3306'
DATABASE = 'world'
USERNAME = 'root'
PASSWORD = '123456'
DB_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}?charset-utf-8'
#数据库引擎
engine = create_engine(DB_URL)
base = declarative_base(engine)
#构建session对象
session = sessionmaker(engine)()
class Users(base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, nullable=False,autoincrement=True)
name = Column(String(50), nullable=False)
age = Column(Integer,primary_key=False, nullable=False)
"""--------以下是插入数据的操作-------"""
def add_data():
"""增操作"""
#将要插入的一条数据传到Users的对象中
u = Users(name="zhiliao",age=18)
#调用session的add方法 把数据传进去
session.add(u)
#这个时候 数据还只在内存中 还需要执行commit才能写入数据库
session.commit()
if __name__ == "__main__":
add_data()
'''上面是添加单条数据,再来试一下 一次多条数据'''
def add_data():
"""增操作"""
#将要插入的数据传到Users的对象中
u1 = Users(name="wuya",age=18)
u2 = Users(name="sanmu",age=21)
#u1、u2先放在列表中然后用add_all方法进行添加
session.add_all([u1,u2])
#这个时候 数据还只在内存中 还需要执行commit才能写入数据库
session.commit()
if __name__ == "__main__":
add_data()
调用session的query方法查询Users模型对应的表中查找数据 后面**all()**方法可以把这张表符合条件的数据全部查找出来
def search_data():
"""查操作"""
#调用session对象下的query方法
all_Users = session.query(Users).all()
print(all_Users)
if __name__ == "__main__":
search_data()
>>>[<__main__.Users object at 0x000001A06265C880>, <__main__.Users object at 0x000001A06265C8E0>, <__main__.Users object at 0x000001A06265CA60>, <__main__.Users object at 0x000001A06265CAF0>]
查询指定字段
# 只查age字段
def search_data():
"""查操作"""
#调用session对象下的query方法
all_Users = session.query(Users.age).all()
print(all_Users)
if __name__ == "__main__":
search_data()
聚合函数查询
func.count 统计出id的数量
from sqlalchemy import create_engine,Column,Integer,String,func#导入func对象
def search_data():
"""查操作"""
#调用session对象下的query方法
all_Users = session.query(func.count(Users.id)).all()
print(all_Users)
if __name__ == "__main__":
search_data()
其他聚合函数用法类似
使用filter_by()
传递查找条件 后面跟上all() 查询出所有 满足条件的数据
def search_data():
"""查操作"""
#调用session对象下的query方法 查询出所有name=zhiliao的字段
all_Users = session.query(Users).filter_by(name='sanmu').all()
print(all_Users)
if __name__ == "__main__":
search_data()
>>>[<__main__.Users object at 0x0000018DF75523D0>, <__main__.Users object at 0x0000018DF7552DF0>]
使用filter()方法
传递查询条件
def search_data():
"""查操作"""
all_Users = session.query(Users).filter(Users.name == 'zhiliao').all()
print(all_Users)
if __name__ == "__main__":
search_data()
>>>[<__main__.Users object at 0x0000018DF75523D0>, <__main__.Users object at 0x0000018DF7552DF0>]
使用get方法
就是根据主键 进行查找
def search_data():
"""查操作""" # 这张表的主键是id get传入2 就是查找出id为2的数据
all_Users = session.query(Users).get(2)
print(all_Users)
if __name__ == "__main__":
search_data()
>>><__main__.Users object at 0x00000249D61CF760>
first方法 拿到这张表中的第一条数据
def search_data():
"""查操作"""
#调用session对象下的query方法
all_Users = session.query(Users).first()
print(all_Users)
if __name__ == "__main__":
search_data()
>>> <__main__.Users object at 0x00000251E828B880>
解析字符串
上面查出来的 都是原始对象 现在用str魔术方法 解析出来 如果查询条件只能查出一条数据 可以直接打印 如果是多头需要遍历打印才能看到效果
def __str__(self):
return "<Users(name:%s,age:%s)"%(self.name,self.age)
def search_data():
"""查操作"""
all_Users = session.query(Users).first()
print(all_Users)
if __name__ == "__main__":
search_data()
>>><Users(name:ketang,age:20)
def update_data():
"""改操作"""
#修改数据的前提 必须要先把要修改的数据查找出来
all_Users = session.query(Users).get(2)
#修改查找出来的数据的age字段的值
all_Users.age = 20
session.commit()
if __name__ == "__main__":
update_data()
def delete_data():
"""删操作"""
#删和改一样 也需要查找到要删除的数据
all_Users = session.query(Users).first()
#调用session的删除方法 把查出来的数据传到delete中
session.delete(all_Users)
session.commit()
if __name__ == "__main__":
delete_data()
过滤是数据提取的一个很重要的功能,以下对一些常用的过滤条件进行解释,并且这些过滤条件都 是只能通过 filter 方法实现的:
#查询id等于1的全部数据
session.query(student).filter(student.id ==1).all()
#查询city是北京的全部数据
session.query(student).filter(student.city =='北京').all()
#查询city不是北京的全部数据
session.query(student).filter(student.city !='北京').all()
#查询出所有name以张为开头的数据
session.query(student).filter(student.name.like('张%')).all()
#中间的_匹配任意的一个字符
session.query(student).filter(student.name.like('张_友')).all()
in后面为何加 _ 因为要和python 的in 区分开
# 查询出所有city字段是北京上海的数据
session.query(student).filter(student.city.in_(['北京','上海'])).all()
#查询出所有city字段中除了北京上海以外的所有数据
session.query(student).filter(student.city.notin_(['北京','上海']))
session.query(student).filter(student.description==None)
session.query(student).filter(student.description!=None)
from sqlalchemy import and_
session.query(student).filter(and_(student.sex=='女',student.city=='北京'))
from sqlalchemy import or_
session.query(student).filter(or_(student.city=='北京',student.city== '重庆'))
表关系:
表之间的关系存在三种:一对一、一对多、多对多。而 SQLAlchemy 中的 ORM 也可以模拟这三种 关系。因为一对一其实在 SQLAlchemy 中底层是通过一对多的方式模拟的
在Mysql中,外键可以让表之间的关系更加紧密。而SQLAlchemy同样也支持外键。通过 ForeignKey类来实现,并且可以指定表的外键约束
外键约束有以下几项:
主表 :user表是存储用户信息的 从表Article表是存储user中用户发表的文章的
在从表中 设置uid字段为外键 引用user 表的id
from sqlalchemy import create_engine, Column, Integer, String,and_,or_,ForeignKey,Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship#导入
"""省略链接代码"""
class User(Base):
"""主表"""
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
def __repr__(self):
return "<User(username:%s)>" % self.username
class Article(Base):
"""从表"""
__tablename__ = 'article'
id = Column(Integer,primary_key=True ,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(Text,nullable=False)
#定义外键 uid引用user表中的id 从而查询出user表中的数据
#ForeignKey参数: 引用的是user表的id 所以参数是user.id sid这样两张表就关联了
uid = Column(Integer,ForeignKey("user.id"))
def __repr__(self):
return "<Article(title:%s,content=%s)>" % (self.title,self.content)
#将创建好的ORM模型 映射到数据库中执行
base.metadata.create.all()
查询出 第一个文章所属的用户信息
from sqlalchemy import create_engine, Column, Integer, String,and_,or_,ForeignKey,Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship
class User(base):
"""主表"""
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
def __repr__(self):
return "<User(username:%s)>" % self.username
class Article(base):
"""从表"""
__tablename__ = 'article'
id = Column(Integer,primary_key=True ,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(Text,nullable=False)
#定义外键 uid引用user表中的id 从而查询出user表中的数据
#ForeignKey参数: 引用的是user表的id 所以参数是user.id sid这样两张表就关联了
uid = Column(Integer,ForeignKey("user.id"))
#指定连接哪一个orm模型 这个要和user表链接 所以指定模型是User
author = relationship("User")
def __repr__(self):
return "<Article(title:%s,content=%s)>" % (self.title,self.content)
def search_data():
"""先查询出Article表中第一个文章数据
然后再用查询出的文章数据的uid关联到user表中相应的用户id的数据
"""
article = session.query(Article).first()
print(article.author)
print(article.author.username)
if __name__ == '__main__':
search_data()
>>><User(id:1,username:sanmu)>
>>>sanmu
用户表查询文章表,由于一个用户可以发表多篇文章,所以这种查询是一对多的关系。适用于查询出某个用户的全部文章的场景
先在User模型中定义属性关联Article模型
class User(base):
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
#在user模型中也定义和Article模型关联的属性
articles = relationship('Article')
def __repr__(self):
return "<User(id:%s,username:%s)>" % (self.id,self.username)
定义好后其他就不用动了 直接在查询方法中打印即可
class User(base):
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
#在user模型中也定义和Article模型关联的属性
articles = relationship('Article')
def __repr__(self):
return "<User(id:%s,username:%s)>" % (self.id,self.username)
class Article(base):
__tablename__ = 'article'
id = Column(Integer,primary_key=True ,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(Text,nullable=False)
uid = Column(Integer,ForeignKey("user.id"))
author = relationship("User")
def __repr__(self):
return "<Article(title:%s,content:%s)>" % (self.title,self.content)
def search_data():
"""先查询出user表中第一个用户
然后再用查询出的用户id 关联到article表中 查出相应的文章
"""
user = session.query(User).first()
print(user.articles)
if __name__ == '__main__':
search_data()
#查询出两篇文章
>>>[<Article(title:标题,content:随便写写)>, <Article(title:日志,content:写点东西测试一下 )>]
上面实现外键查询必须在每个模型定义relationship 属性 这样还是比较麻烦 那么可以在relationship 方法内加一个关键字参数backref
反向指向article模型 这样就只需要在一个模型中定义就可以了
class Article(base):
__tablename__ = 'article'
id = Column(Integer,primary_key=True ,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(Text,nullable=False)
uid = Column(Integer,ForeignKey("user.id"))
#backref参数是:反向引用的意思 用Article模型引用User是正向引用,那么反向引用就是User引用Article
author = relationship("User",backref ='articles' )
def __repr__(self):
return "<Article(title:%s,content:%s)>" % (self.title,self.content)
这个时候可以把User模型的relationship定义的属性注释掉了 然后打印数据看一下
class User(base):
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
#在user模型中也定义和Article模型关联的属性
# articles = relationship('Article') 注释掉
def __repr__(self):
return "<User(id:%s,username:%s)>" % (self.id,self.username)
class Article(base):
__tablename__ = 'article'
id = Column(Integer,primary_key=True ,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(Text,nullable=False)
uid = Column(Integer,ForeignKey("user.id"))
#backref参数是:反向引用的意思 用Article模型引用User是正向引用,那么反向引用就是User引用Article
author = relationship("User",backref ='articles' )
def __repr__(self):
return "<Article(title:%s,content:%s)>" % (self.title,self.content)
def search_data():
user = session.query(User).first()
for i in user.articles:
print(i.content)
if __name__ == '__main__':
search_data()
>>>随便写写
>>>写点东西测试一下
假设一个使用场景 一个user表的字段过多 ,所以将一部分的user表的字段 放在了另外一张新表中 那个这两张表 存储的都是同一个用户的信息 这就是一对一的场景了
class User(Base):
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
class UserExtend(Base):
"""user扩展表"""
__tablename__ = 'user_extend'
id = Column(Integer, primary_key=True, autoincrement=True)
school = Column(String(50))
#ForeignKey定义外键引用User的id
uid = Column(Integer,ForeignKey("user.id"))
#关联User模型 backref反向关联
user = relationship('User',backref="User_extend")
这个时候 并没有达到一对一的需要 因为 user表 还是可以添加多个UserExtend 的 数据
如果想要将两个模型映射成一对一的关系,需要再主表中(user表)表示对应子表(UserExtend表)只能有一条数据
class User(base):
__tablename__ = 'user'
id = Column(Integer,primary_key=True,autoincrement=True)
username = Column(String(50),nullable=False)
# uselist默认是True 代表user下的extnd是一个列表,可以添加多个UserExtend数据
# 设置成False后 extnd 就不是列表 而是一个属性了
extnd = relationship("UserExtend", uselist=False)
#在user模型中也定义和Article模型关联的属性
# articles = relationship('Article')
def __repr__(self):
return "<User(id:%s,username:%s)>" % (self.id,self.username)
class UserExtend(base):
"""user的扩展表"""
__tablename__ = 'User_extend'
id = Column(Integer, primary_key=True, autoincrement=True)
school = Column(String(50))
#ForeignKey定义外键引用User的id
uid = Column(Integer,ForeignKey('user.id'))
#关联User模型
user = relationship('User')
def add_data():
"""增加数据操作"""
user = User(username="xiaoxiao")
extend1 = UserExtend(school = "ketang")
user.extnd = extend1
session.add(user)
session.commit()
if __name__ == '__main__':
add_data()
一个文章对应着文章的分类字段表, 但一篇文章可以多个分类,同时一个分类的字段也可以给很多篇文章使用。那么文章和文章分类之间就是多对多的关系
class Article(base):
"""文章表"""
__tablename__ = 'article'
id = Column(Integer,primary_key=True,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(String(266))
def __repr__(self):
return "<Article(title:%s)>" % self.title
class Tag(base):
"""文章分类"""
__tablename__ = 'tag'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
def __repr__(self):
return "<Tag(name:%s)>" % self.name
文章和文章分类的表 就写完了 但要实现多对多 还需要一张中间表 来存储两张之间的关系
from sqlalchemy import create_engine, Column, Integer, String,and_,or_,ForeignKey,Text,Table#导入Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,relationship
article_tag = Table(
"article_tag",
#Article添加对应文章表的id字段
#第一个参数的id字段名 ,
# 第二个参数设置数据类型,
# 第三个设置article_id为外键
#第四个 设置成主键
base.metadata,
# 使用Table类创建中间表
# 第一个参数是表名
# 第二个参数是创建表
# 第三个Column添加字段,因为是用Table建表的,所以不能指定属性为字段名 可以在第一个参数内指定列 为字段名
Column("article_id",Integer,ForeignKey("article.id"),primary_key=True),
Column("tag_id",Integer,ForeignKey("tag.id"),primary_key=True)
)
只要ORM使用Table创建一张中间表 在文章表定义secondary属性 这样在 需要关联的两张表可以 不需要定义外键 ORM会自动处理
article_tag = Table(
"article_tag",
#Article添加对应文章表的id字段
#第一个参数的id字段名 ,
# 第二个参数设置数据类型,
# 第三个设置article_id为外键
#第四个 设置成主键
base.metadata,
# 使用Table类创建中间表
# 第一个参数是表名
# 第二个参数是创建表
# 第三个Column添加字段,因为是用Table建表的,所以不能指定属性为字段名 可以在第一个参数内指定列 为字段名
Column("article_id",Integer,ForeignKey("article.id"),primary_key=True),
Column("tag_id",Integer,ForeignKey("tag.id"),primary_key=True)
)
class Article(base):
"""文章表"""
__tablename__ = 'article'
id = Column(Integer,primary_key=True,autoincrement=True)
title = Column(String(50),nullable=False)
content = Column(String(266))
#设置与tag表的引用与反向引用 secondary指向中间表 可以将三张表绑定在一起
tags = relationship("Tag",backref="articles",secondary=article_tag)
def __repr__(self):
return "<Article(title:%s)>" % self.title
class Tag(base):
"""文章分类"""
__tablename__ = 'tag'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
def __repr__(self):
return "<Tag(name:%s)>" % self.name
base.metadata.create_all()
def add_data():
article1 = Article(title = "pythonWeb开发",content="这里是文章内容")
article2 = Article(title = "pythonApi开发",content="这里是文章内容")
tag1 = Tag(name = "web开发")
tag2 = Tag(name = "API开发")
article1.tags.append(tag1)
article1.tags.append(tag2)
article2.tags.append(tag1)
article2.tags.append(tag2)
session.add(article1)
session.add(article2)
session.commit()
一篇文章的所有分类标签
article = session.query(Article).filter_by(id=5).first()
print(article.tags)
if __name__ == '__main__':
search_data()
[<Tag(name:web开发)>, <Tag(name:API开发)>]
查询一个分类下 有多少偏文章
def search_data():
"""查操作"""
tag = session.query(Tag).filter_by(id=1).first()
print(tag.articles)
if __name__ == '__main__':
search_data()
[<Article(title:pythonWeb开发)>, <Article(title:pythonApi开发)>]