/ NLP, DEEPLEARNING

分布式训练推理Accelerate

分布式训练的加速策略详情可见本人上篇文章《 大语言模型的应用及训练》,一般有三种,分别是数据并行,流水线式并行和张量并行,本文主推使用hugging face的accelerate库来进行模型分布式的训练和推理。

一. torch常用的分布式训练工具

1.1 DataParallel

DP(DataParallel):实现数据并行方式的分布式训练,采用的是PS(worker-server)模式,不推荐

  • 单进程多线程
  • 只能在单机上使用
  • 训练速度慢,且由于是PS模式(存在负载不均衡的问题),随着worker的个数增多,训练速度越慢

1.2 DistributedDataParallel

DDP(DistributedDataParallel):实现数据并行方式的分布式训练,采用的是ring-all-reduce模式。它将模型复制到每个 GPU 上 ,同时复制了每个dataloader,并且当 loss.backward() 被调用进行反向传播的时候,所有这些模型副本的梯度将被同步地平均/下降 (reduce)。这确保每个设备在执行优化器步骤后具有相同的权重。

  • 多进程

  • 支持多机多卡

  • 训练速度较DP快,ring-all-reduce模式下,所有worker只和自己相邻的两个worker进行通信

1.3 amp

apex由英伟达开发了一个支持半精度自动训练的pytorch拓展插件。Apex 对混合精度训练的过程进行了封装,从而大幅度降低显存占用,节约运算时间。torch 原生支持的amp,pytorch的版本一定>1.6。不用额外再装apex,所以推荐使用这种方式使用amp

from torch.cuda.amp import autocast as autocast, GradScaler

# 在训练最开始之前实例化一个GradScaler对象
scaler = GradScaler()

for input, target in tqdm(loader):
        # 前向过程(model + loss)开启 autocast
        with autocast():
            output = model(input)
            loss = loss_fn(output, target)

        # Scales loss,这是因为半精度的数值范围有限,因此需要用它放大
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

二. Accelerate分布式训练和推理

Accelerate:在无需大幅修改代码的情况下完成并行化。同时还支持DeepSpeed的多种ZeRO策略,简直不要太爽。

  • 代码效率高,支持无论是单机单卡还是多机多卡适配同一套代码。

  • 允许在单个实例上训练更大的数据集:Accelerate 还可以使 DataLoaders 更高效。这是通过自定义采样器实现的,它可以在训练期间自动将部分批次发送到不同的设备,从而允许每个设备只需要储存数据的一部分,而不是一次将数据复制四份存入内存。

  • 支持DeepSpeed:无需更改代码,只用配置文件即可对DeepSpeed开箱即用。

2.1 分布式推理

当模型参数大到单张卡放不下时候(哪怕batchsize为1也会报显存不足的情况),这里就需要将大模型中的不同layer放到不同的GPU上,而每个GPU只负责其中一部分训练,当然数据在不同卡上流转的时候,都需要自动将数据放到对应的卡上。

from accelerate import dispatch_model

# device_map设置为自动的最方便了,不用自己设计把模型的layer分配到哪个GPU
model = dispatch_model(model, device_map="auto")

# 打印device_map
print(model.hf_device_map)
print(f'memory_allocated {torch.cuda.memory_allocated()}')

2.2 分布式训练

下面是官方的例子,只需要更新几行代码即可开启分布式训练之旅啦!

  • accelerator = Accelerator()实例化

  • accelerator.prepare把我们的模型、数据、优化器等等都放进accelerate里面,让他帮我们操作

  • accelerator.backward(loss)替换掉常用的loss.backword()

  • 如果需要梯度裁剪,这里必须使用accelerator.clip_grad_norm_()

  • 涉及到模型存储的时候,需要unwrap 下模型,因为在通过 prepare() 方法时,模型可能被 wrap 从而用于分布式训练

  import torch
  import torch.nn.functional as F
  from datasets import load_dataset
+ from accelerate import Accelerator

+ accelerator = Accelerator()
- device = 'cpu'
+ device = accelerator.device

  model = torch.nn.Transformer().to(device)
  optimizer = torch.optim.Adam(model.parameters())

  dataset = load_dataset('my_dataset')
  data = torch.utils.data.DataLoader(dataset, shuffle=True)
 
+ model, optimizer, data = accelerator.prepare(model, optimizer, data)

  model.train()
  for epoch in range(10):
      for source, targets in data:
          source = source.to(device)
          targets = targets.to(device)

          optimizer.zero_grad()

          output = model(source)
          loss = F.cross_entropy(output, targets)

-         loss.backward()
+         accelerator.backward(loss)

          optimizer.step()
  
   # 等待所有进程达到一定程度后再执行指令
   accelerator.wait_for_everyone()
   # 只在主进程中保存模型
   if self._accelerator.is_main_process:
         + unwrapped_model = accelerator.unwrap_model(model)
         + accelerator.save(unwrapped_model,model_path)
         - torch.save(unwrapped_model.state_dict, "./model/accelerate.pt")

之后,只需要配置下accelerate的config文件,使用accelerate launch --config_file default_config.yaml train.py启动脚本开始训练啦!

三. accelearte使用例子

3.1 单机多卡

下面是以单机多卡(1机器共2卡)的config.yaml例子,这里是我根据accelerate config 生成后的config文件:

compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
mixed_precision: 'no'
num_machines: 1
num_processes: 2
rdzv_backend: static
same_network: false
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

3.2 多机多卡

下面是以多机多卡(2机器共4卡)的config.yaml例子,这里是我根据accelerate config 生成后的config文件:

compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_process_ip: 主机器的ip
main_process_port: 端口号
main_training_function: main
mixed_precision: 'no'
num_machines: 2
num_processes: 4
rdzv_backend: static
same_network: false
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

3.3 DeepSpeed集成在accelerate上的使用

Deepspeed集成了ZeRO的三种方式,在accelerate中,我们可以直接根据配置文件来选择使用:

  • Stage 1:将 optimizer states 分片到数据并行 workers/GPUs 上。
  • Stage 2:将 optimizer states + gradients 分片到数据并行 workers/GPUs 上。
  • Stage 3 :将 optimizer states + gradients + model parameters 分片到数据并行 workers/GPUs 上。
  • Optimizer Offload:将 optimizer states + gradients 卸载到 CPU/Disk ,建立在 ZERO Stage 2 之上。
  • Param Offload:将 model parameters 卸载到 CPU/Disk ,建立在 ZERO Stage 3 之上。

下面是使用stage3的accelerate配置的例子:

compute_environment: LOCAL_MACHINE
deepspeed_config:
  deepspeed_multinode_launcher: standard
  gradient_accumulation_steps: 4
  offload_optimizer_device: none
  offload_param_device: none
  zero3_init_flag: false
  zero3_save_16bit_model: false
  zero_stage: 3
distributed_type: DEEPSPEED
downcast_bf16: 'no'
machine_rank: 0
main_process_ip: 主机器的ip
main_process_port: 端口号
main_training_function: main
mixed_precision: 'no'
num_machines: 2
num_processes: 4
rdzv_backend: static
same_network: false
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

在从机器上我们只需要复制这个config文件和所有的代码数据,并把config.yaml中的machine_rank改成1即可。

两台机器同时启动脚本:accelerate launch --config_file default_config.yaml train.py