在上一篇文章我们也看到了,增删改查数据库表都需要我们创建对应的类才可以,好麻烦啊,有没有办法直接从获取到数据库表的类呢,在实际工作中我们是这么做的
(venv) ZHR:fastapitest zc$ pip3 install databases
Collecting databases
Downloading databases-0.5.3-py3-none-any.whl (20 kB)
Requirement already satisfied: sqlalchemy<1.5,>=1.4 in ./venv/lib/python3.8/site-packages (from databases) (1.4.28)
Requirement already satisfied: greenlet!=0.4.17 in ./venv/lib/python3.8/site-packages (from sqlalchemy<1.5,>=1.4->databases) (1.1.2)
Installing collected packages: databases
Successfully installed databases-0.5.3
(venv) ZHR:pythonProject zc$ pip3 install aiomysql
"""
@File : UserTwo.py
@Modify Time @Author @Version
------------ ------- --------
@Desciption:
"""
import time
import datetime
import databases
import sqlalchemy
import asyncio
class UserManager:
def __init__(self, userid, env):
url = "mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}".format(**env)
# 通过database链接数据库
self.database = databases.Database(url)
# 通过sqlalchemy获取数据库的元数据,元数据,说人话就是他能将你链接的数据库的所有的表的信息,都抽象出来
self.metadata = sqlalchemy.MetaData()
# 通过create_engine初始化数据库链接
# pool_size: 是连接池的大小,默认为5个,0表示连接数无限制
# pool_recycle: MySQL 默认情况下如果一个连接8小时内容没有任何动作(查询请求)就会自动断开链接,出现 MySQL has gone away的错误。设置了 pool_recycle 后 SQLAlchemy 就会在指定时间内回收连接。如果设置为3600 就表示 1小时后该连接会被自动回收。
# pool_timeout: 定义超时时间
# max_overflow:超过连接池大小外最多创建的连接
self.engine = sqlalchemy.create_engine(url, encoding='utf-8', max_overflow=0, pool_size=5,
pool_timeout=30, pool_recycle=3600, )
# 来创建所有 Base 派生类所对应的数据表,如果存在则忽略
self.metadata.create_all(self.engine)
self.userid = userid
def _user_cls(self):
"""
初始化类user表的抽象信息,具体返回了什么看get_user_name里的讲解
:return:
"""
return sqlalchemy.Table(
"m_user",
self.metadata,
autoload=True,
autoload_with=self.engine,
)
async def get_user_name(self, email: str) -> int or bool:
"""
查询用户名称
:param email: 这个只是为了演示多个查询条件的
:return: 如果有则返回用户名,没有的话就返回False
其中,这里使用了async和await来使用协程
"""
print("_user_cls")
print(self._user_cls())
# _user_cls()返回的是一个类型为sqlalchemy.sql.schema.Table的类,名字就是我们的表名m_user
print(type(self._user_cls()))
print("_user_cls().c")
print(self._user_cls().c)
# _user_cls().c 返回的是对应表的字段的Collection
print(type(self._user_cls().c))
async with self.database as cur:
# \的意义只是用来换行的,让代码看起来更整洁易懂,query定义查询条件
query = sqlalchemy.select([self._user_cls().c.username]). \
where(sqlalchemy.and_(
# 查询条件
self._user_cls().c.id == self.userid,
self._user_cls().c.email == email
))
# fetch_one,通过游标获取唯一一条数据,由于fetch_one也是async异步的(点进去看一下源码就可以了),所以我们可以在这里使用await
res = await cur.fetch_one(query)
print("res")
print(res)
# 返回值是sqlalchemy.engine.row.Row的,我们可以想想成元组就行了
print(type(res))
if res:
return res[0]
else:
return False
async def add_user(self, user_id_add: int, username: str, avatar: str, email: str, password: str, status: int):
"""
新增功能
:param user_id_add: 这里没有用self.userid是因为那个值是用来查询修改和删除的,咱们现在是新增,每次都不一样
:param username: 用户名
:param avatar: 图标
:param email: 邮箱
:param password: 密码
:param status: 状态
:return:返回插入数据库结果,1则为插入成功
"""
# 获取当前时间,这个具体怎么定义取决你的数据库的时间字段是怎么设计的,下面两种方式是一样的
created = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
last = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(created)
print(last)
async with self.database as cur:
query = self._user_cls().insert(). \
values({self._user_cls().c.id: user_id_add,
self._user_cls().c.username: username,
self._user_cls().c.email: email,
self._user_cls().c.avatar: avatar,
self._user_cls().c.password: password,
self._user_cls().c.status: status,
self._user_cls().c.created: created,
self._user_cls().c.last_login: last})
result = await cur.execute(query)
print(result)
return result
async def update_username(self, username_update: str, status_update: int = 1) -> bool:
"""
更新对应记录的用户名
:param username_update: 要修改成的用户名
:param status_update: 只修改用户状态为1的记录
:return:返回1即为更新成功,返回0则为更新失败,比如有根据where条件无查询结果,就没办法更新,这样就属于失败的情况
"""
async with self.database as cur:
last = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
query = self._user_cls().update(). \
where(sqlalchemy.and_(self._user_cls().c.id == self.userid,
self._user_cls().c.status == status_update)). \
values({self._user_cls().c.username: username_update,
self._user_cls().c.last_login: last})
res = await cur.execute(query)
print('res更新')
print(res)
return res
async def deleta_user(self):
"""
删除用户
:return: 删除成功则返回1,删除失败则返回0
"""
async with self.database as cur:
query = self._user_cls().delete(). \
where(sqlalchemy.and_(self._user_cls().c.id == self.userid))
res = await cur.execute(query)
print('res删除')
print(res)
return res
if __name__ == "__main__":
env = {"user": "xxx", "passwd": "xxx",
"host": "rm-xxx.mysql.rds.aliyuncs.com",
"port": 3306, "db": "xxx"}
userid = 24
user = UserManager(userid, env)
email = "zhangxi@qq.com"
user_name = asyncio.run(user.get_user_name(email))
print(user_name)
# 由于使用了async和await,我们需要使用asyncio.run来调用方法才可以,当然,这里仅限于我们自己调试
asyncio.run(user.add_user(24, "huanghe", "hahaha", "zhangsan@qq.com", "password", 1))
# asyncio.run(user.update_username("update_name22"))
asyncio.run(user.deleta_user())
上面我们已经实现了增删改查的基础功能,那我们可以基于增删改查来实现各种各样的功能,我们可以这样设计框架
项目包
db_module # 这个包下的是数据库层
user_module # 每一张我们需要用到的表,都新建一个python包,取名为表名_module
__init__.py # 将user.py import进来,然后根据增删改查来实现各种功能这才是我们暴露出去的功能
user.py # 实现每一张表的增删改查功能
__init__.py # 将每个表名_module都引入进来,统一向外提供服务
user.module.init.py
"""
@File : __init__.py.py
@Modify Time @Author @Version
------------ ------- --------
@Desciption:这里才是我们暴露出去的方法,当然我们这里还存在可以优化的点,比如获取环境,从配置中读取,现在只做演示
"""
from db_module.user_module.UserTWo import *
async def set_user_name(user_id: int, user_name: str, user_email: str):
env = {"user": "xxx", "passwd": "xxx",
"host": "rm-xxx.mysql.rds.aliyuncs.com",
"port": 3306, "db": "xxx"}
user_cls = UserManager(user_id, env)
# 首先调用查询方法,看看是不是存在这条记录
result = await user_cls.get_user_name(user_email)
if result:
# 如果存在,则更新
await user_cls.update_username(user_name)
else:
# 如果不存在,则返回False,这里就看你具体的业务怎么写了
return False
# return res if True else False
if __name__ == "__main__":
email_init = "zhangxi@qq.com"
user_name_init = "init更新"
asyncio.run(set_user_name(1, user_name_init, email_init))
db_module.init.py
"""
@File : __init__.py.py
@Modify Time @Author @Version
------------ ------- --------
@Desciption:
"""
from db_module.user_module import *