
How to convert the TensorFlow mobilenet v1 coco model to a Core ML model?

史涵育
2023-12-01

    It's been a long, long time since I wrote a blog post. Today I'm writing down a problem I ran into on a project a while back, and how I solved it. I was building an iOS app and needed to deploy a trained mobilenetv1 ssd model to the app, so I dug into model conversion (honestly, the mobilenet v1 ssd coco model is a bit of a pain; converting it to TensorFlow Lite also hit plenty of problems, which I may cover some other time).

    The key point is that you must adjust the input dimensions according to your own model.config file. Also, since the box decoding part uses some custom layers, the model cannot be converted directly; you have to implement that part yourself. I drew on a lot of reference material here.
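
    As a quick sanity check on num_anchors, the count can be derived from the SSD feature-map grids. The sketch below reproduces the 5118 figure for a 512*512 input, assuming the default ssd_mobilenet_v1 anchor generator (3 boxes per cell on the first feature map, 6 on the others); if you changed the anchor settings in model.config, the numbers will differ:

# Hypothetical sanity check for num_anchors. The grid sizes assume the
# default MobileNet V1 SSD feature-map strides of 16/32/64/128/256/512.
feature_map_sizes = [32, 16, 8, 4, 2, 1]  # 512 // stride for each map
boxes_per_cell = [3, 6, 6, 6, 6, 6]
num_anchors = sum(s * s * b for s, b in zip(feature_map_sizes, boxes_per_cell))
print(num_anchors)  # 3072 + 1536 + 384 + 96 + 24 + 6 = 5118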

    Here's the code (the project was on a tight deadline, so the code is rather rough... ;) )

import os, sys, zipfile
from os.path import dirname
import numpy as np
import tensorflow as tf
from tensorflow.core.framework import graph_pb2
import coremltools
import simple_label_map_util as lab_util  # used to convert the label map prototxt to a dict
num_classes=1 # number of classes your model detects, based on your label map
num_anchors=5118 # 5118 anchors for a 512*512 input; you can also use the get_anchors() function (below) to read the anchor tensor (tensor name: Concatenate/concat:0)
input_width=512 # our input image is 512*512 here, depending on model.config (the default is 300*300)
input_height=512

original=False
tf_model_path = "frozen_inference_graph.pb"
tf_labelmap_path="tf_label_map.pbtxt"
export_extractor_path="extracted_model.pb"
coremodel_save_path="coremodel_and_bias.mlmodel"
coreml_float16_save_path="coremlfloat16_and_bias.mlmodel"
pipeline_model_save_path="pipeline.mlmodel"
input_tensor_shapes = {"Preprocessor/sub:0":[1,input_height,input_width,3]} # batch size is 1
output_tensor_names = ['concat:0', 'concat_1:0']

def get_lab_map_display_name(labmap_path):
  lmap_dict=lab_util.get_label_map_dict(label_map_path=labmap_path)
  return sorted(lmap_dict.keys(),key=lambda x: lmap_dict[x])
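
# For reference, the label map (tf_label_map.pbtxt) is a TF Object Detection
# text proto; the id and name below are examples only:
#
#   item {
#     id: 1
#     name: 'drink'
#   }
#
# get_lab_map_display_name() returns the class names sorted by id, which is
# the order the scores come out of the network.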


with open(tf_model_path, 'rb') as f:
  serialized = f.read()
tf.reset_default_graph()
original_gdef = tf.GraphDef()
original_gdef.ParseFromString(serialized)

with tf.Graph().as_default() as g:
  tf.import_graph_def(original_gdef, name='')

print"read tf model ok"

from tensorflow.python.tools import strip_unused_lib
from tensorflow.python.framework import dtypes
from tensorflow.python.platform import gfile

input_node_names = ['Preprocessor/sub']
output_node_names = ['concat', 'concat_1']
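# Preprocessor/sub is the last op of TF's preprocessing subgraph (the
# normalized input image); concat and concat_1 are the raw box encodings
# and class predictions before TF's own postprocessing. Cutting the graph
# here strips the postprocessing ops (e.g. TF's NonMaxSuppression) that
# tfcoreml cannot convert; we rebuild decoding and NMS in Core ML below.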
gdef = strip_unused_lib.strip_unused(
        input_graph_def = original_gdef,
        input_node_names = input_node_names,
        output_node_names = output_node_names,
        placeholder_type_enum = dtypes.float32.as_datatype_enum)
# Save the feature extractor to an output file
frozen_model_file = export_extractor_path
with gfile.GFile(frozen_model_file, "wb") as f:
  f.write(gdef.SerializeToString())

print "save extractor ok"

# Now we have a TF model ready to be converted to CoreML
import tfcoreml
# Supply a dictionary of input tensor names and shapes (including the batch axis)

# Output CoreML model path
coreml_model_file =coremodel_save_path
# The TF model's output tensor names


# Call the converter. This may take a while
if original:
  coreml_model = tfcoreml.convert(
          tf_model_path=frozen_model_file,
          mlmodel_path=coreml_model_file,
          input_name_shape_dict=input_tensor_shapes,
          output_feature_names=output_tensor_names)
else:
  coreml_model = tfcoreml.convert(
    tf_model_path=frozen_model_file,
    mlmodel_path=coreml_model_file,
    input_name_shape_dict=input_tensor_shapes,
    image_input_names=list(input_tensor_shapes.keys())[0],
    output_feature_names=output_tensor_names,
    image_scale=2. / 255.,
    red_bias=-1.0,
    green_bias=-1.0,
    blue_bias=-1.0
  )
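# Note: with this image_scale and these biases Core ML computes
#   normalized_pixel = (2.0 / 255.0) * pixel - 1.0
# per channel, mapping [0, 255] to [-1, 1] -- the same normalization the
# TF graph applies to produce Preprocessor/sub.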

print "convert to tf as a extractor success"

spec = coreml_model.get_spec()

# Rename the inputs and outputs to something more readable.
spec.description.input[0].name = "image"
spec.description.input[0].shortDescription = "Input image"
spec.description.output[0].name = "scores"
spec.description.output[0].shortDescription = "Predicted class scores for each bounding box"
spec.description.output[1].name = "boxes"
spec.description.output[1].shortDescription = "Predicted coordinates for each bounding box"

input_mlmodel = list(input_tensor_shapes.keys())[0].replace(":", "__").replace("/", "__")
class_output_mlmodel = output_tensor_names[1].replace(":", "__").replace("/", "__")
bbox_output_mlmodel = output_tensor_names[0].replace(":", "__").replace("/", "__")

for i in range(len(spec.neuralNetwork.layers)):
    if spec.neuralNetwork.layers[i].input[0] == input_mlmodel:
        spec.neuralNetwork.layers[i].input[0] = "image"
    if spec.neuralNetwork.layers[i].output[0] == class_output_mlmodel:
        spec.neuralNetwork.layers[i].output[0] = "scores"
    if spec.neuralNetwork.layers[i].output[0] == bbox_output_mlmodel:
        spec.neuralNetwork.layers[i].output[0] = "boxes"

spec.neuralNetwork.preprocessing[0].featureName = "image"

# For some reason the output shape of the "scores" output is not filled in.
spec.description.output[0].type.multiArrayType.shape.append(num_classes + 1)
spec.description.output[0].type.multiArrayType.shape.append(num_anchors)

# And the "boxes" output shape is (4, 1917, 1) so get rid of that last one.
del spec.description.output[1].type.multiArrayType.shape[-1]

# Convert weights to 16-bit floats to make the model smaller.
spec = coremltools.utils.convert_neural_network_spec_weights_to_fp16(spec)
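# (fp16 weights roughly halve the .mlmodel file size; the accuracy impact is
# usually negligible for a detector like this, but validate on your own data.)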

coreml_model_path=coreml_float16_save_path
# Create a new MLModel from the modified spec and save it.
ssd_model = coremltools.models.MLModel(spec)
ssd_model.save(coreml_model_path)

print "float16 ssd extractor saved"


def get_anchors(sess, tensor_name):
  """
  Computes the list of anchor boxes by sending a fake image through the graph.
  Outputs an array of size (4, num_anchors) where each element is an anchor box
  given as [ycenter, xcenter, height, width] in normalized coordinates.
  """
  image_tensor = sess.graph.get_tensor_by_name("image_tensor:0")
  box_corners_tensor = sess.graph.get_tensor_by_name(tensor_name)
  box_corners = sess.run(box_corners_tensor, feed_dict={image_tensor: np.zeros((1, input_height, input_width, 3))})

  # The TensorFlow graph gives each anchor box as [ymin, xmin, ymax, xmax].
  # Convert these min/max values to a center coordinate, width and height.
  ymin, xmin, ymax, xmax = np.transpose(box_corners)
  width = xmax - xmin
  height = ymax - ymin
  ycenter = ymin + height / 2.
  xcenter = xmin + width / 2.
  return np.stack([ycenter, xcenter, height, width])

def check_anchors_in_original_model(path):
  """Loads a saved model into a graph."""
  print("Loading saved_model.pb from '%s'" % path)
  anchors_tensor = "Concatenate/concat:0"
  sess = tf.Session()
  with gfile.FastGFile(path, 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
    sess.graph.as_default()
    tf.import_graph_def(graph_def, name='')  # import the graph

  # the graph variables need to be initialized first
  sess.run(tf.global_variables_initializer())

  anchors = get_anchors(sess,anchors_tensor)
  print "anchors", anchors.shape
  assert (anchors.shape[1] == num_anchors)
  sess.close()
  return anchors

anchors=check_anchors_in_original_model(tf_model_path)


from coremltools.models import datatypes
from coremltools.models import neural_network

# MLMultiArray inputs of neural networks must have 1 or 3 dimensions.
# We only have 2, so add an unused dimension of size one at the back.
input_features = [ ("scores", datatypes.Array(num_classes + 1, num_anchors, 1)),
                   ("boxes", datatypes.Array(4, num_anchors, 1)) ]

# The outputs of the decoder model should match the inputs of the next
# model in the pipeline, NonMaximumSuppression. This expects the number
# of bounding boxes in the first dimension.
output_features = [ ("raw_confidence", datatypes.Array(num_anchors, num_classes)),
                    ("raw_coordinates", datatypes.Array(num_anchors, 4)) ]

builder = neural_network.NeuralNetworkBuilder(input_features, output_features)

# (num_classes+1, num_anchors, 1) --> (1, num_anchors, num_classes+1)
builder.add_permute(name="permute_scores",
                    dim=(0, 3, 2, 1),
                    input_name="scores",
                    output_name="permute_scores_output")

# Strip off the "unknown" class (at index 0).
builder.add_slice(name="slice_scores",
                  input_name="permute_scores_output",
                  output_name="raw_confidence",
                  axis="width",
                  start_index=1,
                  end_index=num_classes + 1)
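
# The layers below implement the standard SSD box decoding, assuming the
# default box coder scales (y_scale = x_scale = 10, h_scale = w_scale = 5):
#
#   decoded_yx = (boxes_yx / 10) * anchors_hw + anchors_yx
#   decoded_hw = exp(boxes_hw / 5) * anchors_hw
#
# If model.config uses different scales, change the alpha values of the
# scale_yx and scale_hw layers to match.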

# Grab the y, x coordinates (channels 0-1).
builder.add_slice(name="slice_yx",
                  input_name="boxes",
                  output_name="slice_yx_output",
                  axis="channel",
                  start_index=0,
                  end_index=2)

# boxes_yx / 10
builder.add_elementwise(name="scale_yx",
                        input_names="slice_yx_output",
                        output_name="scale_yx_output",
                        mode="MULTIPLY",
                        alpha=0.1)

# Split the anchors into two (2, num_anchors, 1) arrays.
anchors_yx = np.expand_dims(anchors[:2, :], axis=-1)
anchors_hw = np.expand_dims(anchors[2:, :], axis=-1)

builder.add_load_constant(name="anchors_yx",
                          output_name="anchors_yx",
                          constant_value=anchors_yx,
                          shape=[2, num_anchors, 1])

builder.add_load_constant(name="anchors_hw",
                          output_name="anchors_hw",
                          constant_value=anchors_hw,
                          shape=[2, num_anchors, 1])

# (boxes_yx / 10) * anchors_hw
builder.add_elementwise(name="yw_times_hw",
                        input_names=["scale_yx_output", "anchors_hw"],
                        output_name="yw_times_hw_output",
                        mode="MULTIPLY")

# (boxes_yx / 10) * anchors_hw + anchors_yx
builder.add_elementwise(name="decoded_yx",
                        input_names=["yw_times_hw_output", "anchors_yx"],
                        output_name="decoded_yx_output",
                        mode="ADD")

# Grab the height and width (channels 2-3).
builder.add_slice(name="slice_hw",
                  input_name="boxes",
                  output_name="slice_hw_output",
                  axis="channel",
                  start_index=2,
                  end_index=4)

# (boxes_hw / 5)
builder.add_elementwise(name="scale_hw",
                        input_names="slice_hw_output",
                        output_name="scale_hw_output",
                        mode="MULTIPLY",
                        alpha=0.2)

# exp(boxes_hw / 5)
builder.add_unary(name="exp_hw",
                  input_name="scale_hw_output",
                  output_name="exp_hw_output",
                  mode="exp")

# exp(boxes_hw / 5) * anchors_hw
builder.add_elementwise(name="decoded_hw",
                        input_names=["exp_hw_output", "anchors_hw"],
                        output_name="decoded_hw_output",
                        mode="MULTIPLY")

# The coordinates are now (y, x) and (height, width) but NonMaximumSuppression
# wants them as (x, y, width, height). So create four slices and then concat
# them into the right order.
builder.add_slice(name="slice_y",
                  input_name="decoded_yx_output",
                  output_name="slice_y_output",
                  axis="channel",
                  start_index=0,
                  end_index=1)

builder.add_slice(name="slice_x",
                  input_name="decoded_yx_output",
                  output_name="slice_x_output",
                  axis="channel",
                  start_index=1,
                  end_index=2)

builder.add_slice(name="slice_h",
                  input_name="decoded_hw_output",
                  output_name="slice_h_output",
                  axis="channel",
                  start_index=0,
                  end_index=1)

builder.add_slice(name="slice_w",
                  input_name="decoded_hw_output",
                  output_name="slice_w_output",
                  axis="channel",
                  start_index=1,
                  end_index=2)

builder.add_elementwise(name="concat",
                        input_names=["slice_x_output", "slice_y_output",
                                     "slice_w_output", "slice_h_output"],
                        output_name="concat_output",
                        mode="CONCAT")

# (4, num_anchors, 1) --> (1, num_anchors, 4)
builder.add_permute(name="permute_output",
                    dim=(0, 3, 2, 1),
                    input_name="concat_output",
                    output_name="raw_coordinates")

decoder_model = coremltools.models.MLModel(builder.spec)
decoder_model.save("Decoder.mlmodel")

print "saved box decoder sub model"

nms_spec = coremltools.proto.Model_pb2.Model()
nms_spec.specificationVersion = 3

for i in range(2):
  decoder_output = decoder_model._spec.description.output[i].SerializeToString()

  nms_spec.description.input.add()
  nms_spec.description.input[i].ParseFromString(decoder_output)

  nms_spec.description.output.add()
  nms_spec.description.output[i].ParseFromString(decoder_output)

nms_spec.description.output[0].name = "confidence"
nms_spec.description.output[1].name = "coordinates"

output_sizes = [num_classes, 4]
for i in range(2):
  ma_type = nms_spec.description.output[i].type.multiArrayType
  ma_type.shapeRange.sizeRanges.add()
  ma_type.shapeRange.sizeRanges[0].lowerBound = 0
  ma_type.shapeRange.sizeRanges[0].upperBound = -1
  ma_type.shapeRange.sizeRanges.add()
  ma_type.shapeRange.sizeRanges[1].lowerBound = output_sizes[i]
  ma_type.shapeRange.sizeRanges[1].upperBound = output_sizes[i]
  del ma_type.shape[:]
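
# The outputs get flexible shapes: the first dimension (the number of boxes
# that survive NMS) is unbounded (lowerBound 0, upperBound -1), while the
# second is fixed -- num_classes for "confidence", 4 for "coordinates".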

nms = nms_spec.nonMaximumSuppression
nms.confidenceInputFeatureName = "raw_confidence"
nms.coordinatesInputFeatureName = "raw_coordinates"
nms.confidenceOutputFeatureName = "confidence"
nms.coordinatesOutputFeatureName = "coordinates"
nms.iouThresholdInputFeatureName = "iouThreshold"
nms.confidenceThresholdInputFeatureName = "confidenceThreshold"

default_iou_threshold = 0.2
default_confidence_threshold = 0.01
nms.iouThreshold = default_iou_threshold
nms.confidenceThreshold = default_confidence_threshold

nms.pickTop.perClass = True

labels = np.array(get_lab_map_display_name(tf_labelmap_path), dtype=str)
nms.stringClassLabels.vector.extend(labels)

nms_model = coremltools.models.MLModel(nms_spec)
nms_model.save("NMS.mlmodel")

print "NMS model saved"


from coremltools.models.pipeline import *

input_features = [ ("image", datatypes.Array(3, input_height, input_width)),
                   ("iouThreshold", datatypes.Double()),
                   ("confidenceThreshold", datatypes.Double()) ]

output_features = [ "confidence", "coordinates" ]

pipeline = Pipeline(input_features, output_features)

# We added a dimension of size 1 to the back of the inputs of the decoder
# model, so we should also add this to the output of the SSD model or else
# the inputs and outputs do not match and the pipeline is not valid.
ssd_output = ssd_model._spec.description.output
ssd_output[0].type.multiArrayType.shape[:] = [num_classes + 1, num_anchors, 1]
ssd_output[1].type.multiArrayType.shape[:] = [4, num_anchors, 1]

pipeline.add_model(ssd_model)
pipeline.add_model(decoder_model)
pipeline.add_model(nms_model)

# The "image" input should really be an image, not a multi-array.
pipeline.spec.description.input[0].ParseFromString(ssd_model._spec.description.input[0].SerializeToString())

# Copy the declarations of the "confidence" and "coordinates" outputs.
# The Pipeline makes these strings by default.
pipeline.spec.description.output[0].ParseFromString(nms_model._spec.description.output[0].SerializeToString())
pipeline.spec.description.output[1].ParseFromString(nms_model._spec.description.output[1].SerializeToString())

# Add descriptions to the inputs and outputs.
pipeline.spec.description.input[1].shortDescription = "(optional) IOU Threshold override"
pipeline.spec.description.input[2].shortDescription = "(optional) Confidence Threshold override"
pipeline.spec.description.output[0].shortDescription = u"Boxes \xd7 Class confidence"
pipeline.spec.description.output[1].shortDescription = u"Boxes \xd7 [x, y, width, height] (relative to image size)"

# Add metadata to the model.
pipeline.spec.description.metadata.versionString = "ssd_mobilenet_v1_coco"
pipeline.spec.description.metadata.shortDescription = "MobileNetV1 + SSD, trained on Chinadrinks"
pipeline.spec.description.metadata.author = "Converted to Core ML by me :)"
pipeline.spec.description.metadata.license = ""  # fill in your own license

# Add the list of class labels and the default threshold values too.
user_defined_metadata = {
    "iou_threshold": str(default_iou_threshold),
    "confidence_threshold": str(default_confidence_threshold),
    "classes": ",".join(labels)
}
pipeline.spec.description.metadata.userDefined.update(user_defined_metadata)

# Don't forget this or Core ML might attempt to run the model on an unsupported
# operating system version!
pipeline.spec.specificationVersion = 3

final_model = coremltools.models.MLModel(pipeline.spec)
final_model.save(pipeline_model_save_path)

print(final_model)
print("Done!")

References:

https://github.com/tf-coreml/tf-coreml/blob/master/examples/ssd_example.ipynb

https://github.com/tf-coreml/tf-coreml/issues/107

https://github.com/tf-coreml/tf-coreml/issues/279

https://github.com/vonholst/SSDMobileNet_CoreML