Named Entity Recognition with Hugging Face Transformers

花俊雄
2023-12-01

Named Entity Recognition

Named entity recognition (NER) is one of the fundamental tasks in natural language processing: extracting entities such as times, locations, organizations, and persons from text. Most current NER systems are built by fine-tuning a pretrained model. This post implements NER with Hugging Face's Transformers, using tensorflow==2.4.0 as the framework.

Data Features

Use the BertTokenizer from transformers to build the data features: input_ids, token_type_ids, and attention_mask.
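
For intuition on the label-alignment logic in the function below, note that a single word can encode to several WordPiece ids. A quick probe, using the same bert-base-chinese checkpoint assumed throughout this post:

from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-chinese")
# a rare or non-Chinese word is often split into several sub-tokens,
# so its single tag must be repeated once per resulting id
ids = tokenizer.encode("playing", add_special_tokens=False)
print(len(ids), ids)

With that in mind, the full feature-construction function: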

import numpy as np


def create_inputs_targets(sentences, tags, tag2id, max_len, tokenizer):
    dataset_dict = {
        "input_ids": [],
        "token_type_ids": [],
        "attention_mask": [],
        "tags": []
    }

    for sentence, tag in zip(sentences, tags):
        input_ids = []
        target_tags = []
        for idx, word in enumerate(sentence):
            ids = tokenizer.encode(word, add_special_tokens=False)
            input_ids.extend(ids)
            # encode() may return several sub-token ids for a single word
            # (e.g. one Korean token can expand into multiple ids), so the
            # word's label has to be repeated once per sub-token
            num_tokens = len(ids)
            target_tags.extend([tag[idx]] * num_tokens)

        # truncate to leave room for '[CLS]' and '[SEP]'
        input_ids = input_ids[:max_len - 2]
        target_tags = target_tags[:max_len - 2]

        # 101 and 102 are the ids of '[CLS]' and '[SEP]' in the bert-base-chinese vocab
        input_ids = [101] + input_ids + [102]
        # label the '[CLS]' and '[SEP]' positions as 'O'
        target_tags = [tag2id['O']] + target_tags + [tag2id['O']]
        token_type_ids = [0] * len(input_ids)
        attention_mask = [1] * len(input_ids)
        padding_len = max_len - len(input_ids)
        # '[PAD]' has id 0 in the vocab
        input_ids = input_ids + ([0] * padding_len)
        attention_mask = attention_mask + ([0] * padding_len)
        token_type_ids = token_type_ids + ([0] * padding_len)
        # pad the labels as well; padded positions also get 'O' here
        # (a dedicated padding tag would let the loss mask them cleanly)
        target_tags = target_tags + ([tag2id['O']] * padding_len)

        dataset_dict["input_ids"].append(input_ids)
        dataset_dict["token_type_ids"].append(token_type_ids)
        dataset_dict["attention_mask"].append(attention_mask)
        dataset_dict["tags"].append(target_tags)
        assert len(target_tags) == max_len, f'{len(input_ids)}, {len(target_tags)}'

    for key in dataset_dict:
        dataset_dict[key] = np.array(dataset_dict[key])

    x = [
        dataset_dict["input_ids"],
        dataset_dict["token_type_ids"],
        dataset_dict["attention_mask"],
    ]
    y = dataset_dict["tags"]
    return x, y
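
The snippets in this post also call a few helpers that are not shown: get_tokenizer, load_data, label_encoder, and save_dict. A minimal sketch of what they might look like, assuming a CoNLL-style input of one "char tag" pair per line with a blank line between sentences (the exact file format is my assumption, not spelled out in the post):

import json

from transformers import BertTokenizer


def get_tokenizer():
    # same checkpoint as the encoder in create_model below
    return BertTokenizer.from_pretrained("bert-base-chinese")


def load_data(path, tag2id):
    # assumed format: one "char tag" pair per line, blank line between sentences
    sentences, labels = [], []
    sent, tags = [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                if sent:
                    sentences.append(sent)
                    labels.append(tags)
                    sent, tags = [], []
                continue
            char, tag = line.split()
            sent.append(char)
            tags.append(tag)
            if tag not in tag2id:
                tag2id[tag] = len(tag2id)
    if sent:
        sentences.append(sent)
        labels.append(tags)
    return sentences, labels, tag2id


def label_encoder(labels, tag2id):
    # map tag strings to integer ids
    return [[tag2id[t] for t in tags] for tags in labels]


def save_dict(d, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(d, f, ensure_ascii=False, indent=2)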

Model

Fine-tune BERT with a token-classification head; the code is as follows:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from transformers import TFBertModel


def create_model(num_tags, max_len):
    # BERT encoder
    encoder = TFBertModel.from_pretrained("bert-base-chinese")
    # NER head on top of the encoder
    input_ids = layers.Input(shape=(None,), dtype=tf.int32, name="input_ids")
    token_type_ids = layers.Input(shape=(None,), dtype=tf.int32, name="token_type_ids")
    attention_mask = layers.Input(shape=(None,), dtype=tf.int32, name="attention_mask")
    # index 0 of the encoder output is the last hidden state,
    # shape (batch_size, seq_len, hidden_size)
    embedding = encoder(
        input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask
    )[0]
    embedding = layers.Dropout(0.3)(embedding)

    tag_logits = layers.Dense(num_tags, activation='softmax')(embedding)

    model = keras.Model(
        inputs=[input_ids, token_type_ids, attention_mask],
        outputs=[tag_logits],
    )
    optimizer = keras.optimizers.Adam(learning_rate=3e-5)
    # the Dense layer already applies softmax, hence from_logits=False
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=False, reduction=tf.keras.losses.Reduction.NONE
    )

    def masked_ce_loss(real, pred):
        # mask out positions whose label id is 17, assumed to be a dedicated
        # padding tag; this id must match whatever label the padding positions
        # actually carry (create_inputs_targets above pads with tag2id['O'])
        mask = tf.math.logical_not(tf.math.equal(real, 17))
        loss_ = loss_object(real, pred)

        mask = tf.cast(mask, dtype=loss_.dtype)
        loss_ *= mask

        return tf.reduce_mean(loss_)
    model.compile(optimizer=optimizer, loss=masked_ce_loss, metrics=['accuracy'])
    return model
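
One caveat with the compile call above: the stock 'accuracy' metric also counts padded positions, which inflates the number. A masked variant, sketched under the same assumption that label id 17 marks padding, can be passed to model.compile instead:

def masked_accuracy(real, pred):
    # same padding convention as masked_ce_loss: label id 17 marks padding
    pred_ids = tf.argmax(pred, axis=-1, output_type=tf.int32)
    real = tf.cast(real, tf.int32)
    mask = tf.cast(tf.math.not_equal(real, 17), tf.float32)
    matches = tf.cast(tf.math.equal(real, pred_ids), tf.float32)
    return tf.reduce_sum(matches * mask) / tf.maximum(tf.reduce_sum(mask), 1.0)

Then compile with: model.compile(optimizer=optimizer, loss=masked_ce_loss, metrics=[masked_accuracy]).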

Model Training

Train with TensorFlow's high-level Keras API, then save the model in both h5 and pb (SavedModel) formats.

import os


def main():
    train_file = "./data/train_example.txt"
    dev_file = "./data/dev_example.txt"
    tag2id_path = "./output/tag2id.json"
    output_path = "./output/"
    pb_path = "./output/1"
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    if not os.path.exists(pb_path):
        os.makedirs(pb_path)
    tag2id = dict()
    max_len = 64
    batch_size = 4
    epoch = 1
    # load data
    train_data, train_label, tag2id = load_data(train_file, tag2id)
    print("train data size: ", len(train_data))
    print("train label size: ", len(train_label))
    print("label dict: ", tag2id)
    dev_data, dev_label, tag2id = load_data(dev_file, tag2id)
    print("dev data size: ", len(dev_data))
    print("dev label size: ", len(dev_label))
    print("label dict: ", tag2id)
    # save tag2id
    save_dict(tag2id, tag2id_path)
    # label encoder
    train_label = label_encoder(train_label, tag2id)
    print("train label: ", train_label[:3])
    dev_label = label_encoder(dev_label, tag2id)
    print("dev label: ", dev_label[:3])
    # get tokenizer
    tokenizer = get_tokenizer()
    # build the model inputs
    train_x, train_y = create_inputs_targets(train_data, train_label, tag2id, max_len, tokenizer)
    print("train data tokenizer: ", train_x[:3])
    dev_x, dev_y = create_inputs_targets(dev_data, dev_label, tag2id, max_len, tokenizer)
    print("dev data tokenizer: ", dev_x[:3])

    # create model
    model = create_model(len(tag2id), max_len)
    model.summary()
    history = model.fit(train_x,
                        train_y,
                        epochs=epoch,
                        verbose=1,
                        batch_size=batch_size,
                        validation_data=(dev_x, dev_y),
                        validation_batch_size=batch_size
                        )

    # save the weights in h5 format
    model_file = os.path.join(output_path, "ner_model.h5")
    model.save_weights(model_file, overwrite=True)

    # export the full model in SavedModel (pb) format for TensorFlow Serving
    tf.keras.models.save_model(model, pb_path, save_format="tf")
    # sanity-check the exported model on the training inputs
    pred = model.predict(train_x, batch_size=batch_size)
    print("pred shape: ", pred.shape)

Prediction

Build the model input features the same way as above, then load the trained weights and run inference.

def predict(test_data, max_len, tag2id):
    tokenizer = get_tokenizer()
    # create_infer_inputs (not shown) mirrors create_inputs_targets but takes
    # no labels and also returns the original sentence lengths
    test_x, len_list = create_infer_inputs(test_data, max_len, tokenizer)
    print("test data tokenizer: ", test_x[:3])
    model = create_model(len(tag2id), max_len)
    model.load_weights("./output/ner_model.h5")
    pred_logits = model.predict(test_x)
    id2tag = {value: key for key, value in tag2id.items()}
    # shape [batch_size, seq_len]
    pred = np.argmax(pred_logits, axis=2).tolist()
    predict_label = []
    for i in range(len(len_list)):
        temp = []
        temp_pred = pred[i]
        for j in range(min(len_list[i], max_len)):
            temp.append(id2tag[temp_pred[j]])
        predict_label.append(temp)
    print("predict label: ", predict_label)
    return predict_label
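
A hypothetical call, loading the tag map saved during training (the sample sentence is only an illustration):

import json

with open("./output/tag2id.json", encoding="utf-8") as f:
    tag2id = json.load(f)

# sentences are lists of characters, matching the training data format
test_data = [list("张伟在北京工作")]
predict(test_data, max_len=64, tag2id=tag2id)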

Flask Service

Expose the model as an HTTP service with Flask for easy integration.

import json

import numpy as np
import requests
from flask import request, jsonify


# assumes a Flask app and module-level max_len, tokenizer, id2tag
# (see the scaffolding sketch after this function)
def bert_ner_infer():
    params = json.loads(request.get_data(), encoding="utf-8")
    text = params["text"]
    url = params["url"]  # tensorflow serving 地址
    x, len_list = create_infer_inputs(text, max_len, tokenizer)
    print("len_list: ", len_list)
    input_ids = x[0].tolist()
    token_type_ids = x[1].tolist()
    attention_mask = x[2].tolist()
    data = json.dumps({"signature_name": "serving_default",
                       "inputs": {"input_ids": input_ids,
                                  "token_type_ids": token_type_ids,
                                  "attention_mask": attention_mask}})
    headers = {"content-type": "application/json"}
    result = requests.post(url, data=data, headers=headers)
    result = json.loads(result.text)
    pred_logits = result["outputs"][0]
    pred = np.argmax(pred_logits, axis=1).tolist()
    print("pred: ", pred)
    predict_label = []
    for j in range(min(len_list[0], max_len)):
        predict_label.append(id2tag[pred[j]])

    return_result = {"predict": predict_label}
    return jsonify(return_result)
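
The handler above assumes some surrounding scaffolding that the post does not show: a Flask app, a registered route, and module-level max_len, tokenizer, and id2tag. A minimal sketch (route path and port are placeholders):

import json

from flask import Flask

app = Flask(__name__)

# module-level globals used by bert_ner_infer
max_len = 64
tokenizer = get_tokenizer()
with open("./output/tag2id.json", encoding="utf-8") as f:
    tag2id = json.load(f)
id2tag = {v: k for k, v in tag2id.items()}

# register the handler; "/bert_ner_infer" is a placeholder path
app.add_url_rule("/bert_ner_infer", view_func=bert_ner_infer, methods=["POST"])

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

A client then POSTs a JSON body like {"text": ..., "url": ...}, where url is the TensorFlow Serving REST endpoint of the exported model, e.g. http://<host>:8501/v1/models/<model_name>:predict.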

That wraps up named entity recognition with transformers. If you spot any problems, corrections are welcome.
