当前位置: 首页 > 工具软件 > GINO > 使用案例 >

Gino —— Postgresql

壤驷鸿
2023-12-01


官方文档
git示例

一、连接数据库

from gino import Gino

db = Gino()

import asyncio

async def main():
   # asyncpg://postgres:xxxxxx@127.0.0.1:5432/postgres
   await db.set_bind('driver://username:password@localhost:port/database', echo=True)
asyncio.get_event_loop().run_until_complete(main())  

# echo 是否输出日志
# driver 驱动类型,如:asyncpg, postgresql + psycopg2
# username 数据库用户名
# password 数据库秘密
# database 默认数据库
db = Gino()

class Base(db.Model):
   __abstract__ = True
   
   id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
   created_on = db.Column(db.DateTime, default=datetime.now, server_default=db.func.now(), index=True)


class User(Base):
   __tablename__ = 'user'
   
   nickname = db.Column(db.String(32), default='noname')
   user_groups = db.Column(db.Array(db.Integer))

二、基本操作(CRUD)

单条创建

await User(nickname="xxxxx").create()

多条插入

from sqlalchemy.dialects.postgresql import insert

data = [dict(nickname="xxxxx"), dict(nickname="yyyyy")]
await insert(User).values(data).on_conflict_do_nothing().gino.status()
# on_conflict_do_nothing 跳过报错,及报错不响应

单条删除

user = await User.query.where(User.id == 1).gino.first()
await user.delete()

多条删除(最好使用这种操作)

await User.delete.where(User.id.in_([1, 2])).gino.status() 

# 方法一
user = await User.query.where(User.id == 1).gino.first()
await user.update(nickname='bbbbb').apply()

# 方法二
await User.update.where(User.id == 1).values(nickname='bbbbb').gino.status()

# 方法一
await User.query.where(User.id == 1).gino.first()  # 查出第一条 .all()查出所有

# 方法二
await db.select([User.id, User.nickname]).where(User.id == 1).gino.first()  # 在列表中直接写表明User,表示取出左右字段

三、进阶操作

1. 多条记录转json

label_query = db.select([User.id, User.nickname]).alias("q")
db.select([
       db.func.coalesce(
           db.func.array_to_json(
               db.func.array_agg(db.func.row_to_json(db.select([db.column("q")]).label("q")))
           ),
           db.cast(list(), db.JSON),
       )
])
.where(
   User.id.any(label_query.c.id),
).label("labels")

# db.func.row_to_json 将查询出的字段转换成json: {字段名: 值}
# db.func.array_agg 将多条记录合并成数组,注意!!! 此时如果列表中记录为json,使用此函数转换后数组中的记录是文本型
# db.func.array_to_json 将列表记录转换成json,使用此函数可以格式化db.func.array_agg中值为文本型的情况
# db.func.coalesce 当参数一为空时,返回参数二
# db.cast 转换数据类型

2. in_()与any()

当查询条件为列表时使用in

await User.query.where(User.id.in_([1, 2, 3]).gino.all()

# SQL
"""
SELECT
	user.id, user.nickname, user.user_groups, user.created_on
FROM
	user
WHERE
	user.id in (1, 2, 3)
"""

当查询字段为列表时使用any

await User.query.where(User.user_groups.any(1).gino.first()
                      
# SQL
"""
SELECT
	user.id, user.nickname, user.user_groups, user.created_on
FROM
	user
WHERE
	1 = ANY(user.user_group)
"""

3. 其他函数

CONTAINS 包含

db.func.contains()

CASE 条件选择

await db.select([
    db.case([(User.nickname == "admin", "管理员")], else_="普通用户").label("status")
]).gino.all()

ARRAY_REMOVE 删除列表中的值

await User.update.where(User.id == 1).values(user_groups=db.func.array_remove(3, 4)).gino.status()

COLUMN 获取查询/表字段名

db.column(db.select([User.id, User.nickname]).label("c"))

FILTER 聚合筛选,在条件中根据结果筛选结果

await db.select(func.sum(Incidents.severity).filter(Incidents.severity == 'SEV').label('sum_severity')).select_from(Incidents).gono.all()

查询Json

await db.select([Task.request]).where(Task.request['a'].cast(db.String) == json.dumps('123')).gino.all()
"""
上面的代码转换成sql语句:
SELECT   request FROM task WHERE  request -> 'a' = '123'
"""
# 注: SQL语句中  -> 表示查出来的数据是JSONB格式,->> 查出来的是TEXT格式
# 所以如上sqlalchemy代码可能查出来的结果不是你想要的,那么要转换成 ->> 运算符就要用如下代码
await db.select([Task.request]).where(Task.request.op('->>')('a').cast(db.String) == json.dumps('123')).gino.all()
await db.select([Task.request]).where(Task.request.op('->>')('b').cast(db.String).like('xxx%')).gino.all()

COALESCE 查询空值,并赋值为默认值

from sqlalchemy import text

db.func.coalesce(Node.queues, text("'[]'")).label('queues')  # text("'[]'")将 [] 转换成数据库认得到的列表类型

"""
COALESCE(nodes.queues, '{}') as queues  # 这里的value 要加引号,且必须使用{}(数据库中认不到[])
"""

TO_CHAR 格式化日期时间

查询近12个月数据

# 方法一
from dateutil.relativedelta import relativedelta

format_data = db.func.to_char(User.created_on, "yyyy-mm").label("date")
start_date = (datetime.now() - relativedelta(months=12)).strftime("%Y-%m") 
await db.select([format_data]).where(
    format_data >= start_date
).group_by(format_data).gino.all()

# 方法二
from sqlalchemy import extract

format_data = db.func.to_char(User.created_on, "yyyy-mm").label("date")
start_year = (datetime.now() - relativedelta(years=1)).strftime("%Y")
start_month = (datetime.now() - relativedelta(years=1)).strftime("%m")
await db.select([format_data]).where(db.and_(
    extract('year', User.created_on) >= start_year,
    extract('month', User.created_on) >= start_month
)).group_by(format_data).gino.all()

查询今年到当前月份的数据

from dateutil.relativedelta import relativedelta

await db.select([User]).where(db.and_(
    User.created_on.between(datetime.now().strftime("%Y-01"), datetime.now().strftime("%Y-%m"))
)).group_by(format_data).gino.all()

LABEL 重命名字段名

User.id.label("user_id")

ALIAS 重命名查询

db.select([User.id, User.nickname]).where(User.id == 1).alias('_u')

字段数组型取出对应值

db.select([User.user_groups[0]]).where(User.id == 1).alias('_u')
"""
字段 id 值为 {1}
直接使用 id[0] 取出值进行操作,取不到返回null
SELECT user.user_groups[0] FROM user WHERE user.id = 1
"""

四、扩展

1. Gino异步流程

当查询语句未使用.gino.xxxx()时,就未对数据库进行操作。
所以在此之前可以将复杂查询组合好,最后一次性对数据库进行操作。

2. Gino 转 SQL语句

from sqlalchemy.dialects import postgresql

query = User.query.where(User.id == 1)
print(query.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))

3. 数据模型字段参数

1. primary_key:设置某个字段为主键。默认值是True
2. autoincrement:设置这个字段为⾃增长。默认值是True
3. default: 设置字段的默认值。
4. nullable:指定某个字段是否为空。默认值是True,可以为空。
5. unique:指定某个字段的值是否唯⼀,默认是False。
6. onupdate:在数据更新的时候会调这个参数指定的值或者函数。在第一次插这条数据的时候,不会用onupdate的值,只会使用default的值。(每次更新数据的时候都要更新该字段值)。
7. name:指定ORM模型的中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定
 类似资料: