安装
pip install django-ninja
创建django项目
django-admin startproject myproject
单应用项目
在应用的urls.py同目录下创建api.py(也可以直接在views.py中实现)
from ninja import NinjaAPI

# API instance on which the routes below are registered.
api = NinjaAPI()


@api.get("/hello")
def hello(request):
    """Handle GET /hello."""
    # Placeholder kept from the notes: replace with the real handler logic.
    dosomething
在urls.py中配置url路由
from django.contrib import admin
from django.urls import path

from .api import api

# Route table: the Django admin plus the API object's URLs under "api/".
urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/", api.urls),
]
请求方法选择:如果一个请求方法对应一个处理函数,直接使用@api.get(path);如果多个请求方法共用一个处理函数,则使用@api.api_operation(methods, path),其中methods用列表表示,例如["GET", "POST"]。
当有多个应用时,在每个应用中创建一个api.py模块(或者直接在views.py模块中)编写各自的路由。
from ninja import Router

from .models import xxx

# Router that collects this app's routes.
router = Router()


@router.get("/")
def list_events(request):
    """Return the id and title of every ``xxx`` row as a list of dicts."""
    events = []
    for elem in xxx.objects.all():
        events.append({"id": elem.id, "title": elem.title})
    return events
在项目文件夹urls.py中实现多个应用的router注册
from django.contrib import admin
from django.urls import path
from ninja import NinjaAPI

from xxx.api import router as xxx_router
from yyy.api import router as yyy_router

api = NinjaAPI()
# Mount each app's router under its own prefix.
api.add_router("/xxx/", xxx_router)
api.add_router("/yyy/", yyy_router)

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/v1/", api.urls),
]
api.add_router("/xxx/", xxx_router, auth=BasicAuth())
# or we can write as this
# router = Router(auth=BasicAuth())
可以使用tags参数将标签应用于路由器声明的操作。
api.add_router("/xxx/", xxx_router, tags=["xxx"])
# or we can write as this
# router = Router(tags=["xxx"])
from django.contrib import admin
from django.urls import path
from ninja import NinjaAPI, Router

# Must be named ``api``: the decorator, add_router call and urlpatterns
# below all refer to it by that name.
api = NinjaAPI()

first_router = Router()
second_router = Router()
third_router = Router()


@api.get("/add")
def add(request, a: int, b: int):
    """Return the sum of ``a`` and ``b``."""
    return {"result": a + b}


@first_router.get("/add")
def add(request, a: int, b: int):
    """Return the sum of ``a`` and ``b``."""
    return {"result": a + b}


@second_router.get("/add")
def add(request, a: int, b: int):
    """Return the sum of ``a`` and ``b``."""
    return {"result": a + b}


@third_router.get("/add")
def add(request, a: int, b: int):
    """Return the sum of ``a`` and ``b``."""
    return {"result": a + b}


# Nest the routers innermost-first: third under second, second under first,
# first under the API.
second_router.add_router("l3", third_router)
first_router.add_router("l2", second_router)
api.add_router("l1", first_router)

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/", api.urls),
]
# 以上路由可有以下路径
# /api/add
# /api/l1/add
# /api/l1/l2/add
# /api/l1/l2/l3/add
所有的路径参数都会按照给定的类型自动转化,如果转化失败,则报错。
常规python格式化字符串形式
# No type annotation: the path parameter defaults to a string.
@api.get("/items/{item_id}")
def read_item(request, item_id):
    """Echo the ``item_id`` path parameter."""
    payload = {"item_id": item_id}
    return payload


# With a type annotation on the parameter.
@api.get("/items/{item_id}")
def read_item(request, item_id: int):
    """Echo the ``item_id`` path parameter, annotated as ``int``."""
    payload = {"item_id": item_id}
    return payload
django路径参数转换器
@api.get("/items/{int:item_id}")
def read_item(request, item_id):
    """Echo ``item_id``; the path uses the ``int:`` converter syntax."""
    payload = {"item_id": item_id}
    return payload
注:{int:item_id}之间不允许有空格,有空格会报错。
使用Schema
import datetime

from ninja import Path, Schema


class PathSchema(Schema):
    """Year, month and day taken from the URL path."""

    year: int
    month: int
    day: int

    def value(self):
        """Return the three fields combined into a ``datetime.date``."""
        return datetime.date(self.year, self.month, self.day)


