ORM之SQLAlchemy+database实现增删改查+工程化设计数据库层

东郭阳德
2023-12-01

ORM之SQLAlchemy+database实现增删改查+工程化设计数据库层

python的ORM之SQLAlchemy简单的增删改查

在上一篇文章我们也看到了,增删改查数据库表都需要我们创建对应的类才可以,好麻烦啊,有没有办法直接从获取到数据库表的类呢,在实际工作中我们是这么做的

安装databases和aiomysql

(venv) ZHR:fastapitest zc$ pip3 install databases
Collecting databases
  Downloading databases-0.5.3-py3-none-any.whl (20 kB)
Requirement already satisfied: sqlalchemy<1.5,>=1.4 in ./venv/lib/python3.8/site-packages (from databases) (1.4.28)
Requirement already satisfied: greenlet!=0.4.17 in ./venv/lib/python3.8/site-packages (from sqlalchemy<1.5,>=1.4->databases) (1.1.2)
Installing collected packages: databases
Successfully installed databases-0.5.3
(venv) ZHR:pythonProject zc$ pip3 install aiomysql

实现增删改查

"""
@File    :   UserTwo.py
@Modify Time      @Author    @Version
------------      -------    --------
@Desciption:
"""
import time
import datetime

import databases
import sqlalchemy
import asyncio


class UserManager:
    def __init__(self, userid, env):
        url = "mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}".format(**env)
        # 通过database链接数据库
        self.database = databases.Database(url)
        # 通过sqlalchemy获取数据库的元数据,元数据,说人话就是他能将你链接的数据库的所有的表的信息,都抽象出来
        self.metadata = sqlalchemy.MetaData()
        # 通过create_engine初始化数据库链接
        # pool_size: 是连接池的大小,默认为5个,0表示连接数无限制
        # pool_recycle: MySQL 默认情况下如果一个连接8小时内容没有任何动作(查询请求)就会自动断开链接,出现 MySQL has gone away的错误。设置了 pool_recycle 后 SQLAlchemy 就会在指定时间内回收连接。如果设置为3600 就表示 1小时后该连接会被自动回收。
        # pool_timeout: 定义超时时间
        # max_overflow:超过连接池大小外最多创建的连接
        self.engine = sqlalchemy.create_engine(url, encoding='utf-8', max_overflow=0, pool_size=5,
                                               pool_timeout=30, pool_recycle=3600, )
        #  来创建所有 Base 派生类所对应的数据表,如果存在则忽略
        self.metadata.create_all(self.engine)
        self.userid = userid

    def _user_cls(self):
        """
        初始化类user表的抽象信息,具体返回了什么看get_user_name里的讲解
        :return:
        """
        return sqlalchemy.Table(
            "m_user",
            self.metadata,
            autoload=True,
            autoload_with=self.engine,
        )

    async def get_user_name(self, email: str) -> int or bool:
        """
        查询用户名称
        :param email: 这个只是为了演示多个查询条件的
        :return: 如果有则返回用户名,没有的话就返回False
        其中,这里使用了async和await来使用协程
        """
        print("_user_cls")
        print(self._user_cls())
        # _user_cls()返回的是一个类型为sqlalchemy.sql.schema.Table的类,名字就是我们的表名m_user
        print(type(self._user_cls()))
        print("_user_cls().c")
        print(self._user_cls().c)
        # _user_cls().c 返回的是对应表的字段的Collection
        print(type(self._user_cls().c))
        async with self.database as cur:
            # \的意义只是用来换行的,让代码看起来更整洁易懂,query定义查询条件
            query = sqlalchemy.select([self._user_cls().c.username]). \
                where(sqlalchemy.and_(
                # 查询条件
                self._user_cls().c.id == self.userid,
                self._user_cls().c.email == email
            ))
            # fetch_one,通过游标获取唯一一条数据,由于fetch_one也是async异步的(点进去看一下源码就可以了),所以我们可以在这里使用await
            res = await cur.fetch_one(query)
            print("res")
            print(res)
            # 返回值是sqlalchemy.engine.row.Row的,我们可以想想成元组就行了
            print(type(res))
            if res:
                return res[0]
            else:
                return False

    async def add_user(self, user_id_add: int, username: str, avatar: str, email: str, password: str, status: int):
        """
        新增功能
        :param user_id_add: 这里没有用self.userid是因为那个值是用来查询修改和删除的,咱们现在是新增,每次都不一样
        :param username: 用户名
        :param avatar: 图标
        :param email: 邮箱
        :param password: 密码
        :param status: 状态
        :return:返回插入数据库结果,1则为插入成功
        """
        # 获取当前时间,这个具体怎么定义取决你的数据库的时间字段是怎么设计的,下面两种方式是一样的
        created = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        last = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(created)
        print(last)
        async with self.database as cur:
            query = self._user_cls().insert(). \
                values({self._user_cls().c.id: user_id_add,
                        self._user_cls().c.username: username,
                        self._user_cls().c.email: email,
                        self._user_cls().c.avatar: avatar,
                        self._user_cls().c.password: password,
                        self._user_cls().c.status: status,
                        self._user_cls().c.created: created,
                        self._user_cls().c.last_login: last})
            result = await cur.execute(query)
            print(result)
            return result

    async def update_username(self, username_update: str, status_update: int = 1) -> bool:
        """
        更新对应记录的用户名
        :param username_update: 要修改成的用户名
        :param status_update: 只修改用户状态为1的记录
        :return:返回1即为更新成功,返回0则为更新失败,比如有根据where条件无查询结果,就没办法更新,这样就属于失败的情况
        """
        async with self.database as cur:
            last = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            query = self._user_cls().update(). \
                where(sqlalchemy.and_(self._user_cls().c.id == self.userid,
                                      self._user_cls().c.status == status_update)). \
                values({self._user_cls().c.username: username_update,
                        self._user_cls().c.last_login: last})

            res = await cur.execute(query)
            print('res更新')
            print(res)
            return res

    async def deleta_user(self):
        """
        删除用户
        :return: 删除成功则返回1,删除失败则返回0
        """
        async with self.database as cur:
            query = self._user_cls().delete(). \
                where(sqlalchemy.and_(self._user_cls().c.id == self.userid))
            res = await cur.execute(query)
            print('res删除')
            print(res)
            return res


