python == 3.8.0
allennlp == 2.4.0
pip install allennlp -i https://pypi.tuna.tsinghua.edu.cn/simple
Note: when training on a large dataset, using lazy mode is extremely important. However, before switching to lazy mode, remember to shuffle the whole dataset with respect to the label distribution, so that the training data is evenly distributed overall. This can be done with sklearn; see here.
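A minimal sketch of such a global shuffle (the file path and the TSV layout are assumptions for illustration, not from the original project):
# Shuffle the labelled TSV once, globally, before training.
import pandas as pd
from sklearn.utils import shuffle

df = pd.read_csv("data/train.csv", sep="\t")   # assumed path and format
df = shuffle(df, random_state=42)              # random permutation of all rows
df.to_csv("data/train_shuffled.csv", sep="\t", index=False)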
Note that the default data loader is multiprocess_data_loader.py, and you must set "max_instances_in_memory": 80 in it to enable lazy loading.
Configure it in the jsonnet:
"data_loader": {
"batch_size": 8,
"max_instances_in_memory": 80,
"cuda_device": 0,
"shuffle": true
},
The same reasoning that applies to lazy loading also applies to building the vocab: when the training data is large, rebuilding the vocab from scratch on every run (which requires a full pass over the whole dataset) is time-consuming. So we build the vocab manually once and simply load it each time we modify or retrain the model.
1. Set in the jsonnet:
"datasets_for_vocab_creation": ["train","validation"],
2. Generate vocab.tar.gz ahead of time with the build-vocab command:
allennlp build-vocab scripts/my_text_classifier.jsonnet data/vocab.tar.gz --include-package my_text_classifier
Here the output is written to data/vocab.tar.gz. You can also generate the vocab with your own script and fill it in following the layout of vocab.tar.gz, whose structure is:
-- vocab.tar.gz
|___ labels.txt
|___ non_padded_namespaces.txt
|___ tokens.txt
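If you prefer to build it in code, a minimal sketch could look like the following (the paths and the existence of a ready-made reader instance are assumptions):
from allennlp.data import Vocabulary

# `reader` is your registered DatasetReader instance
instances = list(reader.read("data/train.csv")) + list(reader.read("data/validation.csv"))
vocab = Vocabulary.from_instances(instances)
# writes tokens.txt, labels.txt and non_padded_namespaces.txt into the directory
vocab.save_to_files("data/vocab")
# then pack it, e.g.: tar -czvf vocab.tar.gz -C data/vocab .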
3. Only after the vocab has been generated can you remove "datasets_for_vocab_creation": ["train","validation"] from the jsonnet and add instead:
"vocabulary":{
"type": "from_files",
"directory": "data/vocab_model.tar.gz"
},
After that, subsequent training runs load this vocab directly and no extra time is wasted.
You need to override this method in your custom predictor:
@overrides
def load_line(self, line: str) -> JsonDict:
    """
    If your inputs are not in JSON-lines format (e.g. you have a CSV)
    you can override this function to parse them correctly.
    """
    # stash the raw line so dump_line() below can fall back to it when a
    # leftover instance is predicted outside of a batch
    self.line = line
    return {"sentence": line}
You also need to override this method in your custom predictor:
@overrides
def predict_batch_json(self, inputs: List[JsonDict]) -> List[JsonDict]:
    instances = self._batch_json_to_instances(inputs)
    outputs = self.predict_batch_instance(instances)
    outputs = [{"paperid": i["sentence"].split("\t")[0], "categories": j["label"]}
               for i, j in zip(inputs, outputs)]
    return outputs
And override this method in your custom predictor as well:
@overrides
def dump_line(self, outputs: JsonDict) -> str:
    """
    If you don't want your outputs in JSON-lines format
    you can override this function to output them differently.
    """
    if "paperid" in outputs:
        return str(outputs["paperid"]) + "," + str(outputs["categories"]) + "\n"
    else:
        # this branch handles the leftover instances that did not fit into a
        # full batch and are predicted individually
        return str(self.line.split("\t")[0]) + "," + str(outputs["label"]) + "\n"
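For reference, these overrides live inside a registered Predictor. A minimal skeleton might look like the following; the class name and the way _json_to_instance splits the TSV line are assumptions, only the registered name sentence_classifier comes from the predict command used later:
from allennlp.common.util import JsonDict
from allennlp.data import Instance
from allennlp.predictors.predictor import Predictor


@Predictor.register("sentence_classifier")
class SentenceClassifierPredictor(Predictor):
    def _json_to_instance(self, json_dict: JsonDict) -> Instance:
        # load_line() above stores the whole TSV line under "sentence";
        # here we assume its columns are paperid, title, abstract
        paperid, title, abstract = json_dict["sentence"].split("\t")[:3]
        return self._dataset_reader.text_to_instance(title, abstract)

    # load_line(), predict_batch_json() and dump_line() from above go here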
You also need to override this method in your custom model:
def make_output_human_readable(
    self, output_dict: Dict[str, torch.Tensor]
) -> Dict[str, torch.Tensor]:
    label = torch.argmax(output_dict["probs"], dim=1)
    label = [self.vocab.get_token_from_index(int(i), "labels") for i in label]
    output_dict["label"] = label
    return output_dict
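Model.forward_on_instances() (which predict_batch_instance calls under the hood) runs make_output_human_readable() on the raw forward output, so the "label" entry added here is exactly what the predict_batch_json and dump_line overrides above read from their outputs.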
allennlp predict checkpoint/model.tar.gz data/paper_classification/test.csv --output-file data/paper_classification/predict_result.csv --include-package my_text_classifier --predictor sentence_classifier --batch-size 8 --silent
Note: batch prediction only works if predict_batch_json is overridden in the predictor and --batch-size 8 is passed to the predict command.
nohup allennlp predict /home/featurize/data/checkpoint/model/model.tar.gz /home/featurize/data/test.csv \
    --output-file /home/featurize/data/predict_result.csv --include-package my_text_classifier \
    --predictor sentence_classifier --batch-size 16 --cuda-device 0 --silent &
Note: the GPU is only used during prediction if you pass --cuda-device 0.
Suppose we originally trained for one epoch, and after training finished we want to edit the jsonnet and continue training for more epochs. There are two cases.
Case 1: directly modify num_epochs in scripts/train.jsonnet and in checkpoint/config.json, then change the training command to:
nohup allennlp train scripts/my_text_classifier_robert_gpu.jsonnet --serialization-dir /home/featurize/data/checkpoint/model --include-package my_text_classifier --recover &
Note: to continue training, append --recover to the command.
Note: because this is a recover, never add -f, otherwise the contents of the checkpoint directory will be deleted.
Case 2: continuing from a model.tar.gz: 1. Change the "model" in train.jsonnet, and set num_epochs to however many epochs you want to train:
"model": {
"type": "from_archive",
"archive_file": "/home/featurize/data/model.tar.gz"
},
2. Continue training:
nohup allennlp train scripts/robert_continue_train.jsonnet --serialization-dir /home/featurize/data/checkpoint_continue/ --include-package my_text_classifier &
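Unlike --recover, this starts a brand-new training run that only initializes the model weights from the archive; the optimizer state and epoch counter start from scratch.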
Note: this assumes that the "model" entry in the config.json inside the previously trained model.tar.gz is your custom model rather than itself a from_archive model. If it was also loaded from an archive, you need to edit the "model" entry in that config.json:
mkdir model
# extract the archive
tar -xvf model.tar.gz -C model/
cd model
vi config.json
# repack after editing
tar -zcvf model.tar.gz config.json meta.json vocabulary weights.th
Since the default jsonnet does not register a Checkpointer, we need to register one ourselves and then pass save_every_num_seconds in as a parameter through the jsonnet.
1. Write a custom checkpointer so that it can be used from the jsonnet:
from typing import Optional, Union
import os

from allennlp.training.checkpointer import Checkpointer


@Checkpointer.register("simple_checkpointer")
class SimpleCheckpointer(Checkpointer):
    def __init__(
        self,
        serialization_dir: Union[str, os.PathLike],
        save_every_num_seconds: Optional[float] = None,
    ):
        super().__init__(serialization_dir)
        self._save_every_num_seconds = save_every_num_seconds
        self._serialization_dir = str(serialization_dir)
2. Pass save_every_num_seconds in through the jsonnet:
"trainer": {
"checkpointer":{
"type": "simple_checkpointer",
"serialization_dir":"checkpoint",
"save_every_num_seconds": 1200
},
}
Two things need to be considered here: during training, reading the training data uses both _read() and text_to_instance(); during prediction, reading the test data uses only text_to_instance(). Therefore the label argument of text_to_instance() must default to None.
def text_to_instance(self, title: str, abstract: str, label: str = None) -> Instance:
    tokens_title = self.tokenizer.tokenize(title)
    tokens_abstract = self.tokenizer.tokenize(abstract)
    if self.max_tokens:
        tokens_title = tokens_title[: self.max_tokens]
        tokens_abstract = tokens_abstract[: self.max_tokens]
    text_field_title = TextField(tokens_title, self.token_indexers)
    text_field_abstract = TextField(tokens_abstract, self.token_indexers)
    # 4. Adding both fields gives the model two text inputs.
    fields = {"title": text_field_title, "abstract": text_field_abstract}
    # 3. If a label is present, add it (training data); if not, leave it out (test data).
    if label:
        fields["label"] = LabelField(label)
    return Instance(fields)
def _read(self, file_path: str) -> Iterable[Instance]:
    with open(file_path, "r") as lines:
        for line in lines:
            line = line.strip().split("\t")
            # 1. Decide whether the input is train data (with a label) or test data (without one).
            if len(line) == 4:
                paperid, title, abstract, categories = line
            else:
                paperid, title, abstract = line
                categories = None
            # 2. Skip the header row.
            if paperid == "paperid":
                continue
            yield self.text_to_instance(title, abstract, categories)
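For context, here is a minimal sketch of the DatasetReader these two methods could live in; the registered name and constructor arguments are assumptions, not taken from the original project:
from typing import Dict, Iterable, Optional

from allennlp.data import DatasetReader, Instance
from allennlp.data.fields import LabelField, TextField
from allennlp.data.token_indexers import TokenIndexer
from allennlp.data.tokenizers import Tokenizer


@DatasetReader.register("classification_tsv")
class ClassificationTsvReader(DatasetReader):
    def __init__(
        self,
        tokenizer: Tokenizer,
        token_indexers: Dict[str, TokenIndexer],
        max_tokens: Optional[int] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.tokenizer = tokenizer
        self.token_indexers = token_indexers
        self.max_tokens = max_tokens

    # _read() and text_to_instance() from above go here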
def text_to_instance(self, text_a: str, text_b: str, label: str = None) -> Instance:
    # 80% of the text_a length in the training set is less than 256, 512 - 256 = 256.
    tokens_a = self.tokenizer.tokenize(text_a)[: self.max_tokens // 2]
    tokens_b = self.tokenizer.tokenize(text_b)[: self.max_tokens - len(tokens_a)]
    # 4. text_a and text_b are joined with [SEP] and fed into BERT together.
    tokens = self.tokenizer.add_special_tokens(tokens_a[1:-1], tokens_b[1:-1])
    text_field = TextField(tokens, self.token_indexers)
    fields = {"text": text_field}
    # 3. If a label is present, add it (training data); if not, leave it out (test data).
    if label:
        fields["label"] = LabelField(label)
    return Instance(fields)
def _read(self, file_path: str) -> Iterable[Instance]:
    with open(file_path, "r", encoding="utf-8") as lines:
        for line in lines:
            line = line.strip().split("\t")
            # 1. Decide whether the input is train data (with a label) or test data (without one).
            if len(line) == 3:
                text_a, text_b, categories = line
            else:
                text_a, text_b = line
                categories = None
            # 2. Skip the header row.
            if text_a == "text_a":
                continue
            yield self.text_to_instance(text_a, text_b, categories)
1. Different tasks call for different evaluation metrics; for classification, for example, accuracy and F1. When writing the custom model, add the following:
from allennlp.training.metrics import CategoricalAccuracy, FBetaMeasure

...

def __init__(
    self, vocab: Vocabulary, embedder: TextFieldEmbedder, encoder: Seq2VecEncoder
):
    super().__init__(vocab)
    ...
    self.accuracy = CategoricalAccuracy()
    self.accuracy_2 = FBetaMeasure(average="macro")
    ...

def get_metrics(self, reset: bool = False) -> Dict[str, float]:
    return {
        "accuracy": self.accuracy.get_metric(reset),
        "f1": self.accuracy_2.get_metric(reset)["fscore"],
    }
2. When selecting the best model, we also need to specify which metric to select on. The default is the loss; to use a different metric, specify it in the jsonnet:
"trainer": {
"validation_metric": "+f1",
...
}
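The data_loader can also use a bucket batch_sampler, which groups instances of similar length into the same batch to reduce padding; note that batch_size then moves inside the sampler: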
"data_loader": {
"batch_sampler": {
"type": "bucket",
"batch_size": 16
},
"cuda_device": 0,
"max_instances_in_memory": 1600
},
Early stopping: when accuracy has not improved for two consecutive epochs, stop training early:
"trainer": {
"validation_metric": ["+accuracy","-loss"],
"patience": 2,
"optimizer": {
"type": "huggingface_adamw",
"lr": 2.0e-5
},
"num_epochs": 5
}
1. Set callbacks in the jsonnet:
"trainer": {
"validation_metric": ["+accuracy","-loss"],
"patience": 2,
"callbacks":["tensorboard"],
"optimizer": {
"type": "huggingface_adamw",
"lr": 1.0e-5
},
"num_epochs": 5
}
2. Install tensorboard in the environment, then start the service with logdir pointing at checkpoint/log:
pip install tensorboard
tensorboard --logdir=checkpoint/log
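By default the tensorboard callback writes its event files into the log/ subdirectory of the serialization directory, which is why --logdir points at checkpoint/log here (adjust the path if your --serialization-dir is different).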