@api.get("/events/{year}/{month}/{day}")
def events(request, date: PathSchema = Path(...)):
    """Return the date assembled from the path parameters."""
    return {"date": date.value()}
请求参数分为两种:位置参数和可选参数。
位置参数。不给定参数类型,则默认为字符串类型。
@api.get("/weapons")
def list_weapons(request, limit, offset):
    """Illustrate un-annotated query parameters (no real logic)."""
    # type(limit) == str
    # type(offset) == str
    ...  # placeholder body so the function is syntactically valid
可选参数。通过给定参数默认值实现。
@api.get("/weapons")
def list_weapons(request, limit: int = 10, offset: int = 0):
    """Return the ``limit`` entries of ``weapons`` starting at ``offset``."""
    page = weapons[offset:offset + limit]
    return page
位置参数和可选参数。
@api.get("/weapons/search")
def search_weapons(request, keyword: str, offset: int = 0):
    """Return up to 10 weapons whose lower-cased name contains ``keyword``."""
    matches = []
    for weapon in weapons:
        if keyword in weapon.lower():
            matches.append(weapon)
    return matches[offset:offset + 10]
使用Schema
import datetime
from typing import List

from ninja import Query, Schema
from pydantic import Field


class Filters(Schema):
    """Query parameters accepted by the ``/filter`` endpoint."""

    limit: int = 100
    offset: int = None
    query: str = None
    # Supplied by clients under the alias "categories".
    category__in: List[str] = Field(None, alias="categories")


@api.get("/filter")
def events(request, filters: Filters = Query(...)):
    """Echo the parsed filters back as a dict."""
    parsed = filters.dict()
    return {"filters": parsed}
使用Schema
from ninja import Schema


class Item(Schema):
    """Request body for creating an item."""

    name: str
    description: str = None
    price: float
    quantity: int


@api.post("/items")
def create(request, item: Item):
    """Return the received item unchanged."""
    return item
from ninja import Schema


class Item(Schema):
    """Request body for updating an item."""

    name: str
    description: str = None
    price: float
    quantity: int


@api.post("/items/{item_id}")
def update(request, item_id: int, item: Item, q: str):
    """Echo the ``item_id``, the ``item`` (as a dict) and ``q``."""
    item_data = item.dict()
    return {"item_id": item_id, "item": item_data, "q": q}
三者同时出现时,解析如下:
Form数据作为参数
from ninja import Form


@api.post("/login")
def login(request, username: str = Form(...), password: str = Form(...)):
    """Echo the submitted form fields."""
    credentials = {"username": username, "password": password}
    return credentials
使用Schema
from ninja import Form, Schema


class Item(Schema):
    """Item fields, declared on the endpoint with ``Form(...)``."""

    name: str
    description: str = None
    price: float
    quantity: int


@api.post("/item")
def create(request, item: Item = Form(...)):
    """Return the received item unchanged."""
    return item
路径参数,请求参数和表单
from ninja import Form, Schema


class Item(Schema):
    """Item fields, declared on the endpoint with ``Form(...)``."""

    name: str
    description: str = None
    price: float
    quantity: int


@api.post("/items/{item_id}")
def update(request, item_id: int, q: str, item: Item = Form(...)):
    """Echo the ``item_id``, the ``item`` and ``q``."""
    response = {"item_id": item_id, "item": item, "q": q}
    return response
将空表单值设置为默认值
from typing import Generic, TypeVar

from ninja import Form, Schema
from pydantic.fields import ModelField

PydanticField = TypeVar("PydanticField")


class EmptyStrToDefault(Generic[PydanticField]):
    """Field type that replaces an empty string with the field's default."""

    @classmethod
    def __get_validators__(cls):
        # Yields the single validator defined below.
        yield cls.validate

    @classmethod
    def validate(cls, value: PydanticField, field: ModelField) -> PydanticField:
        """Return ``field.default`` when ``value`` is ``""``, else ``value``."""
        return field.default if value == "" else value


class Item(Schema):
    """Item whose numeric and boolean fields fall back to their defaults."""

    name: str
    description: str = None
    price: EmptyStrToDefault[float] = 0.0
    quantity: EmptyStrToDefault[int] = 0
    in_stock: EmptyStrToDefault[bool] = True


