在实际生产中,深度学习模型往往需要部署在服务器中,前端通过接口来调用模型推理过程,并获得返回值。
FastAPI是一种快速、高性能的Web框架。本文中使用FastAPI对目标检测模型进行了http封装,前端只需要将图片转为base64传入,生成json格式的检测结果返回。
检测部分使用OpenCV DNN调用darknet训练的YOLO模型进行检测。
这里封装一个目标检测类,以便之后实例化多个检测器。
import cv2
import sys
import numpy as np
class object_detector:
    """Object detector wrapping a Darknet/YOLO model via the OpenCV DNN module.

    The network is loaded once in the constructor; `predict` runs a forward
    pass on a BGR image (numpy array) and returns detections as a
    JSON-serializable dict.
    """

    def __init__(self, model, cfg, classes, inputSize=416):
        """
        Args:
            model: path to the Darknet ``.weights`` file.
            cfg: path to the matching ``.cfg`` file.
            classes: path to the ``.names`` file (one class label per line).
            inputSize: square network input size in pixels (e.g. 416).
        """
        self.model = model
        self.cfg = cfg
        self.classes = classes
        self.inputSize = inputSize
        self.framework = None
        self.load_model()

    def load_model(self):
        """Load the Darknet network, class labels, and output-layer names.

        Exits the process with an error message when the three file paths do
        not carry the expected extensions (weights/cfg/names).
        """
        if self.model.endswith('weights') and self.cfg.endswith('cfg') and self.classes.endswith('names'):
            self.net = cv2.dnn.readNetFromDarknet(self.cfg, self.model)
            # Close the names file deterministically (original leaked the handle).
            with open(self.classes) as f:
                self.LABELS = f.read().strip().split("\n")
            self.framework = 'Darknet'
        else:
            sys.exit('Wrong input for model weights and cfg')
        # Request CUDA; OpenCV silently falls back to CPU inference when it
        # was built without CUDA support.
        self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
        self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
        # Resolve the unconnected output-layer names once (they never change,
        # so there is no need to recompute them on every predict() call).
        # np.asarray(...).flatten() handles both OpenCV return shapes:
        # pre-4.5.4 returns an Nx1 array (original code indexed i[0]),
        # newer versions return a flat vector of ints.
        names = self.net.getLayerNames()
        self.outNames = [names[i - 1]
                         for i in np.asarray(self.net.getUnconnectedOutLayers()).flatten()]

    def predict(self, srcImg):
        """Detect objects in a BGR image.

        Args:
            srcImg: HxWx3 BGR image as a numpy array (OpenCV convention).

        Returns:
            dict: ``{"code": 200, "data": [...]}`` where each entry of
            ``data`` holds a "location" (score/left/top/width/height) and a
            "result" (score/label). ``data`` is an empty list when nothing
            survives non-maximum suppression.

        Raises:
            RuntimeError: if the loaded model is not a Darknet model
            (defensive; load_model() already exits in that case — the
            original code fell through to an undefined `blob` / NameError).
        """
        boxes = []
        confidences = []
        classIDs = []
        dict_list = []
        (H, W) = srcImg.shape[:2]
        if self.framework != 'Darknet':
            raise RuntimeError("Not a Darknet model")
        # Create a 4D blob: scale pixels to [0,1], resize to the square
        # network input, and swap BGR->RGB (YOLO expects RGB).
        blob = cv2.dnn.blobFromImage(srcImg, 1 / 255.0,
                                     (self.inputSize, self.inputSize),
                                     swapRB=True, crop=False)
        # Run the model.
        self.net.setInput(blob)
        layerOutputs = self.net.forward(self.outNames)
        # Each detection row: [cx, cy, w, h, objectness, class scores...].
        for output in layerOutputs:
            for detection in output:
                scores = detection[5:]
                classID = int(np.argmax(scores))
                confidence = scores[classID]
                # Drop very low-confidence detections early; NMS below applies
                # the stricter score threshold.
                if confidence > 0.02:
                    # Coordinates are normalized; scale back to pixel units.
                    box = detection[0:4] * np.array([W, H, W, H])
                    (centerX, centerY, width, height) = box.astype("int")
                    # Top-left corner of the box.
                    x = int(centerX - (width / 2))
                    y = int(centerY - (height / 2))
                    boxes.append([x, y, int(width), int(height)])
                    confidences.append(float(confidence))
                    classIDs.append(classID)
        # Non-maximum suppression collapses overlapping boxes.
        idxs = cv2.dnn.NMSBoxes(boxes, confidences, score_threshold=.4, nms_threshold=.3)
        if len(idxs) > 0:
            # flatten() via asarray tolerates both Nx1 and flat index shapes.
            for i in np.asarray(idxs).flatten():
                x, y, w, h = boxes[i]
                dict_list.append({
                    "location": {
                        "score": float(confidences[i]),
                        "left": int(x),
                        "top": int(y),
                        "width": int(w),
                        "height": int(h),
                    },
                    "result": {
                        "score": float(confidences[i]),
                        "label": str(self.LABELS[classIDs[i]]),
                    },
                })
        # Always return code/data so callers get a stable schema even when
        # nothing is detected (the original returned {} in that case).
        return {"code": 200, "data": dict_list}
这里模型加载后,使用了CUDA加速。如果OpenCV不支持CUDA,会自动转入CPU推理。想要支持CUDA的OpenCV,可以尝试用OpenCV和OpenCV-contrib源码编译生成python的cv2库,具体过程暂时没整理。
这里建一个main.py,首先实例化一个检测器,实例化过程中即完成了模型的加载。
然后需要定义接收数据的结构、创建路径等,具体见下面代码。
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
import cv2
import random
import os
import base64
import json
import time
import numpy as np
from enum import Enum
#加载前面的检测类
from object_detection import object_detector
# Single FastAPI application instance; the /detector route below registers on it.
app = FastAPI()
def base64toCv(base64_src):
    """Decode a base64-encoded image string into an OpenCV BGR image.

    Args:
        base64_src: base64 text of an encoded image (e.g. JPEG/PNG bytes).

    Returns:
        numpy.ndarray: HxWx3 BGR image, or None if decoding fails
        (cv2.imdecode returns None on invalid data).
    """
    img_b64decode = base64.b64decode(base64_src)  # base64 -> raw bytes
    # np.frombuffer replaces the deprecated np.fromstring for binary data.
    img_array = np.frombuffer(img_b64decode, np.uint8)
    # imdecode's second argument is an imread flag, not a color-conversion
    # code: the original passed cv2.COLOR_BGR2RGB (== 4 == IMREAD_ANYCOLOR)
    # by mistake. IMREAD_COLOR yields a 3-channel BGR image as intended.
    img_cv = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    return img_cv
# 将识别的类别加入枚举
class Targets(str, Enum):
    """Detection targets the API accepts.

    Subclassing str lets a member compare equal to its plain string value,
    which the /detector handler relies on.
    """

    det_target1 = "face"
# 定义接收数据的结构
# Request-body schema for the /detector endpoint.
class Item(BaseModel):
    # NOTE(review): the field name `base64` shadows the stdlib module name
    # inside this class body; the module itself is unaffected at call sites.
    base64: str = None #base64-encoded image
    target: Targets = None #detection type (see Targets enum)
@app.post('/detector')
async def calculate(request_data: Item):
    """Handle POST /detector: decode the base64 image and run detection.

    Returns the detector's JSON dict on success, or ``{"code": 201,
    "data": []}`` when the target is missing/unsupported.
    """
    Target = request_data.target
    img_base64 = request_data.base64
    # Both Item fields default to None; without this guard a request missing
    # `target` crashed on Target.value with AttributeError.
    if Target is None or img_base64 is None:
        print("Missing target or base64 parameter!")
        return {"code": 201, "data": []}
    # Log which detector is invoked and when.
    print("Detection for", Target.value , "! Time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    # Targets subclasses str, so comparison with the plain value works.
    if Target == "face":
        src_img = base64toCv(img_base64)
        # face_detector is instantiated in the __main__ block before serving.
        result = face_detector.predict(src_img)
        return result
    else:
        print("Target parameter error!")
        return {"code": 201, "data": []}
if __name__ == '__main__':
    # Instantiate a detector before starting the server (the /detector route
    # references it). Arguments: paths to the weights, cfg and names files,
    # plus the network input size. Model loading happens inside __init__.
    face_detector = object_detector("./models/face_model/face.weights",
    "./models/face_model/face.cfg",
    "./models/face_model/face.names",
    416)
    print('Loaded face model!')
    print("Service start!")
    uvicorn.run(app=app,
    host="0.0.0.0", # bind all interfaces on the server
    port=12455,
    workers=1)
这里新建一个test.py, 读取一张图片传入接口做检测, 并输出检测结果。
首先运行main.py,再打开一个终端运行test.py
import requests
import json
import base64
def test(img_path):
    """POST a local image to the /detector endpoint and return the response text.

    Args:
        img_path: path to an image file readable in binary mode.

    Returns:
        str: the raw JSON response body from the server.
    """
    with open(img_path, 'rb') as f:
        base64_data = base64.b64encode(f.read())
    img = base64_data.decode()
    # Use `json=` so requests serializes the payload AND sets the
    # Content-Type: application/json header; the original posted a
    # pre-dumped string via `data=` with no content type at all.
    # NOTE: 0.0.0.0 is a bind address, not a destination host — it is not
    # connectable on all platforms; use the loopback address instead.
    rec = requests.post("http://127.0.0.1:12455/detector",
                        json={'base64': img, 'target': "face"})
    return rec.text
# Smoke test: send a local image through the API and print the JSON response.
# Requires main.py to be running first.
result= test('1.jpg')
print(result)
以上是使用FastAPI封装目标检测模型的一种实现方式。检测器的部分如果不想用opencv,换成pytorch、tensorflow都可以。
代码下载:FastAPI封装目标检测推理过程
如果代码存在什么问题,欢迎大家指出!