更改sqlite为mysql
from tortoise import Tortoise
import asyncio
async def init():
user = 'root'
password = '123456'
db_name = 'test'
await Tortoise.init(
#指定mysql信息
db_url=f'mysql://{user}:{password}@127.0.0.1:3306/{db_name}',
#指定models
modules={'models': ['sanic_bp.models']}
)
#按照模型生成表
# await Tortoise.generate_schemas()
asyncio.run(init())
在sanic中使用:
from sanic import Sanic
from tortoise import Tortoise
app = Sanic(__name__)
@app.listener('after_server_start')
async def notify_server_started(app, loop):
print('sanic sanic服务启动后建立mysql连接')
#实例化mysql连接
await Tortoise.init(
db_url='mysql://root:123456@127.0.0.1:3306/test?maxsize=50&minsize=3',
modules={'models': ['sanic_bp.models']}
)
基础使用
from tortoise.contrib.sanic import register_tortoise
register_tortoise(
app, db_url="sqlite://:memory:", modules={"models": ["models"]}, generate_schemas=True
)
sanic 和 fastapi中使用
tortoise-orm 连接多个库
async def run():
await Tortoise.init(
{
#这里面指定两个不同的数据库,虽然它填写的都是一样的
"connections": {
"first": {
"engine": "tortoise.backends.sqlite",
"credentials": {"file_path": "example.sqlite3"},
},
"second": {
"engine": "tortoise.backends.sqlite",
"credentials": {"file_path": "example1.sqlite3"},
},
},
#可以指定多个不同的models, 嗯,它指定的也是一样的
"apps": {
"tournaments": {"models": ["__main__"], "default_connection": "first"},
"events": {"models": ["__main__"], "default_connection": "second"},
},
}
)
#await Tortoise.generate_schemas()
#选取数据库
client = Tortoise.get_connection("first")
#选取 models
second_client = Tortoise.get_connection("second")
#然后这下面就可以进行orm操作了
增删改查
from typing import List
from fastapi import APIRouter, HTTPException
from tortoise.contrib.fastapi import HTTPNotFoundError
from schemas.MaintainedData import MaintainedData_Pydantic, MaintainedDataIn_Pydantic
from models.MaintainedData import MaintainedData
from schemas.basic import Response404, Response200
data = APIRouter(tags=['数据模块'])
@data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
async def data_list(order:str,limit: int = 10, page: int = 1):
# limit 是显示的条数 ,page是页数
skip = (page - 1) * limit
# select * from movie limit offset,limit
# select * from movie limit 0,10
return await MaintainedData_Pydantic.from_queryset(MaintainedData.all().offset(skip).limit(limit))
@data.post("/data", summary="新增数据", )
async def add_data(md_data: MaintainedData_Pydantic):
movie_obj = await MaintainedData.create(**md_data.dict(exclude_unset=True))
# movie_obj = await Movie.create(name="",year="",xx="")
return await MaintainedData_Pydantic.from_tortoise_orm(movie_obj)
@data.put("/data/{data_id}", summary="编辑数据", responses={404: {"model": HTTPNotFoundError}})
async def update_data(site_id: str, md_data: MaintainedDataIn_Pydantic):
updated_count = await MaintainedData.filter(site_id=site_id).update(**md_data.dict(exclude_unset=True))
print(updated_count)
if not updated_count:
return Response404(msg=f"data {site_id} not found")
return Response200(data=await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id)))
@data.get("/data/{data_id}", summary="查找数据", response_model=MaintainedData_Pydantic,
responses={404: {"model": HTTPNotFoundError}})
async def get_user(site_id: str):
return await MaintainedData_Pydantic.from_queryset_single(MaintainedData.get(site_id=site_id))
@data.delete("/data/{data_id}", summary="删除数据", responses={404: {"model": HTTPNotFoundError}})
async def del_movie(site_id: str):
deleted_count = await MaintainedData.filter(site_id=site_id).delete()
if not deleted_count:
raise HTTPException(status_code=404, detail=f"data {site_id} not found")
return HTTPException(status_code=200, detail=f"Deleted Movie {site_id}")
models.py
from datetime import datetime
from typing import Optional, Iterable
from tortoise import fields, models, BaseDBAsyncClient
class MaintainedData(models.Model):
"""
The User model
"""
id = fields.SmallIntField(pk=True, index=True, description="ID", )
site_id = fields.CharField(max_length=20, unique=True, description="网站ID")
site_name = fields.CharField(max_length=128, null=False, description="网站名称")
site_type = fields.CharField(max_length=16, null=False, description="网站类型")
description = fields.CharField(max_length=255, default="",description="网站备注", )
site_path_name = fields.CharField(max_length=128, null=True, description="目录名称")
site_path_url = fields.CharField(max_length=128, null=True, description="目录链接")
status = fields.CharField(max_length=128, null=True, description="脚本状态")
run_computer = fields.CharField(max_length=255, null=True, description="运行电脑")
run_directory = fields.CharField(max_length=255, null=True, description="运行目录")
crawling_time :Optional[datetime]= fields.DatetimeField(auto_add=True, description="运行时间")
err_message = fields.CharField(max_length=255, null=True, description="错误信息")
order by
db_Secc = MaintainedData.all().order_by("crawling_time")
Eum 枚举
class order_byEnum(str, Enum):
pear = ""
banana = '-'
@data.get("/", summary="数据列表", response_model=List[MaintainedData_Pydantic])
async def data_list(
order_by: order_byEnum = "",
limit: int = 10,
page: int = 1):
# limit 是显示的条数 ,page是页数
skip = (page - 1) * limit
return await MaintainedData_Pydantic.from_queryset(MaintainedData.all().offset(
skip).limit(
limit).order_by(
f"{order_by}crawling_time"))