@api.post("/item-blank-default")
def update(request, item: Item = Form(...)):
    """Return the parsed item as a dict."""
    item_data = item.dict()
    return item_data
单个文件上传
from ninja import File
from ninja.files import UploadedFile


@api.post("/upload")
def upload(request, file: UploadedFile = File(...)):
    """Read the uploaded file and return its name and byte length."""
    content = file.read()
    size = len(content)
    return {"name": file.name, "len": size}
多个文件上传
from typing import List

from ninja import File
from ninja.files import UploadedFile


@api.post("/upload-many")
def upload_many(request, files: List[UploadedFile] = File(...)):
    """Return the names of all uploaded files."""
    names = []
    for uploaded in files:
        names.append(uploaded.name)
    return names
上传的文件属性和方法和django中相同,主要包括以下:
一般在应用中创建一个schema.py存储。
返回体为简单对象
from django.contrib.auth.models import User
from ninja import Schema


class UserIn(Schema):
    """Request body for creating a user."""

    username: str
    password: str


class UserOut(Schema):
    """Response body for a created user."""

    id: int
    username: str


@api.post("/users/", response=UserOut)
def create_user(request, data: UserIn):
    """Create a user from ``data``, save it and return it."""
    user = User(username=data.username)
    # The UserIn field is ``password``; the misspelling ``pasword`` is not
    # a declared field.
    user.set_password(data.password)
    user.save()
    return user
注:响应体Schema会限制返回的数据,仅返回定义在Schema中的字段。
返回体为嵌套对象
# models.py
from django.db import models


class Task(models.Model):
    """A to-do item with an optional owner."""

    title = models.CharField(max_length=200)
    is_completed = models.BooleanField(default=False)
    # ForeignKey requires on_delete since Django 2.0; SET_NULL matches the
    # nullable owner declared here.
    owner = models.ForeignKey(
        "auth.User", null=True, blank=True, on_delete=models.SET_NULL
    )


# api.py
from typing import List

from ninja import Schema

from .models import Task  # NOTE(review): assumes the model lives in models.py


class UserSchema(Schema):
    """Nested representation of a task's owner."""

    id: int
    first_name: str
    last_name: str


class TaskSchema(Schema):
    """Response shape for a single task."""

    id: int
    title: str
    is_completed: bool
    owner: UserSchema = None  # None -> to mark it as optional


@api.get("/tasks", response=List[TaskSchema])
def tasks(request):
    """Return every task."""
    queryset = Task.objects.all()
    return list(queryset)  # or return queryset
返回文件对象或图片
文件或图片的schema均返回地址。
# model.py
class Picture(models.Model):
    """A titled image stored under ``images``."""

    title = models.CharField(max_length=100)
    image = models.ImageField(upload_to="images")


# api.py
class PictureSchema(Schema):
    """Response shape for a picture; the image is exposed as a string."""

    title: str
    image: str
返回状态码和数据
from datetime import date

from ninja import Schema


class Token(Schema):
    """Response body for a successful login."""

    token: str
    expires: date


class Message(Schema):
    """Response body for an error."""

    message: str


@api.post("/login", response={200: Token, 401: Message, 402: Message})
def login(request, payload: Auth):
    """Return a ``(status_code, body)`` pair matching the response map."""
    if auth_not_valid:
        return 401, {"message": "Unauthorized"}
    if negative_balance:
        return 402, {"message": "xxxx"}
    # Placeholder values: supply the real token and expiry date here.
    return 200, {"token": xx, "expires": ...}
当多个状态码共用同一个响应Schema时,可以使用codes_4xx等状态码集合
# The status-code sets live in ``ninja.responses`` (plural).
from ninja.responses import codes_1xx
from ninja.responses import codes_2xx
from ninja.responses import codes_3xx
from ninja.responses import codes_4xx
from ninja.responses import codes_5xx


@api.post('/login', response={200: Token, codes_4xx: Message})
def login(request, payload: Auth):
    """Return a ``(status_code, body)`` pair; 4xx codes share ``Message``."""
    if auth_not_valid:
        return 401, {'message': 'Unauthorized'}
    if negative_balance:
        return 402, {'message': 'Insufficient balance amount. Please proceed to a payment page.'}
    # Placeholder values: supply the real token and expiry date here.
    return 200, {'token': xxx, 'expires': ...}
