由于业务需要对发布者的头像进行鉴黄风控,我们使用 Yahoo 开源的 NSFW 模型进行部署上线。前期直接使用 GitHub 上开源的 open_nsfw 模型,并在 Java 上部署;但由于不能满足业务需求,需要对模型进行重新训练。本篇主要关注 TensorFlow 模型的搭建。
通过上述开源代码提供的参数open_nsfw-weights.npy,根据每层的name去加载对应的参数。
# Parsed weight dictionaries keyed by file path, so each weights file is read
# from disk once instead of once per requested field.
_WEIGHTS_CACHE = {}


def get_weights(layer_name, field_name, weights_path="data/open_nsfw-weights.npy"):
    """Return one stored parameter of a named layer from the weights file.

    Args:
        layer_name: Key of the layer inside the weights dictionary.
        field_name: Key of the parameter inside that layer's entry
            (callers in this file ask for "weights", "biases", "scale",
            "offset", "mean" and "variance").
        weights_path: Path of the ``.npy`` file that stores a pickled
            ``{layer_name: {field_name: value}}`` dictionary.

    Returns:
        The value stored under ``weights[layer_name][field_name]``.

    Raises:
        ValueError: If the layer, or the field within the layer, is missing.
    """
    weights_o = _WEIGHTS_CACHE.get(weights_path)
    if weights_o is None:
        # NOTE(review): allow_pickle=True unpickles the file content; only
        # load weight files that come from a trusted source.
        weights_o = np.load(weights_path, allow_pickle=True, encoding='latin1').item()
        _WEIGHTS_CACHE[weights_path] = weights_o

    if layer_name not in weights_o:
        raise ValueError("No weights for layer named '{}'".format(layer_name))

    w = weights_o[layer_name]
    if field_name not in w:
        # Format the message first, then build the exception; calling
        # .format on the exception object itself raises AttributeError.
        raise ValueError(
            "No entry for field '{}' in layer named '{}'".format(field_name, layer_name))
    return w[field_name]
def batch_norm(name, trainable=True):
    """Build a BatchNormalization layer initialised from the stored weights.

    Args:
        name: Layer name; also the key used to look the parameters up
            through ``get_weights``.
        trainable: Forwarded to ``layers.BatchNormalization``.

    Returns:
        A ``layers.BatchNormalization`` instance whose gamma, beta, moving
        mean and moving variance start from the stored "scale", "offset",
        "mean" and "variance" entries.
    """
    def stored(field):
        # Constant initializer holding the stored value of one field.
        return tf.constant_initializer(get_weights(name, field))

    return layers.BatchNormalization(
        trainable=trainable,
        epsilon=1e-5,
        gamma_initializer=stored("scale"),
        beta_initializer=stored("offset"),
        moving_mean_initializer=stored("mean"),
        moving_variance_initializer=stored("variance"),
        name=name,
    )
def fully_connected(name, num_outputs):
    """Build a Dense layer initialised from the stored weights.

    Args:
        name: Layer name; also the key used to look the parameters up
            through ``get_weights``.
        num_outputs: Number of units of the Dense layer.

    Returns:
        A ``layers.Dense`` instance whose kernel and bias start from the
        stored "weights" and "biases" entries.
    """
    kernel_init = tf.constant_initializer(get_weights(name, "weights"))
    bias_init = tf.constant_initializer(get_weights(name, "biases"))
    return layers.Dense(
        units=num_outputs,
        name=name,
        kernel_initializer=kernel_init,
        bias_initializer=bias_init,
    )
由于此处需要先对输入数据进行处理(按需补零填充),因此使用类而不是函数,该类继承自 layers.Layer。
class Conv2d(layers.Layer):
    """Conv2D initialised from stored weights, with explicit 'same' padding.

    The wrapped ``layers.Conv2D`` always runs with ``padding="valid"``; when
    this layer is created with ``padding='same'`` and a kernel larger than 1,
    the input is zero-padded with ``tf.pad`` before the convolution.

    Args:
        name: Name of the inner Conv2D layer and key used to look up the
            stored "weights" and "biases" through ``get_weights``.
        filter_depth: Number of output filters.
        kernel_size: Side length of the square kernel.
        stride: Stride applied on both spatial axes.
        padding: 'same' (case-insensitive) enables the explicit padding;
            any other value leaves the input untouched.
        trainable: Forwarded to the inner Conv2D layer.
    """

    def __init__(self, name, filter_depth, kernel_size, stride=1, padding='same', trainable=True):
        super(Conv2d, self).__init__()
        self.padding = padding
        self.kernel_size = kernel_size
        self.stride = stride
        self.conv2 = layers.Conv2D(
            filter_depth,
            kernel_size=(kernel_size, kernel_size),
            strides=(stride, stride),
            padding="valid",
            activation=None,
            trainable=trainable,
            name=name,
            kernel_initializer=tf.constant_initializer(get_weights(name, "weights")),
            bias_initializer=tf.constant_initializer(get_weights(name, "biases")),
        )

    def call(self, inputs):
        """Pad the input when required and apply the convolution."""
        if self.padding.lower() == 'same' and self.kernel_size > 1:
            # Size of axis 1 of the input; the same pad amount is applied to
            # axes 1 and 2 (assumes NHWC input with equal height and width
            # — TODO confirm).
            h = inputs.shape.as_list()[1]
            # Pad amount for an output as large as the input. The expression
            # uses the input size as the target output size, which matches
            # true 'same' padding only for stride 1.
            p = ((h - 1) * self.stride + self.kernel_size - h) // 2
            inputs = tf.pad(inputs, [[0, 0], [p, p], [p, p], [0, 0]])
        return self.conv2(inputs)
class BasicBlock(layers.Layer):
    """Residual block with a projection (1x1 convolution) shortcut.

    The main branch is three Conv2d + batch-norm units (2a, 2b, 2c); the
    shortcut is one Conv2d + batch-norm unit. Both outputs are added and
    passed through ReLU.

    Args:
        stage: Stage index used to build the layer names.
        block: Block index used to build the layer names.
        filterdepths: Three filter counts for units 2a, 2b and 2c; the
            third one is also used for the shortcut convolution.
        kernel_size: Kernel size of unit 2b.
        stride: Stride of unit 2a and of the shortcut convolution.
    """

    def __init__(self, stage, block, filterdepths, kernel_size=3, stride=2):
        super().__init__()
        depth_a, depth_b, depth_c = filterdepths
        self.filter_depth1 = depth_a
        self.filter_depth2 = depth_b
        self.filter_depth3 = depth_c
        self.conv_name_base = "conv_stage{}_block{}_branch".format(stage, block)
        self.bn_name_base = "bn_stage{}_block{}_branch".format(stage, block)
        self.shortcut_name_post = "_stage{}_block{}_proj_shortcut".format(stage, block)

        # Projection shortcut.
        self.conv0 = Conv2d(
            "conv{}".format(self.shortcut_name_post),
            self.filter_depth3,
            kernel_size=1,
            stride=stride,
            padding='same',
        )
        self.bn0 = batch_norm("bn{}".format(self.shortcut_name_post))

        # Unit 2a: 1x1 convolution carrying the block stride.
        self.conv1 = Conv2d(
            "{}2a".format(self.conv_name_base),
            self.filter_depth1,
            kernel_size=1,
            stride=stride,
            padding='same',
        )
        self.bn1 = batch_norm("{}2a".format(self.bn_name_base))
        self.relu1 = layers.Activation('relu')

        # Unit 2b: kernel_size x kernel_size convolution, stride 1.
        self.conv2 = Conv2d(
            "{}2b".format(self.conv_name_base),
            self.filter_depth2,
            kernel_size=kernel_size,
            stride=1,
            padding='same',
        )
        self.bn2 = batch_norm("{}2b".format(self.bn_name_base))
        self.relu2 = layers.Activation('relu')

        # Unit 2c: 1x1 convolution, no activation before the addition.
        self.conv3 = Conv2d(
            "{}2c".format(self.conv_name_base),
            self.filter_depth3,
            kernel_size=1,
            stride=1,
            padding='same',
        )
        self.bn3 = batch_norm("{}2c".format(self.bn_name_base))

    def call(self, inputs, training=None):
        """Return ReLU(main_branch(inputs) + shortcut(inputs))."""
        shortcut = self.bn0(self.conv0(inputs))
        branch = self.relu1(self.bn1(self.conv1(inputs)))
        branch = self.relu2(self.bn2(self.conv2(branch)))
        branch = self.bn3(self.conv3(branch))
        return tf.nn.relu(tf.add(branch, shortcut))
