『tools-1』HuggingFace
HuggingFace 基础组件
pipeline
查看 pipeline 支持的所有任务类型
1
2
3
4from transformers.pipelines import SUPPORTED_TASKS
for k, v in SUPPORTED_TASKS.items():
print(k, v)
# k-v: NLP任务名称-任务实现创建并调用 pipeline
1
2
3
4
5
6
7
8
9# 直接指定任务类型+模型仓库
pipe = pipeline("<task_name>", model="<repository>")
# 加载预训练模型+分词器
model = AutoModel.from_pretrained("<repository>")
tokenizer = AutoTokenizer.from_pretrained("<repository>")
pipe = pipeline("<task_name>", model=model, tokenizer=tokenizer)
pipe("<some_sentence>")注意:加载预训练方式必须同时指定模型和分词器
指定模型加载到显卡上:默认在CPU上
1
2# 指定GPU的数字编号
pipe = pipeline(..., device=0)查看 pipeline 的调用实例:点进入 pipe 类实例,查看__call__方法
tokenizer
加载与保存分词器
1
2
3
4
5
6# 加载预训练分词器
tokenizer = AutoTokenizer.from_pretrained("<repository>")
# 保存分词器到本地
tokenizer.save_pretrained("<local_path>")
# 之后可以直接从 <local_path> 所在位置加载分词器 (无需通过远程<repository>加载)查看词典
1
2
3
4
5# 打印词典全部内容
print(tokenizer.vocab)
# 打印词典大小
print(tokenizer.vocab_size)编码与解码:调用 encode/decode 方法,或直接调用 tokenizer 的 __call__ 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15sen = "<some_sentence>"
常规方法
ids = tokenizer.encode(sen)
words = tokenizer.decode(ids)
# 将编码后的句子填充到指定长度
ids = tokenizer.encode(sen, padding="max_length", max_length=20)
# 将编码后的句子截断为指定长度
ids = tokenizer.decode(sen, max_length=5, truncation=True)
###### 对一条语句编码, 可输出含 ids, type_id 和 attention_masked 的字典 ######
ids = tokenizer(sen, return_tensors="pt", (padding, truncation...))
###### 对批次语句编码 ######
sens = ["<sen_1>", ..., "<sens_n>"]
res = tokenizer(sens, ...)注意:上述批次处理 比 for循环处理多条语句更高效
快慢分词器:快分词器基于 Rust 实现,慢分词器基于 Python 实现;默认使用快分词器
1
2
3
4
5
6# 快分词器支持分析拆分得到的"子词"
# 快分词器支持 return_offsets_mapping, 如"jumping"实际被拆分为"jump"和"ing",表示为[(0,0),(0,4),(4,7),(0,0)]
inputs = fast_tokenizer(sen, return_offsets_mapping=True)
# 快分词器支持 word_ids() 方法,如"i eating"实际被拆分为"i"和"eat""ing",表示为[None,0,1,1,None]
inputs.word_ids() # = [None, ..., <original_wordidx_for_subword>, ..., None]
model
任务头:纯 AutoModel 只输出编码结果(不带任务头),AutoModelForXXX 指带有不同任务头的模型
模型类型:Encoder,Decoder,Encoder-Decoder
- Encoder:BERT系列,文本分类 + 阅读理解;计算一个 token 可见完整上下文
- Decoder:GPT系列、LLaMa系列,文本生成;计算一个 token 仅可见上文
- Encoder-Decoder:GLM、BART,文本摘要 + 机器翻译
获取模型参数:
1
2
3
4
5
6
7# 获得部分参数
print(model.config)
# 获取并修改完整参数
from transformers import AutoConfig
config = AutoConfig.from_pretrained("<model_name>")
config.xxx模型调用
1
2
3
4
5inputs = tokenizer(sen, return_tensors="pt") # 要输入模型,所以要返回tensor
output = model(**input) # 注意传入的是Dict,要用**解包
output.logits # 取最后一层分数
output.loss # 训练时取出loss.backward()
datasets
加载数据集:使用 load_dataset 方法
1
2
3
4
5
6
7
8
9from datasets import load_dataset
# 返回一个DatasetDict, 即 DatasetDict({train: Dataset, validation: Dataset})
dataset = load_dataset("<repository>")
# 加载数据集中的特定任务子集
dataset = load_dataset("<repository>", "<subset>")
# 仅加载训练集的 m~n 行
dataset = load_dataset("<repository>", spilt="validate[<m>:<n>]")
# 支持 百分比 加载
dataset = load_dataset("<repository>", split=["train[:50%]", "validation[:20%]"])查看数据集:
1
2
3
4
5
6
7
8
9# 查看训练集中的前2条, 返回一个字典, 即 {field_1: [x11, x12], ..., field_n: [xn1, xn2]}
print(dataset["train"][:2])
# 查看训练集中"field"字段的前5条, 返回一个列表,即 [x0, ..., x4]
print(dataset["train"]["field"][:5])
# 查看训练集中的各字段名, 返回一个列表
print(dataset["train"].colume_names)
# 查看训练集中各字段信息, 返回一个字典
print(dataset["train"].features)数据集划分:使用 dataset 的 train_test_split 方法
1
2
3
4
5train_set = dataset["train"]
# 指定训练集比例
train_set.train_test_split(test_size=0.1)
# 也可以按标签类别划分, 分层抽样保证数据集划分均衡
train_set.train_test_split(test_size=0.1, stratify_by_column="label")数据选取与过滤:使用 select 和 filter 方法
1
2
3
4
5# 选取训练集中特定索引的样本,仍返回一个Dataset
dataset["train"].select([0, 2, 5]) # 选取训练集第0, 第2和第5个样本
# 保留训练集中满足一定条件的样本,仍返回一个Dataset
dataset["train"].filter(lambda example: <bool expr on example>) # 对每个example做判断, True则保留数据集映射:使用 map 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15tokenizer = AutoTokenizer.from_pretrained("xxx")
# 对数据集中的一个样本做预处理
def preprocess(example):
content = tokenizer(example["content"], ...) # 分词
label = tokenzier(example["label"], ...) # 分词
model_inputs["content"], model_inputs["label"] = content, label
return model_inputs
dataset.map(preprocess) # 可同时处理 训练集 和 测试集
# 加速1: 快速分词器支持批处理
dataset.map(preprocess, batched=True)
# 加速2: 支持多进程处理
dataset.map(preprocess, num_proc=4)
# 可以移除不需要的字段
dataset.map(preprocess, remove_columns=<list_of_fields>)注意:使用多进程时,需要将 tokenizer 一并传入 "preprocess" 函数(多进程各资源不共享)
保存数据与加载数据:一对方法
1
2
3
4
5# 保存至本地
dataset.save_to_disk("<local_path>")
# 从本地加载
dataset.load_from_disk("<local_path>")加载自定义生数据集:仍使用 load_dataset 接口
1
2
3
4
5
6
7# 第一个字段可填文件类型
dataset = load_dataset("csv", data_files=["data.csv", ...])
# 从 pandas 加载
import pandas as pd
data = pd.read_csv("./data.csv")
dataset = Dataset.from_pandas(data) # 调用Dataset类的from_pandas方法编写加载脚本:用于自动构建数据集,需要实现三个抽象方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42# load_script.py
import datasets
from datasets import DownloadManager, DatasetInfo
class DATASET2024(datasets.GeneratorBasedBuilder):
# 实现 _info, 定义数据集和字段信息
def _info(self) -> DatasetInfo:
return datasets.DatasetInfo(
# 数据集描述
description="DATASET 2024",
# 数据集字段
features=datasets.Features({
"<field_name>": <type>
})
"""
这里的<type>可以为
datasets.Value("string"), datasets.Value("int32") 等
或 复合类型
datasets.features.Sequence({"<field>": <type>, ... }) 等
"""
)
# 实现 _split_generators, 定义数据划分方式
def _split_generators(self, dl_manager: DownloadManager):
return [datasets.SplitGenerator(name=datasets.Split.TRAIN,
gen_kwargs={"filepath": <local_path_to_train>}),
datasets.SplitGenerator(name=datasets.Split.TEST,
gen_kwargs={"filepath": <local_path_to_test>})]
# 实现 _generate_examples, 接收上面传入的filepath, 产生一个样本
def _generate_examples(self, filepath):
with open(filepath) as f:
<for-loop>
id = <get_id>
data = <get_data_from_f>
# 必须有id, 类型可以是整数或字符串
yield id, {
"field": <data>,
...
}
###### 注意这里的 field 必须和 _info 中定义的字段名相同
<for-loop>之后直接调用 load_dataset("./load_script.py") 即可载入自定义数据集
evaluate
加载并查看评估函数
1
2
3
4
5
6
7
8
9import evaluate
# 加载评估函数
eval_func = evaluate.load("<eval_name>") # 输入评估函数名, 如"accuracy"
# 查看评估函数 简单描述
print(eval_func.description)
# 查看评估函数 具体文档
print(eval_func.inputs_description)指标计算
1
2
3
4
5
6
7
8
9
10
11# 单指标全局计算, 返回字典{"eval": value}
eval_func.compute(predictions=<list_of_pred>, reference=<list_of_ref>)
# 单指标迭代计算
for ref, pred in zip(<list_of_ref>, <list_of_pred>):
eval_func.add(reference=ref, prediction=pred)
eval_func.compute()
# 多指标计算, 返回字典{"eval1": value1, "eval2": value2, ...}
metrics = evaluate.combine(["accuracy", "f1", "recall", "precision"])
metrics.compute(predictions=<list_of_pred>, reference=<list_of_ref>)注意:可以在 huggingface 中的 task 文档里查询,针对特定任务可采用哪些 metric
Trainer
创建 trainer:Trainer 自身集成了优化器
1
2
3
4
5
6
7
8
9
10
11
12
13
14# 构建训练参数, 可指定很多参数
args = TrainingArguments(...)
# 创建训练器
trainer = Trainer(
model=model,
args=args,
train_dataset=dataset["train"],
eval_dataset=dataset["test"],
# 相当于tokenizer指定padding="max_length", 以及max_length
data_collator=DataCollatorWithPadding(tokenizer=tokenzier),
compute_metrics=metrics,
...
)训练流程:
1
2
3
4
5
6# 启动训练
trainer.train()
# 效果评估
trainer.evaluate() # 可传入自定义验证集
# 模型预测
trainer.predict(dataset["test"])