自我套用
class Organization(Schema):
    """Schema whose ``part_of`` field refers to the schema itself."""

    title: str
    part_of: 'Organization' = None


# this is important
Organization.update_forward_refs()
选择包含字段
from django.contrib.auth.models import User
from ninja import ModelSchema


class UserSchema(ModelSchema):
    """Schema built from ``User``, limited to the listed fields."""

    class Config:
        model = User
        model_fields = ["id", "username", "first_name", "last_name"]
选择不包含字段
from django.contrib.auth.models import User
from ninja import ModelSchema


class UserSchema(ModelSchema):
    """Schema built from ``User`` with the listed fields excluded."""

    class Config:
        model = User
        model_exclude = ["password", "last_login", "user_permissions"]
覆盖字段(修改某些字段注释或者添加新字段)
class GroupSchema(ModelSchema):
    """Schema built from ``Group``, limited to ``id`` and ``name``."""

    class Config:
        model = Group
        model_fields = ["id", "name"]


class UserSchema(ModelSchema):
    """User schema with an explicitly declared ``groups`` field."""

    # Declared on the schema itself, in addition to the model fields below.
    groups: List[GroupSchema] = []

    class Config:
        model = User
        model_fields = ["id", "username", "first_name", "last_name"]
使用create_schema
语法
# Signature reference for create_schema (illustration only, not a redefinition).
def create_schema(
    model,  # django model
    name="",  # name for the generated class; if empty, the model name is used
    depth=0,  # if > 0, schemas are also created for nested ForeignKeys and Many2Many (with the provided depth of lookup)
    fields: list[str] = None,
    exclude: list[str] = None,
    custom_fields: list[tuple[str, Any, Any]] = None,  # if passed, overrides default field types (or adds new fields)
): ...
使用model参数
from django.contrib.auth.models import User
from ninja.orm import create_schema
UserSchema = create_schema(User)
注: 仅仅使用model参数会将所有的User信息返回,容易暴露敏感数据。
使用fields参数
UserSchema = create_schema(User, fields=["id", "username"])
使用exclude参数
UserSchema = create_schema(User, exclude=["password", "last_login", ...])
使用depth参数
UserSchema = create_schema(User, depth=1, fields=["username", "groups"])
# will create the following schema:
# class UserSchema(Schema):
# username: str
# groups: List[Group]
自定义schema
参考官方文档:
- https://django-ninja.rest-framework.com/tutorial/config-pydantic/
一般建议把认证相关可调用对象放在util包中。
使用django验证
from ninja import NinjaAPI
from ninja.security import django_auth

api = NinjaAPI(csrf=True)


@api.get("/pets", auth=django_auth)
def pets(request):
    """Return a message naming the value stored in ``request.auth``."""
    return "Authenticated user {}".format(request.auth)
访问"/pets"路由时,会使用Django会话身份验证(默认是基于cookie),验证通过调用对应视图函数,否则返回HTTP-401错误。
全局认证
from ninja import Form, NinjaAPI
from ninja.security import HttpBearer


class GlobalAuth(HttpBearer):
    """Bearer authenticator that accepts one hard-coded token."""

    def authenticate(self, request, token):
        """Return ``token`` when it equals "supersecret", otherwise ``None``."""
        return token if token == "supersecret" else None


api = NinjaAPI(auth=GlobalAuth())
在全局认证时,如果某些函数不需要全局认证,则将该路由路径中的auth设置为None。
from ninja import Form, NinjaAPI
from ninja.security import HttpBearer


class GlobalAuth(HttpBearer):
    """Bearer authenticator that accepts one hard-coded token."""

    def authenticate(self, request, token):
        """Return ``token`` when it equals "supersecret", otherwise ``None``."""
        return token if token == "supersecret" else None


api = NinjaAPI(auth=GlobalAuth())


@api.post("/token", auth=None)  # overriding global auth
def get_token(request, username: str = Form(...), password: str = Form(...)):
    """Return the token for the hard-coded admin credentials, else ``None``."""
    credentials_ok = username == "admin" and password == "password"
    if not credentials_ok:
        return None
    return {"token": "supersecret"}
路由器认证
api.add_router("/events", events_router, auth=BasicAuth())
# or we can write as this
# router = Router(auth=BasicAuth())
自定义功能
request.auth
。请求参数中带有api_key验证信息
# GET /something?api_key=abcdefg12345
from ninja.security import APIKeyQuery

from someapp.models import Client


class ApiKey(APIKeyQuery):
    """Authenticator that looks up a ``Client`` by the supplied key."""

    param_name = "api_key"

    def authenticate(self, request, key):
        """Return the ``Client`` whose key matches, or ``None`` if none does."""
        try:
            client = Client.objects.get(key=key)
        except Client.DoesNotExist:
            return None
        return client


@api.get("/apikey", auth=ApiKey())
def apikey(request):
    """Greet the client stored in ``request.auth``."""
    assert isinstance(request.auth, Client)
    return "Hello {}".format(request.auth)
param_name是将被检查的GET参数的名称。如果未设置,将使用默认值"key"。
请求头中带有X-API-KEY验证信息
# GET /something HTTP/1.1
# X-API-Key: abcdef12345
from ninja.security import APIKeyHeader


class ApiKey(APIKeyHeader):
    """Header-based authenticator that accepts one hard-coded key."""

    param_name = "X-API-Key"

    def authenticate(self, request, key):
        """Return ``key`` when it equals "supersecret", otherwise ``None``."""
        return key if key == "supersecret" else None


@api.get("/headerKey", auth=ApiKey())
def apikey(request):
    """Return a message containing ``request.auth``."""
    return "Token = {}".format(request.auth)
cookie中带有验证
# GET /something HTTP/1.1
# Cookie: X-API-KEY=abcdef12345
from ninja.security import APIKeyCookie


class CookieKey(APIKeyCookie):
    """Cookie-based authenticator that accepts one hard-coded key."""

    def authenticate(self, request, key):
        """Return ``key`` when it equals "supersecret", otherwise ``None``."""
        # Comparison needs ``==``; a single ``=`` here is a syntax error.
        if key == "supersecret":
            return key


@api.get("/cookiekey", auth=CookieKey())
def apikey(request):
    """Return a message containing ``request.auth``."""
    return "Token = {}".format(request.auth)
HTTP Bearer 验证(可用于JWT等令牌)
from ninja.security import HttpBearer


class AuthBearer(HttpBearer):
    """Bearer authenticator that accepts one hard-coded token."""

    def authenticate(self, request, token):
        """Return ``token`` when it equals "supersecret", otherwise ``None``."""
        return token if token == "supersecret" else None


@api.get("/bearer", auth=AuthBearer())
def bearer(request):
    """Return the value stored in ``request.auth``."""
    return {"token": request.auth}
HTTP基本身份验证
from ninja.security import HttpBasicAuth


class BasicAuth(HttpBasicAuth):
    """Basic-auth authenticator that accepts one hard-coded credential pair."""

    def authenticate(self, request, username, password):
        """Return ``username`` for admin/secret, otherwise ``None``."""
        credentials_ok = username == "admin" and password == "secret"
        return username if credentials_ok else None


@api.get("/basic", auth=BasicAuth())
def basic(request):
    """Return the value stored in ``request.auth``."""
    return {"httpuser": request.auth}
多个验证器
from ninja.security import APIKeyQuery, APIKeyHeader
class AuthCheck:
    """Mixin holding the key check shared by several authenticators."""

    def authenticate(self, request, key):
        """Return ``key`` when it equals "supersecret", otherwise ``None``."""
        return key if key == "supersecret" else None
class QueryKey(AuthCheck, APIKeyQuery):
    """Query-parameter authenticator using the ``AuthCheck`` key check."""


class HeaderKey(AuthCheck, APIKeyHeader):
    """Header authenticator using the ``AuthCheck`` key check."""


@api.get("/multiple", auth=[QueryKey(), HeaderKey()])
def multiple(request):
    """Return a message containing ``request.auth``."""
    return "Token = {}".format(request.auth)
改变出现错误时返回值(自定义异常)
from ninja import NinjaAPI
from ninja.security import HttpBearer

api = NinjaAPI()


class InvalidToken(Exception):
    """Raised by ``AuthBearer`` when the supplied token is not accepted."""


@api.exception_handler(InvalidToken)
def on_invalid_token(request, exc):
    """Turn ``InvalidToken`` into a 401 response with a detail message."""
    return api.create_response(
        request, {"detail": "Invalid token supplied"}, status=401
    )


class AuthBearer(HttpBearer):
    """Bearer authenticator that raises instead of returning ``None``."""

    def authenticate(self, request, token):
        """Return ``token`` if it equals "supersecret", else raise ``InvalidToken``."""
        if token == "supersecret":
            return token
        raise InvalidToken


@api.get("/bearer", auth=AuthBearer())
def bearer(request):
    """Return the value stored in ``request.auth``."""
    return {"token": request.auth}
OpenAPI上分组依据,默认按router分组。
使用tags参数(list[str])对API操作进行分组
# POST rather than GET: the operation takes an ``Order`` payload.
@api.post("/orders/", tags=["orders"])
def create_order(request, order: Order):
    """Accept an order and report success."""
    return {"success": True}
路由器标签
api.add_router("/xxx/", xxx_router, tags=['xxx'])
# or we can write like this
# router = Router(tags=["xxx"])
OpenAPI上操作的可读名称,默认为视图函数名称大写生成。
使用summary参数进行修改。
@api.get("/hello", summary="Say Hello")
def hello(request, name: str):
    """Echo ``name`` under the "hello" key."""
    greeting = {"hello": name}
    return greeting
OpenAPI上显示的操作说明信息。
@api.post("/orders", description="Create an order and updates stock")
def create_order(request, order: Order):
    """Accept an order and report success."""
    result = {"success": True}
    return result
注:当需要提供很长的多行描述时,可以在函数定义中使用 Python docstrings:
OpenAPI operationId 是一个可选的唯一字符串,用于标识操作。如果提供,这些 ID 在您的 API 中描述的所有操作中必须是唯一的。
默认情况下,Django Ninja将其设置为 module name + function name。
每个操作单独设置。
@api.post("/tasks", operation_id="create_task")
def new_task(request):
    """Placeholder operation registered with an explicit ``operation_id``."""
    ...
覆盖全局行为。
from ninja import NinjaAPI


class MySuperApi(NinjaAPI):
    """NinjaAPI subclass that overrides operation-id generation."""

    def get_openapi_operation_id(self, operation):
        """Return the operation id for ``operation`` (placeholder)."""
        # here you can access operation ( .path , .view_func, etc)
        return ...


api = MySuperApi()
@api.get()
...
将操作标记为已弃用。
使用deprecated参数。
@api.post("/make-order", deprecated=True)
def xxx_old_method(request, order: Order):
    """Deprecated operation; accepts an order and reports success."""
    result = {"success": True}
    return result
从OpenAPI模式中排除某些操作。
使用include_in_schema参数。
@api.post("/hidden", include_in_schema=False)
def xxx_hiden_operation(request):
    """Operation registered with ``include_in_schema=False``."""
    pass
允许设置api端点url名称(使用django路径命名)。
from django.urls import reverse


@api.post("/tasks", url_name="tasks")
def xxx_operation(request):
    """Operation registered under the URL name "tasks"."""
    pass


# then you can get the url with
# (no space after the colon: the lookup key is "<namespace>:<url_name>")
reverse("api-1.0.0:tasks")
使用Django Ninja,可以轻松地从单个 Django 项目运行多个 API 版本。
# api_v1.py
from ninja import NinjaAPI

api = NinjaAPI(version="1.0.0")


@api.get("/hello")
def hello(request):
    """Return the v1 greeting."""
    return {"message": "Hello from v1"}


# api_v2.py
from ninja import NinjaAPI

api = NinjaAPI(version="2.0.0")


@api.get("/hello")
def hello(request):
    """Return the v2 greeting."""
    return {"message": "Hello from v2"}


# urls.py
from django.urls import path

from api_v1 import api as api_v1
from api_v2 import api as api_v2

urlpatterns = [
    ...,  # placeholder for the project's other routes
    path("api/v1/", api_v1.urls),
    path("api/v2/", api_v2.urls),
]
...
# Each API instance gets its own URL namespace.
api = NinjaAPI(auth=token_auth, urls_namespace="public_api")
...
api_private = NinjaAPI(auth=session_auth, urls_namespace="private_api")
...
urlpatterns = [
    ...,  # placeholder for the project's other routes
    path("api/", api.urls),
    path("internal-api/", api_private.urls),
]
在大多数情况下,REST API 的默认内容类型是 JSON,但如果您需要使用其他内容类型(如 YAML、XML、CSV)或使用更快的 JSON 解析器,Django Ninja提供了parser配置。
from typing import List

import yaml
from ninja import NinjaAPI, Schema
from ninja.parser import Parser


class MyYamlParser(Parser):
    """Parser that decodes the request body as YAML."""

    def parse_body(self, request):
        """Return ``request.body`` decoded with ``yaml.safe_load``."""
        raw_body = request.body
        return yaml.safe_load(raw_body)


api = NinjaAPI(parser=MyYamlParser())


class Payload(Schema):
    """Body accepted by the ``/yaml`` endpoint."""

    ints: List[int]
    string: str
    f: float


@api.post("/yaml")
def operation(request, payload: Payload):
    """Return the parsed payload as a dict."""
    parsed = payload.dict()
    return parsed
import orjson
from ninja import NinjaAPI
from ninja.parser import Parser


class ORJSONParser(Parser):
    """Parser that decodes the request body with ``orjson``."""

    def parse_body(self, request):
        """Return ``request.body`` decoded with ``orjson.loads``."""
        raw_body = request.body
        return orjson.loads(raw_body)


api = NinjaAPI(parser=ORJSONParser())
REST API 最常见的响应类型通常是JSON。Django Ninja还支持自定义渲染器,这可以让您灵活地设计自己的媒体类型
from ninja import NinjaAPI
from ninja.renderers import BaseRenderer


class MyRenderer(BaseRenderer):
    """Skeleton renderer that declares a plain-text media type."""

    media_type = "text/plain"

    def render(self, request, data, *, response_status):
        """Serialize ``data`` for the response (placeholder)."""
        return ...  # your serialization here


api = NinjaAPI(renderer=MyRenderer())
orjson是一个快速、准确的 Python JSON 库。它在基准测试中是最快的 Python JSON 库,并且比标准json库或其他第三方库更准确。它还原生支持序列化数据类、日期时间、numpy 和 UUID 实例。
import orjson
from ninja import NinjaAPI
from ninja.renderers import BaseRenderer


class ORJSONRenderer(BaseRenderer):
    """Renderer that serializes responses with ``orjson``."""

    media_type = "application/json"

    # The keyword must be ``response_status``, the name the renderer hook
    # is called with.
    def render(self, request, data, *, response_status):
        """Return ``data`` serialized by ``orjson.dumps``."""
        return orjson.dumps(data)


api = NinjaAPI(renderer=ORJSONRenderer())
将所有响应输出为XML的渲染器。
from io import StringIO

from django.utils.encoding import force_str
# Django's class is spelled ``SimplerXMLGenerator``.
from django.utils.xmlutils import SimplerXMLGenerator
from ninja import NinjaAPI
from ninja.renderers import BaseRenderer


class XMLRenderer(BaseRenderer):
    """Renderer that outputs every response as XML."""

    media_type = "text/xml"

    # The keyword must be ``response_status``, the name the renderer hook
    # is called with.
    def render(self, request, data, *, response_status):
        """Return ``data`` wrapped in a ``<data>`` root element as XML text."""
        stream = StringIO()
        xml = SimplerXMLGenerator(stream, "utf-8")
        xml.startDocument()
        xml.startElement("data", {})
        self._to_xml(xml, data)
        xml.endElement("data")
        xml.endDocument()
        return stream.getvalue()

    def _to_xml(self, xml, data):
        """Recursively write ``data`` to the ``xml`` generator."""
        if isinstance(data, (list, tuple)):
            # Each sequence element becomes an <item> element.
            for item in data:
                xml.startElement("item", {})
                self._to_xml(xml, item)
                xml.endElement("item")
        elif isinstance(data, dict):
            # Each key becomes an element named after the key.
            for key, value in data.items():
                xml.startElement(key, {})
                self._to_xml(xml, value)
                xml.endElement(key)
        elif data is None:
            # None produces no content.
            pass
        else:
            xml.characters(force_str(data))


api = NinjaAPI(renderer=XMLRenderer())
定义一些异常(或使用现有的)
使用api.exception_handler装饰器
import random

api = NinjaAPI()


class ServiceUnavailableError(Exception):
    """Raised by an operation to signal that the service is unavailable."""


# The handler must be registered for the exception class defined above.
@api.exception_handler(ServiceUnavailableError)
def service_unvailable(request, exc):
    """Turn ``ServiceUnavailableError`` into a 503 response."""
    return api.create_response(
        request,
        {"message": "Please retry later"},
        status=503,
    )


@api.get("/service")
def some_operation(request):
    """Randomly either raise ``ServiceUnavailableError`` or return a greeting."""
    if random.choice([True, False]):
        raise ServiceUnavailableError()
    return {"message": "Hello"}
默认初始化异常
覆盖默认处理程序
from django.http import HttpResponse
from ninja.errors import ValidationError
...


@api.exception_handler(ValidationError)
def validation_errors(request, exc):
    """Replace the default validation-error response with a plain 422."""
    # HttpResponse takes ``status``, not ``status_code``.
    return HttpResponse("Invalid input", status=422)
from ninja.errors import HttpError


@api.get("/xxx/resource")
def xxx_operation(request):
    """Always raise an ``HttpError`` with status 503."""
    service_down = True
    if service_down:
        raise HttpError(503, "Service Unavailable. please retry later.")
默认情况下,Django Ninja对所有操作都关闭了CSRF 。要打开它,您需要使用 NinjaAPI 类的csrf参数:
from ninja import NinjaAPI
api = NinjaAPI(csrf=True)
如果这样做,会报错。
from ninja import NinjaAPI
from ninja.security import django_auth
api = NinjaAPI(auth=django_auth)
必须启用csrf检查,基于cookie的身份验证才安全。
from ninja import NinjaAPI
from ninja.security import django_auth
api = NinjaAPI(auth=django_auth, csrf=True)
import asyncio
import time


@api.get("/say-after")
async def say_after(request, delay: int, word: str):
    """Wait ``delay`` seconds without blocking, then return ``word``."""
    await asyncio.sleep(delay)
    return {"saying": word}
注意:要运行此代码,必须使用Uvicorn或Daphne这样的ASGI服务器。
使用Uvicorn
安装
pip install uvicorn
启动服务器
uvicorn your_project.asgi:application --reload
项目中可以同时使用同步和异步操作,Django Ninja会自动路由。
@api.get("/say-sync")
def say_after_sync(request, delay: int, word: str):
    """Block for ``delay`` seconds, then return ``word``."""
    time.sleep(delay)
    return {"saying": word}


@api.get("/say_async")
async def say_after_async(request, delay: int, word: str):
    """Wait ``delay`` seconds without blocking, then return ``word``."""
    await asyncio.sleep(delay)
    return {"saying": word}
from elasticsearch import AsyncElasticsearch
from ninja import NinjaAPI

api = NinjaAPI()
es = AsyncElasticsearch()


@api.get("/search")
async def search(request, q: str):
    """Run a query-string search for ``q`` and return the "hits" entry."""
    query_body = {"query": {"query_string": {"query": q}}}
    resp = await es.search(index="document", body=query_body, size=20)
    return resp["hits"]
目前,(2020 年 7 月)Django 的某些关键部分无法在异步环境中安全运行,因为它们具有无法感知协程的全局状态。Django 的这些部分被归类为“async-unsafe”,并且在异步环境中不会被执行。ORM是主要示例,但还有其他部分也以这种方式受到保护。
如果直接用异步操作ORM,则会报错。
# 会报错
# Raises an error: the ORM is called directly inside an async view.
@api.get("/blog/{post_id}")
async def search(request, post_id: int):
    """Counter-example: synchronous ORM access from an async operation."""
    blog = Blog.objects.get(pk=post_id)
    ...
使用装饰器,将同步转化为异步
from asgiref.sync import sync_to_async


@sync_to_async
def get_blog(post_id):
    """Fetch the ``Blog`` with primary key ``post_id``."""
    blog = Blog.objects.get(pk=post_id)
    return blog


@api.get("/blog/{post_id}")
async def search(request, post_id: int):
    """Await the wrapped ORM lookup from an async operation."""
    blog = await get_blog(post_id)
    ...
不使用装饰器
@api.get("/blog/{post_id}")
async def search(request, post_id: int):
    """Wrap the ORM lookup with ``sync_to_async`` at the call site."""
    fetch_blog = sync_to_async(Blog.objects.get)
    blog = await fetch_blog(pk=post_id)
不会立即执行
all_blogs = await sync_to_async(Blog.objects.all)()
会立即执行
all_blogs = await sync_to_async(list)(Blog.objects.all())
参考文献: