23.10 图片 OCR
优质
小牛编辑
132浏览
2023-12-01
此示例使用卷积堆栈,后跟递归堆栈和 CTC logloss 函数,以对生成的文本图像进行光学字符识别。 我没有证据表明它实际上是学习文本的一般形状,还是仅仅能够识别所抛出的所有不同字体……它的目的更多是为了在 Keras 中演示CTC。 请注意,可能需要针对使用中的特定操作系统更新字体列表。
它从 4 个字母词开始。对于前12个轮次,使用 TextImageGenerator 类(同时是测试/训练数据的生成器类和 Keras 回调类)会逐渐增加难度。 20个 轮次后,通过重新编译模型以处理更宽的图像并重建单词列表以包含两个以空格分隔的单词,将抛出更长的序列。
下表显示了标准化的编辑距离值。 Theano 使用的 CTC 实现略有不同,因此结果也有所不同。
Epoch | TF | TH |
---|---|---|
10 | 0.027 | 0.064 |
15 | 0.038 | 0.035 |
20 | 0.043 | 0.045 |
25 | 0.014 | 0.019 |
需要 cairo
和 editdistance
包:
首先,安装 Cairo 库: https://cairographics.org/
然后安装 Python 依赖:
pip install cairocffi
pip install editdistance
Created by Mike Henry https://github.com/mbhenry/
import os
import itertools
import codecs
import re
import datetime
import cairocffi as cairo
import editdistance
import numpy as np
from scipy import ndimage
import pylab
from keras import backend as K
from keras.layers.convolutional import Conv2D, MaxPooling2D
from keras.layers import Input, Dense, Activation
from keras.layers import Reshape, Lambda
from keras.layers.merge import add, concatenate
from keras.models import Model
from keras.layers.recurrent import GRU
from keras.optimizers import SGD
from keras.utils.data_utils import get_file
from keras.preprocessing import image
import keras.callbacks
OUTPUT_DIR = 'image_ocr'
# 字符类和匹配的正则表达式过滤器
regex = r'^[a-z ]+$'
alphabet = u'abcdefghijklmnopqrstuvwxyz '
np.random.seed(55)
# 这会产生更大的 "斑点" 噪声,
# 看起来比仅添加高斯噪声更为真实,
# 它假定像素的灰度范围为 0 到 1
def speckle(img):
severity = np.random.uniform(0, 0.6)
blur = ndimage.gaussian_filter(np.random.randn(*img.shape) * severity, 1)
img_speck = (img + blur)
img_speck[img_speck > 1] = 1
img_speck[img_speck <= 0] = 0
return img_speck
# 在随机位置绘制字符串,边界框也使用随机字体、轻微的随机旋转和随机的斑点噪声
def paint_text(text, w, h, rotate=False, ud=False, multi_fonts=False):
surface = cairo.ImageSurface(cairo.FORMAT_RGB24, w, h)
with cairo.Context(surface) as context:
context.set_source_rgb(1, 1, 1) # 白色
context.paint()
# 此字体列表可在 CentOS 7 中使用
if multi_fonts:
fonts = [
'Century Schoolbook', 'Courier', 'STIX',
'URW Chancery L', 'FreeMono']
context.select_font_face(
np.random.choice(fonts),
cairo.FONT_SLANT_NORMAL,
np.random.choice([cairo.FONT_WEIGHT_BOLD, cairo.FONT_WEIGHT_NORMAL]))
else:
context.select_font_face('Courier',
cairo.FONT_SLANT_NORMAL,
cairo.FONT_WEIGHT_BOLD)
context.set_font_size(25)
box = context.text_extents(text)
border_w_h = (4, 4)
if box[2] > (w - 2 * border_w_h[1]) or box[3] > (h - 2 * border_w_h[0]):
raise IOError(('Could not fit string into image.'
'Max char count is too large for given image width.'))
# 通过在画布上随机放置文本框并旋转一些空间来教会 RNN 平移不变性
max_shift_x = w - box[2] - border_w_h[0]
max_shift_y = h - box[3] - border_w_h[1]
top_left_x = np.random.randint(0, int(max_shift_x))
if ud:
top_left_y = np.random.randint(0, int(max_shift_y))
else:
top_left_y = h // 2
context.move_to(top_left_x - int(box[0]), top_left_y - int(box[1]))
context.set_source_rgb(0, 0, 0)
context.show_text(text)
buf = surface.get_data()
a = np.frombuffer(buf, np.uint8)
a.shape = (h, w, 4)
a = a[:, :, 0] # 抓取单个通道
a = a.astype(np.float32) / 255
a = np.expand_dims(a, 0)
if rotate:
a = image.random_rotation(a, 3 * (w - top_left_x) / w + 1)
a = speckle(a)
return a
def shuffle_mats_or_lists(matrix_list, stop_ind=None):
ret = []
assert all([len(i) == len(matrix_list[0]) for i in matrix_list])
len_val = len(matrix_list[0])
if stop_ind is None:
stop_ind = len_val
assert stop_ind <= len_val
a = list(range(stop_ind))
np.random.shuffle(a)
a += list(range(stop_ind, len_val))
for mat in matrix_list:
if isinstance(mat, np.ndarray):
ret.append(mat[a])
elif isinstance(mat, list):
ret.append([mat[i] for i in a])
else:
raise TypeError('`shuffle_mats_or_lists` only supports '
'numpy.array and list objects.')
return ret
# 将字符转换为唯一的整数值
def text_to_labels(text):
ret = []
for char in text:
ret.append(alphabet.find(char))
return ret
# 将数字类反向转换回字符
def labels_to_text(labels):
ret = []
for c in labels:
if c == len(alphabet): # CTC 空白
ret.append("")
else:
ret.append(alphabet[c])
return "".join(ret)
# 仅 a-z 和空格..可能不难扩展为大写和符号
def is_valid_str(in_str):
search = re.compile(regex, re.UNICODE).search
return bool(search(in_str))
# 使用生成器函数提供训练/测试数据。每次使用随机扰动动态创建图像渲染和文本
class TextImageGenerator(keras.callbacks.Callback):
def __init__(self, monogram_file, bigram_file, minibatch_size,
img_w, img_h, downsample_factor, val_split,
absolute_max_string_len=16):
self.minibatch_size = minibatch_size
self.img_w = img_w
self.img_h = img_h
self.monogram_file = monogram_file
self.bigram_file = bigram_file
self.downsample_factor = downsample_factor
self.val_split = val_split
self.blank_label = self.get_output_size() - 1
self.absolute_max_string_len = absolute_max_string_len
def get_output_size(self):
return len(alphabet) + 1
# 由于使用生成器,因此 num_words可以与轮次大小无关,因为 max_string_len 增长, num_words 也会增长
def build_word_list(self, num_words, max_string_len=None, mono_fraction=0.5):
assert max_string_len <= self.absolute_max_string_len
assert num_words % self.minibatch_size == 0
assert (self.val_split * num_words) % self.minibatch_size == 0
self.num_words = num_words
self.string_list = [''] * self.num_words
tmp_string_list = []
self.max_string_len = max_string_len
self.Y_data = np.ones([self.num_words, self.absolute_max_string_len]) * -1
self.X_text = []
self.Y_len = [0] * self.num_words
def _is_length_of_word_valid(word):
return (max_string_len == -1 or
max_string_len is None or
len(word) <= max_string_len)
# 会标文件按英语语音中的频率排序
with codecs.open(self.monogram_file, mode='r', encoding='utf-8') as f:
for line in f:
if len(tmp_string_list) == int(self.num_words * mono_fraction):
break
word = line.rstrip()
if _is_length_of_word_valid(word):
tmp_string_list.append(word)
# bigram文件包含英语语音中的常用单词对
with codecs.open(self.bigram_file, mode='r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
if len(tmp_string_list) == self.num_words:
break
columns = line.lower().split()
word = columns[0] + ' ' + columns[1]
if is_valid_str(word) and _is_length_of_word_valid(word):
tmp_string_list.append(word)
if len(tmp_string_list) != self.num_words:
raise IOError('Could not pull enough words'
'from supplied monogram and bigram files.')
# 隔行扫描以混合易用词和难用词
self.string_list[::2] = tmp_string_list[:self.num_words // 2]
self.string_list[1::2] = tmp_string_list[self.num_words // 2:]
for i, word in enumerate(self.string_list):
self.Y_len[i] = len(word)
self.Y_data[i, 0:len(word)] = text_to_labels(word)
self.X_text.append(word)
self.Y_len = np.expand_dims(np.array(self.Y_len), 1)
self.cur_val_index = self.val_split
self.cur_train_index = 0
# 每次从训练/验证/测试中请求图像时,都会对文本进行新的随机绘制
def get_batch(self, index, size, train):
# width 和 height 按典型的 Keras 约定反向,因为 width 是将其馈入 RNN 时的时间维。
if K.image_data_format() == 'channels_first':
X_data = np.ones([size, 1, self.img_w, self.img_h])
else:
X_data = np.ones([size, self.img_w, self.img_h, 1])
labels = np.ones([size, self.absolute_max_string_len])
input_length = np.zeros([size, 1])
label_length = np.zeros([size, 1])
source_str = []
for i in range(size):
# 混合一些空白输入。这对于实现翻译不变性似乎很重要
if train and i > size - 4:
if K.image_data_format() == 'channels_first':
X_data[i, 0, 0:self.img_w, :] = self.paint_func('')[0, :, :].T
else:
X_data[i, 0:self.img_w, :, 0] = self.paint_func('',)[0, :, :].T
labels[i, 0] = self.blank_label
input_length[i] = self.img_w // self.downsample_factor - 2
label_length[i] = 1
source_str.append('')
else:
if K.image_data_format() == 'channels_first':
X_data[i, 0, 0:self.img_w, :] = (
self.paint_func(self.X_text[index + i])[0, :, :].T)
else:
X_data[i, 0:self.img_w, :, 0] = (
self.paint_func(self.X_text[index + i])[0, :, :].T)
labels[i, :] = self.Y_data[index + i]
input_length[i] = self.img_w // self.downsample_factor - 2
label_length[i] = self.Y_len[index + i]
source_str.append(self.X_text[index + i])
inputs = {'the_input': X_data,
'the_labels': labels,
'input_length': input_length,
'label_length': label_length,
'source_str': source_str # 仅用于可视化
}
outputs = {'ctc': np.zeros([size])} # 虚拟数据,用于虚拟 loss 函数
return (inputs, outputs)
def next_train(self):
while 1:
ret = self.get_batch(self.cur_train_index,
self.minibatch_size, train=True)
self.cur_train_index += self.minibatch_size
if self.cur_train_index >= self.val_split:
self.cur_train_index = self.cur_train_index % 32
(self.X_text, self.Y_data, self.Y_len) = shuffle_mats_or_lists(
[self.X_text, self.Y_data, self.Y_len], self.val_split)
yield ret
def next_val(self):
while 1:
ret = self.get_batch(self.cur_val_index,
self.minibatch_size, train=False)
self.cur_val_index += self.minibatch_size
if self.cur_val_index >= self.num_words:
self.cur_val_index = self.val_split + self.cur_val_index % 32
yield ret
def on_train_begin(self, logs={}):
self.build_word_list(16000, 4, 1)
self.paint_func = lambda text: paint_text(
text, self.img_w, self.img_h,
rotate=False, ud=False, multi_fonts=False)
def on_epoch_begin(self, epoch, logs={}):
# 重新结合绘画功能以实现课程学习
if 3 <= epoch < 6:
self.paint_func = lambda text: paint_text(
text, self.img_w, self.img_h,
rotate=False, ud=True, multi_fonts=False)
elif 6 <= epoch < 9:
self.paint_func = lambda text: paint_text(
text, self.img_w, self.img_h,
rotate=False, ud=True, multi_fonts=True)
elif epoch >= 9:
self.paint_func = lambda text: paint_text(
text, self.img_w, self.img_h,
rotate=True, ud=True, multi_fonts=True)
if epoch >= 21 and self.max_string_len < 12:
self.build_word_list(32000, 12, 0.5)
# 尽管不是内部 Keras 损失函数,但实际损失计算仍在此处发生
def ctc_lambda_func(args):
y_pred, labels, input_length, label_length = args
# 这里的 2 是至关重要的,因为 RNN 的前几个输出往往是垃圾:
y_pred = y_pred[:, 2:, :]
return K.ctc_batch_cost(labels, y_pred, input_length, label_length)
# 对于真正的 OCR 应用程序,这应该是带有字典和语言模型的波束搜索。
# 对于此示例,最佳路径就足够了。
def decode_batch(test_func, word_batch):
out = test_func([word_batch])[0]
ret = []
for j in range(out.shape[0]):
out_best = list(np.argmax(out[j, 2:], 1))
out_best = [k for k, g in itertools.groupby(out_best)]
outstr = labels_to_text(out_best)
ret.append(outstr)
return ret
class VizCallback(keras.callbacks.Callback):
def __init__(self, run_name, test_func, text_img_gen, num_display_words=6):
self.test_func = test_func
self.output_dir = os.path.join(
OUTPUT_DIR, run_name)
self.text_img_gen = text_img_gen
self.num_display_words = num_display_words
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
def show_edit_distance(self, num):
num_left = num
mean_norm_ed = 0.0
mean_ed = 0.0
while num_left > 0:
word_batch = next(self.text_img_gen)[0]
num_proc = min(word_batch['the_input'].shape[0], num_left)
decoded_res = decode_batch(self.test_func,
word_batch['the_input'][0:num_proc])
for j in range(num_proc):
edit_dist = editdistance.eval(decoded_res[j],
word_batch['source_str'][j])
mean_ed += float(edit_dist)
mean_norm_ed += float(edit_dist) / len(word_batch['source_str'][j])
num_left -= num_proc
mean_norm_ed = mean_norm_ed / num
mean_ed = mean_ed / num
print('\nOut of %d samples: Mean edit distance:'
'%.3f Mean normalized edit distance: %0.3f'
% (num, mean_ed, mean_norm_ed))
def on_epoch_end(self, epoch, logs={}):
self.model.save_weights(
os.path.join(self.output_dir, 'weights%02d.h5' % (epoch)))
self.show_edit_distance(256)
word_batch = next(self.text_img_gen)[0]
res = decode_batch(self.test_func,
word_batch['the_input'][0:self.num_display_words])
if word_batch['the_input'][0].shape[0] < 256:
cols = 2
else:
cols = 1
for i in range(self.num_display_words):
pylab.subplot(self.num_display_words // cols, cols, i + 1)
if K.image_data_format() == 'channels_first':
the_input = word_batch['the_input'][i, 0, :, :]
else:
the_input = word_batch['the_input'][i, :, :, 0]
pylab.imshow(the_input.T, cmap='Greys_r')
pylab.xlabel(
'Truth = \'%s\'\nDecoded = \'%s\'' %
(word_batch['source_str'][i], res[i]))
fig = pylab.gcf()
fig.set_size_inches(10, 13)
pylab.savefig(os.path.join(self.output_dir, 'e%02d.png' % (epoch)))
pylab.close()
def train(run_name, start_epoch, stop_epoch, img_w):
# 输入参数
img_h = 64
words_per_epoch = 16000
val_split = 0.2
val_words = int(words_per_epoch * (val_split))
# 网络参数
conv_filters = 16
kernel_size = (3, 3)
pool_size = 2
time_dense_size = 32
rnn_size = 512
minibatch_size = 32
if K.image_data_format() == 'channels_first':
input_shape = (1, img_w, img_h)
else:
input_shape = (img_w, img_h, 1)
fdir = os.path.dirname(
get_file('wordlists.tgz',
origin='http://www.mythic-ai.com/datasets/wordlists.tgz',
untar=True))
img_gen = TextImageGenerator(
monogram_file=os.path.join(fdir, 'wordlist_mono_clean.txt'),
bigram_file=os.path.join(fdir, 'wordlist_bi_clean.txt'),
minibatch_size=minibatch_size,
img_w=img_w,
img_h=img_h,
downsample_factor=(pool_size ** 2),
val_split=words_per_epoch - val_words)
act = 'relu'
input_data = Input(name='the_input', shape=input_shape, dtype='float32')
inner = Conv2D(conv_filters, kernel_size, padding='same',
activation=act, kernel_initializer='he_normal',
name='conv1')(input_data)
inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max1')(inner)
inner = Conv2D(conv_filters, kernel_size, padding='same',
activation=act, kernel_initializer='he_normal',
name='conv2')(inner)
inner = MaxPooling2D(pool_size=(pool_size, pool_size), name='max2')(inner)
conv_to_rnn_dims = (img_w // (pool_size ** 2),
(img_h // (pool_size ** 2)) * conv_filters)
inner = Reshape(target_shape=conv_to_rnn_dims, name='reshape')(inner)
# 减少进入 RNN 的输入大小:
inner = Dense(time_dense_size, activation=act, name='dense1')(inner)
# 两层双向GRU
# 单层 GRU 似乎也可以,如果不比 LSTM 强:
gru_1 = GRU(rnn_size, return_sequences=True,
kernel_initializer='he_normal', name='gru1')(inner)
gru_1b = GRU(rnn_size, return_sequences=True,
go_backwards=True, kernel_initializer='he_normal',
name='gru1_b')(inner)
gru1_merged = add([gru_1, gru_1b])
gru_2 = GRU(rnn_size, return_sequences=True,
kernel_initializer='he_normal', name='gru2')(gru1_merged)
gru_2b = GRU(rnn_size, return_sequences=True, go_backwards=True,
kernel_initializer='he_normal', name='gru2_b')(gru1_merged)
# 将 RNN 输出转换为字符激活:
inner = Dense(img_gen.get_output_size(), kernel_initializer='he_normal',
name='dense2')(concatenate([gru_2, gru_2b]))
y_pred = Activation('softmax', name='softmax')(inner)
Model(inputs=input_data, outputs=y_pred).summary()
labels = Input(name='the_labels',
shape=[img_gen.absolute_max_string_len], dtype='float32')
input_length = Input(name='input_length', shape=[1], dtype='int64')
label_length = Input(name='label_length', shape=[1], dtype='int64')
# Keras 当前不支持带有额外参数的损失函数,因此 CTC 损失在 Lambda 层中实现
loss_out = Lambda(
ctc_lambda_func, output_shape=(1,),
name='ctc')([y_pred, labels, input_length, label_length])
# clipnorm 似乎加快了收敛速度
sgd = SGD(learning_rate=0.02,
decay=1e-6,
momentum=0.9,
nesterov=True)
model = Model(inputs=[input_data, labels, input_length, label_length],
outputs=loss_out)
# 损失计算发生在其他地方,因此请使用虚拟 lambda 函数补偿损失
model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=sgd)
if start_epoch > 0:
weight_file = os.path.join(
OUTPUT_DIR,
os.path.join(run_name, 'weights%02d.h5' % (start_epoch - 1)))
model.load_weights(weight_file)
# 捕获 softmax 的输出,以便我们可以在可视化过程中解码输出
test_func = K.function([input_data], [y_pred])
viz_cb = VizCallback(run_name, test_func, img_gen.next_val())
model.fit_generator(
generator=img_gen.next_train(),
steps_per_epoch=(words_per_epoch - val_words) // minibatch_size,
epochs=stop_epoch,
validation_data=img_gen.next_val(),
validation_steps=val_words // minibatch_size,
callbacks=[viz_cb, img_gen],
initial_epoch=start_epoch)
if __name__ == '__main__':
run_name = datetime.datetime.now().strftime('%Y:%m:%d:%H:%M:%S')
train(run_name, 0, 20, 128)
# 增加到更宽的图像并从第 20 个轮次开始。
# 学到的重量会重新加载
train(run_name, 20, 25, 512)