class Identity_block_layer(layers.Layer):
    """Residual block whose shortcut is the unmodified input.

    The main branch is three Conv2d + batch-norm units (2a, 2b, 2c), all
    with stride 1; its output is added to the block input and passed
    through ReLU.

    Args:
        stage: Stage index used to build the layer names.
        block: Block index used to build the layer names.
        filter_depths: Three filter counts for units 2a, 2b and 2c.
        kernel_size: Kernel size of unit 2b.
    """

    def __init__(self, stage, block, filter_depths, kernel_size):
        super().__init__()
        depth_a, depth_b, depth_c = filter_depths
        self.filter_depth1 = depth_a
        self.filter_depth2 = depth_b
        self.filter_depth3 = depth_c
        self.conv_name_base = "conv_stage{}_block{}_branch".format(stage, block)
        self.bn_name_base = "bn_stage{}_block{}_branch".format(stage, block)

        # Unit 2a: 1x1 convolution.
        self.conva = Conv2d(
            "{}2a".format(self.conv_name_base),
            filter_depth=self.filter_depth1,
            kernel_size=1,
            stride=1,
            padding="same",
        )
        self.bna = batch_norm("{}2a".format(self.bn_name_base))

        # Unit 2b: kernel_size x kernel_size convolution.
        self.convb = Conv2d(
            "{}2b".format(self.conv_name_base),
            filter_depth=self.filter_depth2,
            kernel_size=kernel_size,
            stride=1,
            padding="same",
        )
        self.bnb = batch_norm("{}2b".format(self.bn_name_base))

        # Unit 2c: 1x1 convolution, no activation before the addition.
        self.convc = Conv2d(
            "{}2c".format(self.conv_name_base),
            filter_depth=self.filter_depth3,
            kernel_size=1,
            stride=1,
            padding="same",
        )
        self.bnc = batch_norm("{}2c".format(self.bn_name_base))

    def call(self, inputs):
        """Return ReLU(main_branch(inputs) + inputs)."""
        branch = tf.nn.relu(self.bna(self.conva(inputs)))
        branch = tf.nn.relu(self.bnb(self.convb(branch)))
        branch = self.bnc(self.convc(branch))
        return tf.nn.relu(tf.add(branch, inputs))
基于以上layer,搭建ResNet50模型。
函数式API可以方便模型后期部署上线
def getModel():
    """Assemble the network with the Keras functional API.

    The model takes a (224, 224, 3) input, pads it by 3 on both spatial
    axes, applies the stem (7x7 convolution, batch norm, ReLU, max pool),
    four residual stages, 7x7 average pooling, and a two-unit dense layer
    followed by softmax.

    Returns:
        A ``tf.keras.Model`` mapping the image input to the softmax output.
    """
    # (stage index, filter depths, stride of the first block,
    #  number of identity blocks that follow it)
    stage_specs = (
        (0, [32, 32, 128], 1, 2),
        (1, [64, 64, 256], 2, 3),
        (2, [128, 128, 512], 2, 5),
        (3, [256, 256, 1024], 2, 2),
    )

    inputs = keras.Input((224, 224, 3))

    # Stem.
    x = layers.Lambda(
        lambda t: tf.pad(t, [[0, 0], [3, 3], [3, 3], [0, 0]], 'CONSTANT'))(inputs)
    x = Conv2d("conv_1", filter_depth=64, kernel_size=7, stride=2, padding="valid")(x)
    x = batch_norm("bn_1")(x)
    x = layers.Activation("relu")(x)
    x = layers.MaxPooling2D(pool_size=3, strides=2, padding='same')(x)

    # Residual stages: one projection block, then the identity blocks.
    for stage, depths, first_stride, identity_count in stage_specs:
        x = BasicBlock(stage, 0, filterdepths=depths, kernel_size=3, stride=first_stride)(x)
        for block in range(1, identity_count + 1):
            x = Identity_block_layer(
                stage=stage, block=block, filter_depths=depths, kernel_size=3)(x)

    # Classification head.
    x = layers.AveragePooling2D(pool_size=7, strides=1, padding="valid", name="avergepool")(x)
    print("averagePooling: ", x)
    x = layers.Flatten()(x)
    logits = fully_connected(name="fc_nsfw", num_outputs=2)(x)
    output = layers.Activation("softmax")(logits)

    return tf.keras.Model(inputs=inputs, outputs=output)
