当前位置: 首页 > 工具软件 > peewee > 使用案例 >

Python:peewee常用操作CRUD

邓德本
2023-12-01

Defining models is similar to Django or SQLAlchemy

译文:定义模型类似于Django或SQLAlchemy

文档

示例代码仓库

https://github.com/mouday/peewee-demo

安装

pip install peewee

测试环境

$ python --version
Python 3.7.0

$ pip show peewee
Name: peewee
Version: 3.15.3

1、数据库 Database

1.1、设置参数

# -*- coding: utf-8 -*-
"""
@File    : database.py
"""

from peewee import SqliteDatabase
import logging

# 设置数据库
db = SqliteDatabase("demo.db")

# 打印日志
logger = logging.getLogger('peewee')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
logger.propagate = False  # 不向上传播

1.2、连接数据库

from app.database import db

# 链接数据库
db.connect()

# 断开数据库
if not db.is_closed():
    db.close()
    

1.3、执行原生sql

获取多条记录

cursor = db.execute_sql("select * from tb_user where id = ?", (1,))
rows = cursor.fetchall()
print(rows)
[
    (1, 'Jack', 23, '2022-10-19 18:09:07.038935', '2022-10-19 18:09:07.038940')
]

获取单条记录

cursor = db.execute_sql("select * from tb_user where id = ?", (1,))

# 将返回结果转换为dict
# https://docs.python.org/zh-cn/3.6/library/sqlite3.html#sqlite3.Connection.row_factory 
def dict_factory(cursor, row):
    """将返回结果转换为dict"""
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

cursor.row_factory = dict_factory
row = cursor.fetchone()
print(row)
{
    'id': 1, 
    'name': 'Jack', 
    'age': 23, 
    'created_time': '2022-10-19 18:09:07.038935', 
    'update_time': '2022-10-19 18:09:07.038940'
}

2、模型 Model

2.1、定义模型

定义基类模型

# -*- coding: utf-8 -*-
"""
@File    : base_model.py
"""

from peewee import Model

from app.database import db


class BaseModel(Model):
    """
    # 基类,设置数据库链接
    """

    class Meta:
        database = db

定义模型

# -*- coding: utf-8 -*-
"""
@File    : user_model.py
"""

from datetime import datetime

from peewee import CharField, DateTimeField, IntegerField, AutoField

from app.model.base_model import BaseModel


class UserModel(BaseModel):
    """
    用户表
    """

    id = AutoField()
    name = CharField(null=False)
    age = IntegerField(null=False)

    created_time = DateTimeField(default=datetime.now)
    update_time = DateTimeField(default=datetime.now)

    class Meta:
        # 指定表名
        table_name = 'tb_user'

2.2、表操作

建表

UserModel.create_table()
(
    'CREATE TABLE IF NOT EXISTS "tb_user" (
    "id" INTEGER NOT NULL PRIMARY KEY, 
    "name" VARCHAR(255) NOT NULL, 
    "age" INTEGER NOT NULL, 
    "created_time" DATETIME NOT NULL, 
    "update_time" DATETIME NOT NULL)', 
    []
)

查看表是否存在

UserModel.table_exists()
(
    'SELECT name FROM "main".sqlite_master WHERE type=? ORDER BY name',
    ('table',)
 )

删除表

UserModel.drop_table()
(
    'DROP TABLE IF EXISTS "tb_user"', 
    []
)

3、模型的CURD操作

3.1、写入操作

插入数据

ret = UserModel.insert({
    UserModel.age: 20,
    UserModel.name: 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
    'Tom', 
    20, 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]

插入字典数据

ret = UserModel.insert({
    'age': 20,
    'name': 'Tom'
}).execute()
'INSERT INTO "tb_user"
("name", "age", "created_time", "update_time")
VALUES (?, ?, ?, ?)',
[
    'Tom', 
    20, 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198981), 
    datetime.datetime(2022, 10, 19, 17, 28, 30, 198988)
]

保存实例

user = UserModel(
    age=21,
    name='Tom'
)

user.save()
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
['Charlie', 12, 
datetime.datetime(2022, 10, 19, 17, 34, 43, 376650), 
datetime.datetime(2022, 10, 19, 17, 34, 43, 376652)])

插入并创建实例

user = UserModel.create(
    age=22,
    name='Tom'
)
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
    ['Charlie', 12, 
    datetime.datetime(2022, 10, 19, 17, 36, 16, 408224), 
    datetime.datetime(2022, 10, 19, 17, 36, 16, 408226)])

插入多条数据

UserModel.insert_many([
    {
        'age': 23,
        'name': 'Tom'
    },
    {
        'age': 24,
        'name': 'Tom'
    }
]).execute()
('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?), (?, ?, ?, ?)', 
    [
    'Tom', 23, 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106336), 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106344), 
    'Tom', 24, 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106355), 
    datetime.datetime(2022, 10, 19, 17, 38, 48, 106360)])

分块插入,忽略重复数据

from peewee import chunked

# Insert rows 100 at a time.
with db.atomic():
    for batch in chunked(data_source, 100):
        MyModel.insert_many(batch).on_conflict_ignore().execute()

3.2、更新数据

更新多条数据

UserModel.update(
    name='Jack'
).where(
    UserModel.id == 1
).execute()
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])

更新单条数据

UserModel.set_by_id(1, {'name': 'Jack'})
('UPDATE "tb_user" SET "name" = ? WHERE ("tb_user"."id" = ?)', ['Jack', 1])

3.3、删除数据

按照主键删除

UserModel.delete_by_id(1)
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])

按条件删除

UserModel.delete().where(
    UserModel.id == 1
).execute()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])

删除实例

user = UserModel.get_by_id(1)
       
user.delete_instance()
('DELETE FROM "tb_user" WHERE ("tb_user"."id" = ?)', [1])

清空表数据

UserModel.truncate_table()
('DELETE FROM "tb_user"', [])

3.4、取单条数据

条件查询一条

row = UserModel.select().where(
    UserModel.name == 'Tom'
).get()

print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Tom', 1, 0])

获取第一条

row = UserModel.select().where(
    UserModel.name == 'Tom'
).first()

print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ?', 
    ['Tom', 1])

通过获取,不存在报错

row = UserModel.get(UserModel.name == 'Tom')
print(row)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Tom', 1, 0])

通过获取或者返回None

user = UserModel.get_or_none(UserModel.name == 'Jack')
print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    LIMIT ? OFFSET ?', 
    ['Jack', 1, 0])

通过主键获取,不存在报错

user = UserModel.get_by_id(1)

print(user)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."id" = ?) 
    LIMIT ? 
    OFFSET ?', 
    [1, 1, 0])

获取或创建

UserModel.get_or_create(name='Tom', age=23)
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE (("t1"."name" = ?) AND ("t1"."age" = ?)) 
    LIMIT ? OFFSET ?', 
    ['Tom', 23, 1, 0])

('BEGIN', None)

('INSERT INTO "tb_user" 
    ("name", "age", "created_time", "update_time") 
    VALUES (?, ?, ?, ?)', 
    ['Tom', 23, 
    datetime.datetime(2022, 10, 19, 18, 9, 7, 38935), 
    datetime.datetime(2022, 10, 19, 18, 9, 7, 38940)])

3.5、取多条数据

查询多条记录

# 注意,获取的是 iterator
# 可以转为 namedtuples(), tuples(), dicts()

query = UserModel.select().where(
    UserModel.name == 'Tom'
)

print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?)', 
    ['Tom'])

4、排序分组统计

1、排序

query = UserModel.select().where(
    UserModel.name == 'Tom'
).order_by(UserModel.age.desc())

print(list(query))
# [<UserModel: 1>]
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    WHERE ("t1"."name" = ?) 
    ORDER BY "t1"."age" DESC', 
    ['Tom'])

4.2、分页

query = UserModel.select().paginate(2, 10)

print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    LIMIT ? OFFSET ?', 
    [10, 10])

4.3、统计

query = UserModel.select().count()
print(list(query))
('SELECT COUNT(1) FROM (SELECT 1 FROM "tb_user" AS "t1") AS "_wrapped"', [])

4.4、分组

query = UserModel.select().group_by(UserModel.name)

print(list(query))
('SELECT "t1"."id", "t1"."name", "t1"."age", "t1"."created_time", "t1"."update_time" 
    FROM "tb_user" AS "t1" 
    GROUP BY "t1"."name"', 
    [])

4.5、分组统计

query = UserModel.select(
	        UserModel.group_id,
	        fn.COUNT().alias('count')
	    ).group_by(UserModel.group_id)
	    
print(list(query))
('SELECT "t1"."group_id", COUNT() AS "count" 
	FROM "tb_user" AS "t1" 
	GROUP BY "t1"."group_id"', 
[])
 类似资料: