Defining models is similar to Django or SQLAlchemy
译文:定义模型类似于Django或SQLAlchemy
文档
示例代码仓库
https://github.com/mouday/peewee-demo
安装
pip install peewee
测试环境
$ python --version
Python 3.7.0
$ pip show peewee
Name: peewee
Version: 3.15.3
# -*- coding: utf-8 -*-
"""
@File : database.py
"""
from peewee import SqliteDatabase
import logging
# Database handle backed by the SQLite file "demo.db"
db = SqliteDatabase("demo.db")

# Send the 'peewee' logger's records to a stream handler at DEBUG level
logger = logging.getLogger('peewee')
logger.propagate = False  # do not pass records up to ancestor loggers
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
from app.database import db
# Open the database connection
db.connect()

# Close the connection, but only if it is still open
if not db.is_closed():
    db.close()
获取多条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
rows = cursor.fetchall()
print(rows)
[
(1, 'Jack', 23, '2022-10-19 18:09:07.038935', '2022-10-19 18:09:07.038940')
]
获取单条记录
cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
# 将返回结果转换为dict
# https://docs.python.org/zh-cn/3.6/library/sqlite3.html#sqlite3.Connection.row_factory
def dict_factory(cursor, row):
    """Convert one result row into a dict keyed by column name.

    Args:
        cursor: Object whose ``description`` attribute lists the result
            columns; the first item of each entry is used as the key.
        row: Sequence holding the values of a single row, in column order.

    Returns:
        A dict mapping each column name to the value at the same position
        in ``row``.
    """
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
cursor.row_factory = dict_factory
row = cursor.fetchone()
print(row)
{
'id': 1,
'name': 'Jack',
'age': 23,
'created_time': '2022-10-19 18:09:07.038935',
'update_time': '2022-10-19 18:09:07.038940'
}
定义基类模型
# -*- coding: utf-8 -*-
"""
@File : base_model.py
"""
from peewee import Model
from app.database import db
class BaseModel(Model):
    """Base model class that sets the database via ``Meta.database``."""

    class Meta:
        # The shared database handle imported from app.database
        database = db
定义模型
# -*- coding: utf-8 -*-
"""
@File : user_model.py
"""
from datetime import datetime
from peewee import CharField, DateTimeField, IntegerField, AutoField
from app.model.base_model import BaseModel
class UserModel(BaseModel):
    """User table."""

    id = AutoField()
    name = CharField(null=False)
    age = IntegerField(null=False)
    # Both timestamps use ``datetime.now`` as their default (the callable
    # itself is passed, it is not called here). Nothing in this class
    # refreshes update_time on later updates.
    created_time = DateTimeField(default=datetime.now)
    update_time = DateTimeField(default=datetime.now)

    class Meta:
        # Explicit table name
        table_name = 'tb_user'
建表
UserModel.create_table()
(
'CREATE TABLE IF NOT EXISTS "tb_user" (
"id" INTEGER NOT NULL PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"age" INTEGER NOT NULL,
"created_time" DATETIME NOT NULL,
"update_time" DATETIME NOT NULL)',
[]
)
查看表是否存在
UserModel.table_exists()
(
'SELECT name FROM "main".sqlite_master WHERE type=? ORDER BY name',
('table',)
)
删除表
UserModel.drop_table()
(
'DROP TABLE IF EXISTS "tb_user"',
[]
)
插入数据
ret = UserModel.insert({
UserModel.age: 20,
UserModel.name: 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
'Tom',
20,
datetime.datetime(2022, 10, 19, 17, 28, 30, 198981),
datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]
插入字典数据
ret = UserModel.insert({
'age': 20,
'name': 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
'Tom',
20,
datetime.datetime(2022, 10, 19, 17, 28, 30, 198981),
datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]
保存实例
user = UserModel(
age=21,
name='Tom'
)
user.save()
('INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
['Tom', 21,
datetime.datetime(2022, 10, 19, 17, 34, 43, 376650),
datetime.datetime(2022, 10, 19, 17, 34, 43, 376652)])
插入并创建实例
user = UserModel.create(
age=22,
name='Tom'
)
('INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
['Tom', 22,
datetime.datetime(2022, 10, 19, 17, 36, 16, 408224),
datetime.datetime(2022, 10, 19, 17, 36, 16, 408226)])
插入多条数据
UserModel.insert_many([
{
'age': 23,
'name': 'Tom'
},
{
'age': 24,
'name': 'Tom'
}
]).execute()
('INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?), (?, ?, ?, ?)',
[
'Tom', 23,
datetime.datetime(2022, 10, 19, 17, 38, 48, 106336),
datetime.datetime(2022, 10, 19, 17, 38, 48, 106344),
'Tom', 24,
datetime.datetime(2022, 10, 19, 17, 38, 48, 106355),
datetime.datetime(2022, 10, 19, 17, 38, 48, 106360)])
分块插入,忽略重复数据
from peewee import chunked
# Insert rows 100 at a time, skipping rows that conflict with existing data.
# NOTE: ``data_source`` and ``MyModel`` are placeholders that this snippet
# does not define; substitute your own iterable of rows and model class.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).on_conflict_ignore().execute()
更新多条数据
UserModel.update(
name='Jack'
).where(
UserModel.id == 1
).execute()
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])
更新单条数据
UserModel.set_by_id(1, {'name': 'Jack'})
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])
按照主键删除
UserModel.delete_by_id(1)
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
按条件删除
UserModel.delete().where(
UserModel.id == 1
).execute()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
删除实例
user = UserModel.get_by_id(1)
user.delete_instance()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])
清空表数据
UserModel.truncate_table()
('DELETE FROM "tb_user"', [])
条件查询一条
row = UserModel.select().where(
UserModel.name == 'Tom'
).get()
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)
LIMIT ? OFFSET ?',
['Tom', 1, 0])
获取第一条
row = UserModel.select().where(
UserModel.name == 'Tom'
).first()
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)
LIMIT ?',
['Tom', 1])
按条件获取,不存在报错
row = UserModel.get(UserModel.name == 'Tom')
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)
LIMIT ? OFFSET ?',
['Tom', 1, 0])
按条件获取,不存在返回None
user = UserModel.get_or_none(UserModel.name == 'Jack')
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)
LIMIT ? OFFSET ?',
['Jack', 1, 0])
通过主键获取,不存在报错
user = UserModel.get_by_id(1)
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."id" = ?)
LIMIT ?
OFFSET ?',
[1, 1, 0])
获取或创建
UserModel.get_or_create(name='Tom', age=23)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE (("t1"."name" = ?) AND ("t1"."age" = ?))
LIMIT ? OFFSET ?',
['Tom', 23, 1, 0])
('BEGIN', None)
('INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
['Tom', 23,
datetime.datetime(2022, 10, 19, 18, 9, 7, 38935),
datetime.datetime(2022, 10, 19, 18, 9, 7, 38940)])
查询多条记录
# 注意,获取的是 iterator
# 可以转为 namedtuples(), tuples(), dicts()
query = UserModel.select().where(
UserModel.name == 'Tom'
)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)',
['Tom'])
query = UserModel.select().where(
UserModel.name == 'Tom'
).order_by(UserModel.age.desc())
print(list(query))
# [<UserModel: 1>]
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
WHERE ("t1"."name" = ?)
ORDER BY "t1"."age" DESC',
['Tom'])
query = UserModel.select().paginate(2, 10)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
LIMIT ? OFFSET ?',
[10, 10])
# count() executes immediately and returns an integer, not an iterable
# query, so print the number directly instead of wrapping it in list().
total = UserModel.select().count()
print(total)
('SELECT COUNT(1) FROM (SELECT 1 FROM "tb_user" AS "t1") AS "_wrapped"', [])
query = UserModel.select().group_by(UserModel.name)
print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time"
FROM "tb_user" AS "t1"
GROUP BY "t1"."name"',
[])
from peewee import fn

# NOTE(review): UserModel as defined earlier in this document declares no
# ``group_id`` field — confirm the model used here has one.
query = UserModel.select(
    UserModel.group_id,
    fn.COUNT().alias('count')
).group_by(UserModel.group_id)
print(list(query))
('SELECT "t1"."group_id", COUNT() AS "count"
FROM "tb_user" AS "t1"
GROUP BY "t1"."group_id"',
[])