class ResModel(models.Model):
    """Subclassed-model version of the same network that getModel builds.

    Args:
        input_type: Stored on the instance as ``input_type``; the code in
            this class does not read it.
    """

    def __init__(self, input_type=1):
        super().__init__()
        self.input_type = input_type

        # Stem.
        self.conv2 = Conv2d("conv_1", filter_depth=64, kernel_size=7, stride=2, padding="valid")
        self.bn = batch_norm("bn_1")
        self.pool = layers.MaxPooling2D(pool_size=3, strides=2, padding='same')

        # Stage 0.
        depths = [32, 32, 128]
        self.conv_bolck_0_0 = BasicBlock(0, 0, filterdepths=depths, kernel_size=3, stride=1)
        self.identity_0_1 = Identity_block_layer(stage=0, block=1, filter_depths=depths, kernel_size=3)
        self.identity_0_2 = Identity_block_layer(stage=0, block=2, filter_depths=depths, kernel_size=3)

        # Stage 1.
        depths = [64, 64, 256]
        self.conv_bolck_1_0 = BasicBlock(1, 0, filterdepths=depths, kernel_size=3, stride=2)
        self.identity_1_1 = Identity_block_layer(stage=1, block=1, filter_depths=depths, kernel_size=3)
        self.identity_1_2 = Identity_block_layer(stage=1, block=2, filter_depths=depths, kernel_size=3)
        self.identity_1_3 = Identity_block_layer(stage=1, block=3, filter_depths=depths, kernel_size=3)

        # Stage 2.
        depths = [128, 128, 512]
        self.conv_bolck_2_0 = BasicBlock(2, 0, filterdepths=depths, kernel_size=3, stride=2)
        self.identity_2_1 = Identity_block_layer(stage=2, block=1, filter_depths=depths, kernel_size=3)
        self.identity_2_2 = Identity_block_layer(stage=2, block=2, filter_depths=depths, kernel_size=3)
        self.identity_2_3 = Identity_block_layer(stage=2, block=3, filter_depths=depths, kernel_size=3)
        self.identity_2_4 = Identity_block_layer(stage=2, block=4, filter_depths=depths, kernel_size=3)
        self.identity_2_5 = Identity_block_layer(stage=2, block=5, filter_depths=depths, kernel_size=3)

        # Stage 3.
        depths = [256, 256, 1024]
        self.conv_bolck_3_0 = BasicBlock(3, 0, filterdepths=depths, kernel_size=3, stride=2)
        self.identity_3_1 = Identity_block_layer(stage=3, block=1, filter_depths=depths, kernel_size=3)
        self.identity_3_2 = Identity_block_layer(stage=3, block=2, filter_depths=depths, kernel_size=3)

        # Classification head.
        self.average_pooling2d = layers.AveragePooling2D(
            pool_size=7, strides=1, padding="valid", name="avergepool")
        self.logits = fully_connected(name="fc_nsfw", num_outputs=2)

    def call(self, inputs):
        """Run the forward pass and return the softmax over the two outputs."""
        # Stem: pad by 3 on both spatial axes, then conv / BN / ReLU / pool.
        x = tf.pad(inputs, [[0, 0], [3, 3], [3, 3], [0, 0]], 'CONSTANT')
        x = tf.nn.relu(self.bn(self.conv2(x)))
        x = self.pool(x)

        # Residual stages, applied in order.
        residual_blocks = (
            self.conv_bolck_0_0, self.identity_0_1, self.identity_0_2,
            self.conv_bolck_1_0, self.identity_1_1, self.identity_1_2, self.identity_1_3,
            self.conv_bolck_2_0, self.identity_2_1, self.identity_2_2, self.identity_2_3,
            self.identity_2_4, self.identity_2_5,
            self.conv_bolck_3_0, self.identity_3_1, self.identity_3_2,
        )
        for residual_block in residual_blocks:
            x = residual_block(x)

        # Head: average pool, flatten to 1024 features, dense, softmax.
        x = self.average_pooling2d(x)
        x = tf.reshape(x, shape=(-1, 1024))
        x = self.logits(x)
        return tf.nn.softmax(x, name="predictions")
其中的image_utils 和open_nsfw-weights均来自于开源的open_nsfw。
import tensorflow as tf
from nsfwmodel import ResModel,getModel
from image_utils import create_tensorflow_image_loader
from image_utils import create_yahoo_image_loader
import numpy as np
import os
# Accepted values of the `image_loader` argument of load_image();
# load_image() compares against IMAGE_LOADER_TENSORFLOW and treats any
# other value as the Yahoo loader.
IMAGE_LOADER_TENSORFLOW = "tensorflow"
IMAGE_LOADER_YAHOO = "yahoo"
# NOTE(review): model_path is not read anywhere in the visible script.
model_path='./nsfwmodel'
# Directory walked by findAllFile() in the __main__ section below.
IMAGE_DIR=r'./tensorflow-open_nsfw-master/image2'
def findAllFile(base):
    """Yield the path of every ``.jpg`` / ``.png`` file found under *base*.

    The directory tree is traversed with ``os.walk``; the suffix match is
    case-sensitive. Each yielded path is the walked directory joined with
    the file name.
    """
    image_suffixes = ('.jpg', '.png')
    for directory, _subdirs, file_names in os.walk(base):
        for file_name in file_names:
            if file_name.endswith(image_suffixes):
                yield os.path.join(directory, file_name)
def load_image(input_type=1, image_loader="yahoo"):
    """Return a function that loads one image file for the model.

    Args:
        input_type: 1 selects a tensor loader from ``image_utils``; 2
            selects a loader that returns the file content as URL-safe
            base64.
        image_loader: Only used when ``input_type`` is 1.
            ``IMAGE_LOADER_TENSORFLOW`` selects
            ``create_tensorflow_image_loader``; any other value selects
            ``create_yahoo_image_loader``.

    Returns:
        A callable taking a file name and returning the loaded image.

    Raises:
        ValueError: If ``input_type`` is neither 1 nor 2.
    """
    if input_type == 1:
        print('TENSOR...')
        if image_loader == IMAGE_LOADER_TENSORFLOW:
            print('IMAGE_LOADER_TENSORFLOW...')
            # tf.Session is not exposed by TensorFlow 2; fall back to the
            # compat.v1 alias when it is missing.
            # NOTE(review): whether the loader itself works under TF2
            # depends on image_utils — confirm.
            session_cls = getattr(tf, "Session", None) or tf.compat.v1.Session
            fn_load_image = create_tensorflow_image_loader(session_cls(graph=tf.Graph()))
        else:
            print('create_yahoo_image_loader')
            fn_load_image = create_yahoo_image_loader()
    elif input_type == 2:
        print('BASE64_JPEG...')
        import base64

        def fn_load_image(filename):
            # Close the file deterministically instead of leaking the handle.
            with open(filename, "rb") as image_file:
                return np.array([base64.urlsafe_b64encode(image_file.read())])
    else:
        # Without this branch the function failed with UnboundLocalError.
        raise ValueError("invalid input type '{}'".format(input_type))
    return fn_load_image
def imageToTensor(inputs, input_type=1):
    """Convert a loaded image into the tensor passed to the model.

    Args:
        inputs: Value produced by the loader returned from load_image().
        input_type: 1 returns ``inputs`` unchanged; 2 passes ``inputs``
            through ``image_utils.load_base64_tensor``.

    Returns:
        The model input for the given ``input_type``.

    Raises:
        ValueError: If ``input_type`` is neither 1 nor 2.
    """
    if input_type == 1:
        return inputs
    if input_type == 2:
        from image_utils import load_base64_tensor
        return load_base64_tensor(inputs)
    raise ValueError("invalid input type '{}'".format(input_type))
if __name__ == '__main__':
    # Predict every image found under IMAGE_DIR and print the model output.
    input_type = 1
    image_loader = "yahoo"
    fn_load_image = load_image(input_type, image_loader)

    # Functional-API model; ResModel(input_type) is the subclassed alternative.
    model = getModel()

    for image_path in findAllFile(IMAGE_DIR):
        print('predict for: ' + image_path)
        loaded = fn_load_image(image_path)
        print(model(imageToTensor(loaded, input_type)))
经试验,此模型的结果与原版 TensorFlow 1 模型的误差在 5% 左右,略有出入;但可以在此 TensorFlow 2 模型的基础上进行微调,以适应自己的场景。