if __name__ == "__main__":
    env = {"user": "xxx", "passwd": "xxx",
           "host": "rm-xxx.mysql.rds.aliyuncs.com",
           "port": 3306, "db": "xxx"}
    userid = 24
    user = UserManager(userid, env)
    email = "zhangxi@qq.com"
    user_name = asyncio.run(user.get_user_name(email))
    print(user_name)
    # 由于使用了async和await,我们需要使用asyncio.run来调用方法才可以,当然,这里仅限于我们自己调试
    asyncio.run(user.add_user(24, "huanghe", "hahaha", "zhangsan@qq.com", "password", 1))
    # asyncio.run(user.update_username("update_name22"))
    asyncio.run(user.deleta_user())

工程化设计数据库层

上面我们已经实现了增删改查的基础功能,那我们可以基于增删改查来实现各种各样的功能,我们可以这样设计框架

项目包
	db_module  # 这个包下的是数据库层
			user_module # 每一张我们需要用到的表,都新建一个python包,取名为表名_module
			   __init__.py # 将user.py import进来,然后根据增删改查来实现各种功能这才是我们暴露出去的功能
			   user.py # 实现每一张表的增删改查功能
			__init__.py # 将每个表名_module都引入进来,统一向外提供服务

user.module.init.py

"""
@File    :   __init__.py.py    
@Modify Time      @Author    @Version
------------      -------    --------    
@Desciption:这里才是我们暴露出去的方法,当然我们这里还存在可以优化的点,比如获取环境,从配置中读取,现在只做演示
"""
from db_module.user_module.UserTWo import *


async def set_user_name(user_id: int, user_name: str, user_email: str):
    env = {"user": "xxx", "passwd": "xxx",
           "host": "rm-xxx.mysql.rds.aliyuncs.com",
           "port": 3306, "db": "xxx"}
    user_cls = UserManager(user_id, env)
    # 首先调用查询方法,看看是不是存在这条记录
    result = await user_cls.get_user_name(user_email)
    if result:
        # 如果存在,则更新
        await user_cls.update_username(user_name)
    else:
        # 如果不存在,则返回False,这里就看你具体的业务怎么写了
        return False

    # return res if True else False


if __name__ == "__main__":
    email_init = "zhangxi@qq.com"
    user_name_init = "init更新"
    asyncio.run(set_user_name(1, user_name_init, email_init))

db_module.init.py

"""
@File    :   __init__.py.py    
@Modify Time      @Author    @Version
------------      -------    --------    
@Desciption:
"""

from db_module.user_module import *
 类似资料: