当前位置: 首页 > 工具软件 > mysql-async > 使用案例 >

peewee mysql 好用吗_tornado+peewee-async+peewee+mysql(一)

商辰钊
2023-12-01

爬坑

最初遇到的坑,使用了最新版的peewee,连接池连接,也使用了async和await协程,但是怎么调用都会阻塞,后来发现不是阻塞单个协程,是阻塞了整个进程,因为tornado是单进程,必须数据库也使用异步操作,才能不阻塞整个进程

pip install peewee-async 的时候默认会安装符合版本要求的peewee

查看peewee-async 模块的MySQLDatabase,继承了AsyncDatabase和peewee.MySQLDatabase,AsyncDatabase方法全部使用协程实现异步

peewee-async 连接MySQL,返回database对象

单连接

import peewee_async

# db = peewee_async.MySQLDatabase(database_name, host, port, user, password)

# 或者,将自己的数据库信息封装到字典中

db_setting = {

"user": "root",

"password": "xxxxxx",

"host": "127.0.0.1",

"port": 3306,

"database": "test"

}

db = peewee_async.MySQLDatabase(**db_setting)

连接池

from playhouse.shortcuts import RetryOperationalError

from peewee_async import PooledMySQLDatabase

# 可以自动重新连接的连接池

class RetryMySQLDatabase(RetryOperationalError, PooledMySQLDatabase):

_instance = None

@staticmethod

def get_db_instance():

if not RetryMySQLDatabase._instance:

RetryMySQLDatabase._instance = RetryMySQLDatabase(database_name,

host, port, user, password,

max_connections=10)

return RetryMySQLDatabase._instance

db = RetryMySQLDatabase.get_db_instance()

返回的database对象的一些常用方法

get_tables() 返回列表,当前数据库的所有表名

get_columns(table_name) 传参表名,返回列表,包含ColumnMetadata对象,字段信息

create_tables()

第一个参数为列表,包含要创建的表model

第二个参数safe, 不传默认为False,建表的时候如果表已经存在会报错,可以加safe=True

is_closed() 判断当前连接是否关闭

close() 关闭连接

peewee

peewee 模块可以参照官方文档用法,和sqlalchemy差别不大,为了下面的操作,暂时建一个model

book.py

# 集中写一个basemodel,将数据库对象绑定在model上,类才能映射到数据库中的表

class BaseModel(Model):

class Meta:

database = db

class Book(BaseModel):

book_id = PrimaryKeyField() # int 主键自增,在peewee3.10 版本中新增了字段AutoField,表示主键自增

book_name = CharField(max_length=100, verbose_name="书名")

# 鉴于篇幅, 作者表不写,外键第一个参数为model类名,to_field表示关联的字段

book_auth = ForeignKeyField(User, to_field="user_id", verbose_name="作者id")

peewee-async Manager

管理数据库操作,实现异步操作数据库必须使用Manager,查看源码可以看到,类中的get, create, [email protected] from,在原有的数据库操作基础上做了封装

初始化传入数据库连接对象,生成manager对象,使用该对象完成数据库操作,在tornado中一般选择绑定到app上

from tornado.web import RequestHandler

from tornado import gen

import tornado.ioloop

from book import Book

class RegHandler(RequestHandler):

async def get(self):

# 在handler类中可以使用绑定到app上的manager对象执行操作,因为是异步操作需要使用await关键字

# 以下两种查询方式返回结果对象格式不同

book_res = await self.application.objects.get(Book, book_name="简爱")

id = book_res.book_id

# 只有调用了execute方法才是执行,query打印可以看到只是生成了sql语句

# 为保证异步调用必须使用peewee-async manager生成的对象执行操作函数,不能使用model的execute执行,会同步阻塞

query = Book.select().where(Book.username=="简爱")

# execute执行返回AsyncQueryWrapper对象,如果有值可以通过下标取出每个book对象

# query 对象执行前可以调用dicts()方法,返回对象内容为字典格式

# query.tuples() 返回对象内容为元组格式,相当于sqlalchemy,fetchall()

# 其他方法或者属性有需要的可以使用dir()方法和getattr()方法查看属性,以及属性调用后返回值

book_res = await self.application.objects.execute(query.dicts())

pass

async def post(self):

from tornado.escape import json_decode

body = json_decode(self.request.body)

# 增

# 如果参数是字典格式,且key值对应字段名称,可以使用peewee model里面的insert方法

await self.application.objects.execute(Book.insert(body))

# 或者使用封装的create方法,create方法源码还是调用了model类的insert方法

await self.application.objects.create(Book, boo_name=body.get("book"), book_auth=2)

pass

app = tornado.web.Application([

(r"/book", BookHandler)

])

if __name__ == '__main__':

app = tornado.web.Application(urlpaten)

import peewee_async

# 将manager对象绑定到app上

app.objects = peewee_async.Manager(database)

server = httpserver.HTTPServer(app, xheaders=True)

server.listen(80)

tornado.ioloop.IOLoop.current().start()

 类似资料: