from gino import Gino db = Gino()
import asyncio async def main(): # asyncpg://postgres:xxxxxx@127.0.0.1:5432/postgres await db.set_bind('driver://username:password@localhost:port/database', echo=True) asyncio.get_event_loop().run_until_complete(main()) # echo 是否输出日志 # driver 驱动类型,如:asyncpg, postgresql + psycopg2 # username 数据库用户名 # password 数据库秘密 # database 默认数据库
db = Gino() class Base(db.Model): __abstract__ = True id = db.Column(db.BigInteger, primary_key=True, autoincrement=True) created_on = db.Column(db.DateTime, default=datetime.now, server_default=db.func.now(), index=True) class User(Base): __tablename__ = 'user' nickname = db.Column(db.String(32), default='noname') user_groups = db.Column(db.Array(db.Integer))
单条创建
await User(nickname="xxxxx").create()
多条插入
from sqlalchemy.dialects.postgresql import insert data = [dict(nickname="xxxxx"), dict(nickname="yyyyy")] await insert(User).values(data).on_conflict_do_nothing().gino.status() # on_conflict_do_nothing 跳过报错,及报错不响应
单条删除
user = await User.query.where(User.id == 1).gino.first() await user.delete()
多条删除(最好使用这种操作)
await User.delete.where(User.id.in_([1, 2])).gino.status()
# 方法一 user = await User.query.where(User.id == 1).gino.first() await user.update(nickname='bbbbb').apply() # 方法二 await User.update.where(User.id == 1).values(nickname='bbbbb').gino.status()
# 方法一 await User.query.where(User.id == 1).gino.first() # 查出第一条 .all()查出所有 # 方法二 await db.select([User.id, User.nickname]).where(User.id == 1).gino.first() # 在列表中直接写表明User,表示取出左右字段
label_query = db.select([User.id, User.nickname]).alias("q") db.select([ db.func.coalesce( db.func.array_to_json( db.func.array_agg(db.func.row_to_json(db.select([db.column("q")]).label("q"))) ), db.cast(list(), db.JSON), ) ]) .where( User.id.any(label_query.c.id), ).label("labels") # db.func.row_to_json 将查询出的字段转换成json: {字段名: 值} # db.func.array_agg 将多条记录合并成数组,注意!!! 此时如果列表中记录为json,使用此函数转换后数组中的记录是文本型 # db.func.array_to_json 将列表记录转换成json,使用此函数可以格式化db.func.array_agg中值为文本型的情况 # db.func.coalesce 当参数一为空时,返回参数二 # db.cast 转换数据类型
当查询条件为列表时使用in
await User.query.where(User.id.in_([1, 2, 3]).gino.all() # SQL """ SELECT user.id, user.nickname, user.user_groups, user.created_on FROM user WHERE user.id in (1, 2, 3) """
当查询字段为列表时使用any
await User.query.where(User.user_groups.any(1).gino.first() # SQL """ SELECT user.id, user.nickname, user.user_groups, user.created_on FROM user WHERE 1 = ANY(user.user_group) """
db.func.contains()
await db.select([
db.case([(User.nickname == "admin", "管理员")], else_="普通用户").label("status")
]).gino.all()
await User.update.where(User.id == 1).values(user_groups=db.func.array_remove(3, 4)).gino.status()
db.column(db.select([User.id, User.nickname]).label("c"))
await db.select(func.sum(Incidents.severity).filter(Incidents.severity == 'SEV').label('sum_severity')).select_from(Incidents).gono.all()
await db.select([Task.request]).where(Task.request['a'].cast(db.String) == json.dumps('123')).gino.all()
"""
上面的代码转换成sql语句:
SELECT request FROM task WHERE request -> 'a' = '123'
"""
# 注: SQL语句中 -> 表示查出来的数据是JSONB格式,->> 查出来的是TEXT格式
# 所以如上sqlalchemy代码可能查出来的结果不是你想要的,那么要转换成 ->> 运算符就要用如下代码
await db.select([Task.request]).where(Task.request.op('->>')('a').cast(db.String) == json.dumps('123')).gino.all()
await db.select([Task.request]).where(Task.request.op('->>')('b').cast(db.String).like('xxx%')).gino.all()
from sqlalchemy import text
db.func.coalesce(Node.queues, text("'[]'")).label('queues') # text("'[]'")将 [] 转换成数据库认得到的列表类型
"""
COALESCE(nodes.queues, '{}') as queues # 这里的value 要加引号,且必须使用{}(数据库中认不到[])
"""
查询近12个月数据
# 方法一
from dateutil.relativedelta import relativedelta
format_data = db.func.to_char(User.created_on, "yyyy-mm").label("date")
start_date = (datetime.now() - relativedelta(months=12)).strftime("%Y-%m")
await db.select([format_data]).where(
format_data >= start_date
).group_by(format_data).gino.all()
# 方法二
from sqlalchemy import extract
format_data = db.func.to_char(User.created_on, "yyyy-mm").label("date")
start_year = (datetime.now() - relativedelta(years=1)).strftime("%Y")
start_month = (datetime.now() - relativedelta(years=1)).strftime("%m")
await db.select([format_data]).where(db.and_(
extract('year', User.created_on) >= start_year,
extract('month', User.created_on) >= start_month
)).group_by(format_data).gino.all()
查询今年到当前月份的数据
from dateutil.relativedelta import relativedelta
await db.select([User]).where(db.and_(
User.created_on.between(datetime.now().strftime("%Y-01"), datetime.now().strftime("%Y-%m"))
)).group_by(format_data).gino.all()
User.id.label("user_id")
db.select([User.id, User.nickname]).where(User.id == 1).alias('_u')
db.select([User.user_groups[0]]).where(User.id == 1).alias('_u')
"""
字段 id 值为 {1}
直接使用 id[0] 取出值进行操作,取不到返回null
SELECT user.user_groups[0] FROM user WHERE user.id = 1
"""
当查询语句未使用.gino.xxxx()时,就未对数据库进行操作。
所以在此之前可以将复杂查询组合好,最后一次性对数据库进行操作。
from sqlalchemy.dialects import postgresql
query = User.query.where(User.id == 1)
print(query.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
1. primary_key:设置某个字段为主键。默认值是True
2. autoincrement:设置这个字段为⾃增长。默认值是True
3. default: 设置字段的默认值。
4. nullable:指定某个字段是否为空。默认值是True,可以为空。
5. unique:指定某个字段的值是否唯⼀,默认是False。
6. onupdate:在数据更新的时候会调这个参数指定的值或者函数。在第一次插这条数据的时候,不会用onupdate的值,只会使用default的值。(每次更新数据的时候都要更新该字段值)。
7. name:指定ORM模型的中某个属性映射到表中的字段名。如果不指定,那么会使用这个属性的名字来作为字段名。如果指定了,就会使用指定的这个值作为表字段名。这个参数也可以当作位置参数,在第1个参数来指定