Named entity recognition (NER) is one of the fundamental tasks in natural language processing: extracting entities such as times, locations, organizations, and persons from text. Most current NER work is implemented by fine-tuning a pretrained model. This post implements NER with Hugging Face's Transformers library, using tensorflow==2.4.0 as the framework.
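The snippets below assume the following shared imports (version pins beyond tensorflow==2.4.0 are not specified in the original post):

import json
import os

import numpy as np
import requests
import tensorflow as tf
from flask import Flask, jsonify, request
from tensorflow import keras
from tensorflow.keras import layers
from transformers import BertTokenizer, TFBertModel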
Use the BertTokenizer from transformers to build the model features: input_ids, token_type_ids, and attention_mask.
def create_inputs_targets(sentences, tags, tag2id, max_len, tokenizer):
    dataset_dict = {
        "input_ids": [],
        "token_type_ids": [],
        "attention_mask": [],
        "tags": []
    }
    for sentence, tag in zip(sentences, tags):
        input_ids = []
        target_tags = []
        for idx, word in enumerate(sentence):
            # tokenizer.encode returns a list of token ids
            ids = tokenizer.encode(word, add_special_tokens=False)
            input_ids.extend(ids)
            # Checking the length of ids avoids many errors: the tokenizer can
            # return several sub-tokens for a single character (e.g. a Korean
            # character can yield multiple ids), so the corresponding label
            # must be repeated accordingly.
            num_tokens = len(ids)
            target_tags.extend([tag[idx]] * num_tokens)
        # Truncate, then add '[CLS]' and '[SEP]' around the sentence
        input_ids = input_ids[:max_len - 2]
        target_tags = target_tags[:max_len - 2]
        input_ids = [101] + input_ids + [102]  # 101 = [CLS], 102 = [SEP]
        # 'O' maps to 16 here; arguably the [CLS]/[SEP] positions should get
        # dedicated tags in tag2id instead.
        target_tags = [tag2id['O']] + target_tags + [tag2id['O']]
        token_type_ids = [0] * len(input_ids)
        attention_mask = [1] * len(input_ids)
        padding_len = max_len - len(input_ids)
        # [PAD] is id 0 in the vocab
        input_ids = input_ids + ([0] * padding_len)
        attention_mask = attention_mask + ([0] * padding_len)
        token_type_ids = token_type_ids + ([0] * padding_len)
        # Padded target positions could get a dedicated [PAD]-style tag;
        # here the targets are simply padded with 'O'.
        target_tags = target_tags + ([tag2id['O']] * padding_len)
        dataset_dict["input_ids"].append(input_ids)
        dataset_dict["token_type_ids"].append(token_type_ids)
        dataset_dict["attention_mask"].append(attention_mask)
        dataset_dict["tags"].append(target_tags)
        assert len(target_tags) == max_len, f'{len(input_ids)}, {len(target_tags)}'
    for key in dataset_dict:
        dataset_dict[key] = np.array(dataset_dict[key])
    x = [
        dataset_dict["input_ids"],
        dataset_dict["token_type_ids"],
        dataset_dict["attention_mask"],
    ]
    y = dataset_dict["tags"]
    return x, y
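The get_tokenizer helper referenced throughout is not shown in the original post; a minimal sketch, assuming it simply loads the tokenizer matching the bert-base-chinese checkpoint used below, followed by a toy call (the 3-tag scheme is hypothetical):

def get_tokenizer():
    # Assumption: just wraps the pretrained tokenizer that matches
    # the TFBertModel checkpoint used in create_model below.
    return BertTokenizer.from_pretrained("bert-base-chinese")

# Toy example: labels are already encoded as ids
tokenizer = get_tokenizer()
tag2id = {"O": 0, "B-LOC": 1, "I-LOC": 2}
x, y = create_inputs_targets([["我", "爱", "北", "京"]], [[0, 0, 1, 2]],
                             tag2id, max_len=16, tokenizer=tokenizer)
print([a.shape for a in x], y.shape)  # [(1, 16), (1, 16), (1, 16)] (1, 16)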
The model fine-tunes BERT; the code is as follows:
def create_model(num_tags, max_len):
    # BERT encoder
    encoder = TFBertModel.from_pretrained("bert-base-chinese")
    # NER model: a token-level classification head on top of BERT
    input_ids = layers.Input(shape=(None,), dtype=tf.int32, name="input_ids")
    token_type_ids = layers.Input(shape=(None,), dtype=tf.int32, name="token_type_ids")
    attention_mask = layers.Input(shape=(None,), dtype=tf.int32, name="attention_mask")
    embedding = encoder(
        input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask
    )[0]
    embedding = layers.Dropout(0.3)(embedding)
    tag_logits = layers.Dense(num_tags, activation='softmax')(embedding)
    model = keras.Model(
        inputs=[input_ids, token_type_ids, attention_mask],
        outputs=[tag_logits],
    )
    optimizer = keras.optimizers.Adam(learning_rate=3e-5)
    # The Dense layer already applies softmax, hence from_logits=False
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=False, reduction=tf.keras.losses.Reduction.NONE
    )

    def masked_ce_loss(real, pred):
        # Exclude positions whose label is the padding tag id (17 here).
        # Note: this id must match what create_inputs_targets actually pads
        # with; since it pads with 'O' (16), id 17 only takes effect if a
        # dedicated [PAD] tag is used instead.
        mask = tf.math.logical_not(tf.math.equal(real, 17))
        loss_ = loss_object(real, pred)
        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask
        return tf.reduce_mean(loss_)

    model.compile(optimizer=optimizer, loss=masked_ce_loss, metrics=['accuracy'])
    return model
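A quick sanity check of the output shape (num_tags=18 is only an illustrative value; in practice it is len(tag2id)):

model = create_model(num_tags=18, max_len=64)
dummy = [np.zeros((1, 64), dtype="int32")] * 3
print(model.predict(dummy).shape)  # (1, 64, 18): one tag distribution per token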
Train with Keras, TensorFlow's high-level API, and save the model both as h5 weights and in the pb (SavedModel) format.
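main() below also relies on load_data, save_dict, and label_encoder, which the post does not show. Minimal sketches, assuming a CoNLL-style file with one "character tag" pair per line and a blank line between sentences (the real implementations may differ):

def load_data(path, tag2id):
    # Assumed format: one "char tag" pair per line, blank line between sentences
    sentences, labels, sentence, label = [], [], [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                if sentence:
                    sentences.append(sentence)
                    labels.append(label)
                    sentence, label = [], []
                continue
            char, tag = line.split()
            if tag not in tag2id:
                tag2id[tag] = len(tag2id)
            sentence.append(char)
            label.append(tag)
    if sentence:
        sentences.append(sentence)
        labels.append(label)
    return sentences, labels, tag2id

def save_dict(d, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(d, f, ensure_ascii=False, indent=2)

def label_encoder(labels, tag2id):
    # Map tag strings to integer ids, sentence by sentence
    return [[tag2id[t] for t in sentence] for sentence in labels]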
def main():
    train_file = "./data/train_example.txt"
    dev_file = "./data/dev_example.txt"
    tag2id_path = "./output/tag2id.json"
    output_path = "./output/"
    pb_path = "./output/1"
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    if not os.path.exists(pb_path):  # fixed: was os.path.join(pb_path), which is always truthy
        os.makedirs(pb_path)
    tag2id = dict()
    max_len = 64
    batch_size = 4
    epoch = 1
    # load data
    train_data, train_label, tag2id = load_data(train_file, tag2id)
    print("train data size: ", len(train_data))
    print("train label size: ", len(train_label))
    print("label dict: ", tag2id)
    dev_data, dev_label, tag2id = load_data(dev_file, tag2id)
    print("dev data size: ", len(dev_data))
    print("dev label size: ", len(dev_label))
    print("label dict: ", tag2id)
    # save tag2id
    save_dict(tag2id, tag2id_path)
    # label encoder
    train_label = label_encoder(train_label, tag2id)
    print("train label: ", train_label[:3])
    dev_label = label_encoder(dev_label, tag2id)
    print("dev label: ", dev_label[:3])
    # get tokenizer
    tokenizer = get_tokenizer()
    # build model inputs
    train_x, train_y = create_inputs_targets(train_data, train_label, tag2id, max_len, tokenizer)
    print("train data tokenizer: ", train_x[:3])
    dev_x, dev_y = create_inputs_targets(dev_data, dev_label, tag2id, max_len, tokenizer)
    print("dev data tokenizer: ", dev_x[:3])
    # create model
    model = create_model(len(tag2id), max_len)
    model.summary()
    history = model.fit(train_x,
                        train_y,
                        epochs=epoch,
                        verbose=1,
                        batch_size=batch_size,
                        validation_data=(dev_x, dev_y),
                        validation_batch_size=batch_size
                        )  # alternatively: validation_split=0.1
    # save h5 weights
    model_file = os.path.join(output_path, "ner_model.h5")
    model.save_weights(model_file, overwrite=True)
    # save pb (SavedModel) for TensorFlow Serving
    tf.keras.models.save_model(model, pb_path, save_format="tf")
    pred = model.predict(train_x, batch_size=batch_size)
    print("pred shape: ", pred.shape)
Build the model input features as above and train the model. Inference then reuses the same feature pipeline:
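predict() and the Flask handler further down both call create_infer_inputs, which is likewise not shown in the post. A minimal sketch, assuming it mirrors create_inputs_targets minus the labels and additionally returns each input's token count:

def create_infer_inputs(texts, max_len, tokenizer):
    # texts: a list of strings (or lists of characters)
    input_ids_list, token_type_ids_list, attention_mask_list, len_list = [], [], [], []
    for text in texts:
        input_ids = []
        for char in text:
            input_ids.extend(tokenizer.encode(char, add_special_tokens=False))
        len_list.append(len(input_ids))  # token count before truncation/padding
        input_ids = [101] + input_ids[:max_len - 2] + [102]
        attention_mask = [1] * len(input_ids)
        padding_len = max_len - len(input_ids)
        input_ids += [0] * padding_len
        attention_mask += [0] * padding_len
        input_ids_list.append(input_ids)
        token_type_ids_list.append([0] * max_len)
        attention_mask_list.append(attention_mask)
    x = [np.array(input_ids_list), np.array(token_type_ids_list), np.array(attention_mask_list)]
    return x, len_list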
def predict(test_data, max_len, tag2id):
    tokenizer = get_tokenizer()
    test_x, len_list = create_infer_inputs(test_data, max_len, tokenizer)
    print("test data tokenizer: ", test_x[:3])
    model = create_model(len(tag2id), max_len)
    model.load_weights("./output/ner_model.h5")
    pred_logits = model.predict(test_x)
    id2tag = {value: key for key, value in tag2id.items()}
    # shape [batch_size, seq_len]
    pred = np.argmax(pred_logits, axis=2).tolist()
    predict_label = []
    for i in range(len(len_list)):
        temp = []
        temp_pred = pred[i]
        # offset by 1 to skip the prediction for the [CLS] token at position 0
        for j in range(min(len_list[i], max_len - 2)):
            temp.append(id2tag[temp_pred[j + 1]])
        predict_label.append(temp)
    print("predict label: ", predict_label)
    return predict_label
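An illustrative call (the sample sentence is made up; tag2id is reloaded from the file saved during training):

if __name__ == "__main__":
    with open("./output/tag2id.json", encoding="utf-8") as f:
        tag2id = json.load(f)
    predict(["我爱北京天安门"], max_len=64, tag2id=tag2id)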
Finally, wrap the service with Flask for convenient HTTP access; the handler forwards requests to TensorFlow Serving.
def bert_ner_infer():
    # Assumes module-level globals: max_len, tokenizer (from get_tokenizer())
    # and id2tag (the inverse of the saved tag2id)
    params = json.loads(request.get_data().decode("utf-8"))
    text = params["text"]
    url = params["url"]  # TensorFlow Serving REST endpoint
    x, len_list = create_infer_inputs(text, max_len, tokenizer)
    print("len_list: ", len_list)
    input_ids = x[0].tolist()
    token_type_ids = x[1].tolist()
    attention_mask = x[2].tolist()
    data = json.dumps({"signature_name": "serving_default",
                       "inputs": {"input_ids": input_ids,
                                  "token_type_ids": token_type_ids,
                                  "attention_mask": attention_mask}})
    headers = {"content-type": "application/json"}
    result = requests.post(url, data=data, headers=headers)
    result = json.loads(result.text)
    # only the first example in the batch is decoded here
    pred_logits = result["outputs"][0]
    pred = np.argmax(pred_logits, axis=1).tolist()
    print("pred: ", pred)
    predict_label = []
    # offset by 1 to skip the prediction for the [CLS] token at position 0
    for j in range(min(len_list[0], max_len - 2)):
        predict_label.append(id2tag[pred[j + 1]])
    return_result = {"predict": predict_label}
    return jsonify(return_result)
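A sketch of the wiring around this handler; the route path, port, and model name are illustrative, and the TensorFlow Serving container is assumed to serve the SavedModel exported to ./output/1 above:

app = Flask(__name__)
max_len = 64
tokenizer = get_tokenizer()
with open("./output/tag2id.json", encoding="utf-8") as f:
    id2tag = {v: k for k, v in json.load(f).items()}
app.add_url_rule("/ner", view_func=bert_ner_infer, methods=["POST"])

if __name__ == "__main__":
    # Start TensorFlow Serving first, e.g.:
    #   docker run -p 8501:8501 -v $PWD/output:/models/ner \
    #       -e MODEL_NAME=ner tensorflow/serving
    # then POST {"text": ["..."], "url": "http://localhost:8501/v1/models/ner:predict"}
    app.run(host="0.0.0.0", port=5000)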
The above is a named entity recognition implementation built on transformers. If you spot any problems, corrections are welcome.