现在位置: 首页 > PyTorch 教程 > 正文

PyTorch 分布式训练

当模型越来越大、数据越来越多时,单 GPU 已经无法满足训练需求。

分布式训练通过多 GPU 甚至多台机器并行计算,可以显著缩短训练时间。

本节详细介绍 PyTorch 中的分布式训练技术,包括 DataParallel、DistributedDataParallel 以及混合精度分布式训练。


1. 分布式训练基础

1.1 为什么需要分布式训练

深度学习模型的规模逐年增长,训练时间也随之增加。分布式训练的核心目标是:将计算任务分散到多个计算单元上,在可接受的时间内完成大规模模型和数据的训练

分布式训练主要解决两个问题:

  • 显存不足:大模型的参数、优化器状态、梯度需要大量显存
  • 训练时间过长:单 GPU 训练大模型可能需要数周甚至数月

1.2 分布式训练的两大模式

分布式训练主要分为两种模式:

模式 原理 适用场景
数据并行(Data Parallel) 每个计算单元保存完整模型,不同计算单元处理不同数据 模型能够装入单卡显存;数据量大,需要加速训练;小团队,机器资源有限
模型并行(Model Parallel) 将模型拆分到多个计算单元,每个计算单元只保存部分模型 模型超大,单卡放不下;专用集群,多机训练

实际生产环境中,数据并行是最常用的方式。PyTorch 提供了 DataParallel 和 DistributedDataParallel 两种实现。


2. DataParallel 使用指南

2.1 单机多卡:DataParallel

DataParallel(DP) 是 PyTorch 提供的最简便的多 GPU 训练方式。它只需要修改几行代码,就可以利用单机上的多个 GPU 进行训练。

实例

import torch
import torch.nn as nn
import torch.optim as optim

# ── 模型定义 ──────────────────────────────────────
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.net(x)


# 检查可用 GPU 数量
print(f"可用 GPU 数量: {torch.cuda.device_count()}")

# 创建模型并移到 GPU
model = SimpleModel()

# ── 方式一:使用 DataParallel(最简方式)───────
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

# 移动到 GPU(如果使用 DataParallel,device_ids 会自动处理)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# ── 训练循环 ──────────────────────────────────────
def train_epoch_dp(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for inputs, labels in loader:
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    return total_loss / total, correct / total


# 模拟数据
train_loader = [
    (torch.randn(32, 128), torch.randint(0, 10, (32,))) for _ in range(10)
]

# 开始训练
for epoch in range(3):
    loss, acc = train_epoch_dp(model, train_loader, criterion, optimizer, device)
    print(f"Epoch {epoch+1}: Loss={loss:.4f}, Acc={acc:.4f}")

print("DataParallel 训练完成!")

2.2 DataParallel 的工作原理

DataParallel 的工作流程如下:

  1. 模型复制:将模型复制到每个 GPU 上
  2. 数据分发:将 batch 数据均匀分配到各个 GPU
  3. 并行计算:每个 GPU 独立计算前向传播
  4. 梯度聚合:将所有 GPU 的梯度汇总到主 GPU
  5. 参数更新:主 GPU 更新参数后同步到其他 GPU

实例

# DataParallel 关键参数说明
model = nn.DataParallel(
    module,                    # 要并行的模型(必填)
    device_ids=[0, 1, 2, 3],   # 使用的 GPU 设备 ID,默认使用所有卡
    output_device=0,            # 输出结果汇总到的设备,默认是第一张卡
    dim=0                      # 数据划分的维度,默认是 batch 维度
)

# 查看当前模型所在的设备
print(f"模型所在设备: {next(model.parameters()).device}")

# 查看实际使用的设备
print(f"使用的 GPU 数量: {model.device_ids if hasattr(model, 'device_ids') else 'N/A'}")

DataParallel 简单易用,但存在两个主要问题:1)主 GPU 显存压力大(需要汇总梯度);2)GPU 之间的通信效率较低。因此,官方推荐使用 DistributedDataParallel。


3. DistributedDataParallel 详解

3.1 DDP 基础

DistributedDataParallel(DDP) 是 PyTorch 推荐的分布式训练方式。相比 DataParallel,DDP 具有以下优势:

  • 每个 GPU 独立计算梯度,无需在主 GPU 汇总
  • 使用高效的梯度同步算法(Ring AllReduce)
  • 支持多机多卡训练
  • 训练速度更快,显存利用更高效

3.2 DDP 单机多卡训练

实例

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import os

# ── 初始化分布式环境 ────────────────────────────
def setup(rank, world_size):
    """设置分布式环境"""
    # 设置当前进程使用的 GPU
    torch.cuda.set_device(rank)

    # 初始化进程组
    dist.init_process_group(
        backend="nccl",           # 使用 NCCL 后端(GPU 推荐)
        init_method="env://",     # 环境变量初始化方式
        world_size=world_size,    # 总进程数
        rank=rank                 # 当前进程排名
    )


def cleanup():
    """清理分布式环境"""
    dist.destroy_process_group()


# ── 数据加载器(支持分布式)───────────────────────
def get_distributed_loader(batch_size, world_size):
    """创建支持分布式的数据加载器"""
    from torch.utils.data import DataLoader, DistributedSampler

    # 模拟数据集
    dataset = torch.utils.data.TensorDataset(
        torch.randn(100, 3, 32, 32),
        torch.randint(0, 10, (100,))
    )

    # DistributedSampler 会自动划分数据
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
        num_workers=2,
        pin_memory=True
    )

    return loader


# ── 模型定义 ──────────────────────────────────────
class ImageClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten()
        )
        self.classifier = nn.Linear(64, 10)

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


# ── DDP 训练函数 ──────────────────────────────────
def train_ddp(rank, world_size, epochs=3):
    """分布式训练主函数"""
    # 初始化
    setup(rank, world_size)

    # 创建模型并移到对应 GPU
    model = ImageClassifier().cuda(rank)

    # 包装为 DDP 模型
    ddp_model = DDP(
        model,
        device_ids=[rank],        # 指定当前进程使用的设备
        output_device=rank,
        find_unused_parameters=False  # 是否检测未使用的参数
    )

    # 损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)

    # 获取数据加载器
    loader = get_distributed_loader(batch_size=16, world_size=world_size)

    # 训练循环
    for epoch in range(epochs):
        # 每个 epoch 开始时设置 epoch 号(用于打乱数据)
        loader.sampler.set_epoch(epoch)

        for batch_idx, (inputs, labels) in enumerate(loader):
            inputs = inputs.cuda(rank, non_blocking=True)
            labels = labels.cuda(rank, non_blocking=True)

            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 5 == 0:
                print(f"Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.4f}")

    # 清理
    if rank == 0:
        print("训练完成!")
    cleanup()


# 注意:实际运行时需要通过命令行参数指定 rank 和 world_size
# 此代码需要在每个进程中独立运行

3.3 启动分布式训练

PyTorch 分布式训练需要通过 torchruntorch.distributed.launch 启动。以下是常用的启动方式:

实例

# 方式一:使用 torchrun(推荐,PyTorch 2.0+)
# 文件:train_ddp.py

"""
使用方式:
torchrun --nproc_per_node=4 train_ddp.py
"
""

# 方式二:使用 python -m(传统方式)
"""
python -m torch.distributed.launch --nproc_per_node=4 train_ddp.py
"
""

# 方式三:多机多卡启动
# 机器 1(主节点)
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py

# 机器 2(从节点)
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py

# 参数说明:
# --nproc_per_node: 每个节点的 GPU 数量
# --nnodes: 节点数量
# --node_rank: 当前节点编号(从 0 开始)
# --master_addr: 主节点 IP 地址
# --master_port: 主节点端口号

3.4 完整的 DDP 训练脚本

实例

# 完整的 DDP 训练脚本(保存为 train_ddp.py)
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import argparse
import os


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=-1, help="由 torchrun 自动传递")
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--lr", type=float, default=1e-3)
    return parser.parse_args()


class TrainModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        return self.net(x)


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)


def cleanup():
    dist.destroy_process_group()


def train(rank, world_size, args):
    setup(rank, world_size)

    # 创建模型
    model = TrainModel().cuda(rank)
    model = DDP(model, device_ids=[rank])

    # 优化器和损失函数
    optimizer = optim.Adam(model.parameters(), lr=args.lr)
    criterion = nn.CrossEntropyLoss()

    # 数据集
    dataset = torch.utils.data.TensorDataset(
        torch.randn(1000, 3, 32, 32),
        torch.randint(0, 10, (1000,))
    )

    # DistributedSampler 确保每个进程看到不同的数据
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    loader = DataLoader(
        dataset,
        batch_size=args.batch_size,
        sampler=sampler,
        num_workers=2,
        pin_memory=True
    )

    # 训练循环
    for epoch in range(args.epochs):
        # 关键:每个 epoch 都要设置 epoch,打乱数据划分
        sampler.set_epoch(epoch)
        model.train()

        epoch_loss = 0
        for inputs, labels in loader:
            inputs = inputs.cuda(rank, non_blocking=True)
            labels = labels.cuda(rank, non_blocking=True)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        # 同步所有进程,确保每个进程都完成当前 epoch
        dist.barrier()

        if rank == 0:
            avg_loss = epoch_loss / len(loader)
            print(f"Epoch {epoch+1}/{args.epochs}, Loss: {avg_loss:.4f}")

    cleanup()


if __name__ == "__main__":
    args = parse_args()

    # 从环境变量获取 world_size
    world_size = int(os.environ["WORLD_SIZE"])

    # 从环境变量获取 rank(由 torchrun 自动设置)
    rank = int(os.environ["RANK"])

    if torch.cuda.is_available():
        train(rank, world_size, args)
    else:
        print("需要 CUDA 环境才能运行分布式训练")

关键点:每个 epoch 都需要调用 sampler.set_epoch(epoch),确保数据划分被打乱,否则多个进程会看到相同的数据。


4. 分布式训练中的混合精度

4.1 DDP + AMP 组合

分布式训练与混合精度训练结合,可以进一步提升训练速度。PyTorch 2.0+ 的 DDP 已经与 AMP 完美集成。

实例

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.cuda.amp import autocast, GradScaler


class AMPModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(128, 10)
        )

    def forward(self, x):
        return self.net(x)


def train_ddp_amp(rank, world_size):
    """分布式 + 混合精度训练"""
    # 初始化
    torch.cuda.set_device(rank)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    # 创建模型
    model = AMPModel().cuda(rank)
    model = DDP(model, device_ids=[rank])

    # 优化器和 GradScaler
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    scaler = GradScaler()

    criterion = nn.CrossEntropyLoss()

    # 训练循环
    model.train()
    for inputs, labels in loader:
        inputs = inputs.cuda(rank, non_blocking=True)
        labels = labels.cuda(rank, non_blocking=True)

        optimizer.zero_grad()

        # 使用 autocast 自动切换精度
        with autocast(device_type='cuda'):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # 使用 scaler 处理梯度缩放
        scaler.scale(loss).backward()

        # 梯度裁剪(需要先 unscale)
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        scaler.step(optimizer)
        scaler.update()

    dist.destroy_process_group()


# 启动命令
# torchrun --nproc_per_node=4 train_ddp_amp.py

4.2 分布式训练中的梯度同步

DDP 使用 Ring AllReduce 算法进行梯度同步,其工作原理如下:

  1. 每个 GPU 将梯度分为 n 份(n 为 GPU 数量)
  2. 每个 GPU 与相邻 GPU 交换一份梯度
  3. 重复 n 次后,每个 GPU 拥有完整的梯度

实例

# DDP 梯度同步相关配置

# 1. 设置梯度 bucket 的缓冲区大小
model = DDP(
    model,
    device_ids=[rank],
    gradient_as_bucket_view=True,  # 节省显存(PyTorch 1.8+)
    broadcast_buffers=False,        # 不广播 buffers,减少通信
    bucket_cap_mb=25               # 梯度 bucket 大小(MB)
)

# 2. 手动同步模型参数(如需要)
# 在某些场景下,需要手动同步模型参数
def sync_params(model):
    for param in model.parameters():
        dist.broadcast(param, src=0)

# 3. 检查梯度同步状态
# DDP 会自动处理梯度同步,无需手动操作
# 但可以通过以下方式检查
for name, param in model.named_parameters():
    if param.grad is not None:
        print(f"{name}: grad requires_grad={param.grad.requires_grad}")

5. 模型并行与流水线

5.1 简单的模型并行

当模型太大,单个 GPU 无法容纳时,需要将模型拆分到多个 GPU 上。

实例

# 模型并行示例:将模型的不同层放到不同 GPU

class ModelParallel(nn.Module):
    def __init__(self, num_gpus=2):
        super().__init__()
        self.num_gpus = num_gpus

        # 第一部分:GPU 0
        self.features1 = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        ).to(f"cuda:0")

        # 第二部分:GPU 1
        self.features2 = nn.Sequential(
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten()
        ).to(f"cuda:1")

        # 分类器:GPU 0
        self.classifier = nn.Linear(128, 10).to(f"cuda:0")

    def forward(self, x):
        # 数据在不同 GPU 之间传递
        x = x.to("cuda:0")
        x = self.features1(x)
        x = x.to("cuda:1")
        x = self.features2(x)
        x = x.to("cuda:0")
        x = self.classifier(x)
        return x


# 使用
model = ModelParallel(num_gpus=2)
output = model(torch.randn(1, 3, 32, 32))
print(f"输出设备: {output.device}")

5.2 流水线并行(Pipeline Parallelism)

PyTorch 提供了 torch.distributed.pipeline.sync 模块(PP 包)实现流水线并行,将模型按层拆分,形成计算流水线。

实例

# 流水线并行示例(需要安装 torch-pipeline 依赖)
# pip install torch-pipeline

"""
# 注意:PyTorch 原生流水线需要使用特定版本或第三方库
# 这里展示概念,简化实现

from torch.distributed.pipeline.sync import Pipe

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(128, 256)
        self.layer2 = nn.Linear(256, 256)
        self.layer3 = nn.Linear(256, 10)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = self.layer3(x)
        return x


# 将模型按层拆分到不同 GPU
# layer1 -> GPU 0
# layer2 -> GPU 1
# layer3 -> GPU 2

model = SimpleModel()
model.layer1 = model.layer1.cuda(0)
model.layer2 = model.layer2.cuda(1)
model.layer3 = model.layer3.cuda(2)

# 使用 Pipeline 将模型连接
# 注意:实际使用需要安装对应版本的 pipeline 库
# from torch.distributed.pipeline.sync import Pipe
# model = Pipe(model, chunks=4)
"""

print("流水线并行需要额外安装 torch-pipeline 库")
print("pip install torch-pipeline-sync")

6. 分布式训练最佳实践

6.1 性能优化技巧

优化项 技巧 说明
通信优化 使用 NCCL 后端 NCCL 是 GPU 最佳选择;比 Gloo 更快
数据加载 设置 num_workers 和 pin_memory num_workers 设置为 CPU 核心数;pin_memory 加速 CPU 到 GPU 传输
梯度同步 调整 bucket_cap_mb 大模型可增大 bucket;小模型可减小以提高响应
混合精度 DDP + AMP 减少通信量;提高训练速度

6.2 常见问题与解决方案

问题 原因 解决方案
通信慢 网络带宽不足 使用 InfiniBand 或高速网络
显存不足 batch size 过大 减小 batch size 或使用梯度累积
负载不均衡 模型划分不合理 调整层分布,尽量均匀
同步错误 梯度 NaN 检查梯度裁剪和缩放设置

6.3 完整的多机多卡训练脚本

实例

# 完整的多机多卡训练脚本模板

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import argparse
import os


class TrainingConfig:
    def __init__(self):
        self.local_rank = -1
        self.epochs = 30
        self.batch_size = 32
        self.lr = 1e-3
        self.num_workers = 4
        self.gradient_clip = 1.0
        self.use_amp = True


def init_distributed(config):
    """初始化分布式环境"""
    # 从环境变量获取配置
    local_rank = config.local_rank

    # NCCL 配置
    os.environ["NCCL_DEBUG"] = "WARN"

    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl")

    return local_rank


def create_dataloader(dataset, config, world_size, rank):
    """创建数据加载器"""
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True,
        drop_last=True
    )

    loader = DataLoader(
        dataset,
        batch_size=config.batch_size,
        sampler=sampler,
        num_workers=config.num_workers,
        pin_memory=True,
        persistent_workers=True
    )

    return loader, sampler


def train_epoch(model, loader, criterion, optimizer, scaler, config, rank):
    """训练一个 epoch"""
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for inputs, labels in loader:
        inputs = inputs.cuda(rank, non_blocking=True)
        labels = labels.cuda(rank, non_blocking=True)

        optimizer.zero_grad()

        if config.use_amp and scaler is not None:
            # 混合精度训练
            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), config.gradient_clip)
            scaler.step(optimizer)
            scaler.update()
        else:
            # 普通训练
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), config.gradient_clip)
            optimizer.step()

        total_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    return total_loss / total, correct / total


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=-1)
    args = parser.parse_args()

    config = TrainingConfig()
    config.local_rank = args.local_rank

    rank = init_distributed(config)
    world_size = dist.get_world_size()

    print(f"进程 {rank}/{world_size} 初始化完成")

    # 创建模型
    model = YourModel().cuda(rank)
    model = DDP(model, device_ids=[rank])

    # 优化器
    optimizer = optim.Adam(model.parameters(), lr=config.lr)

    # GradScaler(如果使用混合精度)
    scaler = GradScaler() if config.use_amp else None

    # 数据集(替换为你的数据集)
    dataset = YourDataset()
    loader, sampler = create_dataloader(dataset, config, world_size, rank)

    criterion = nn.CrossEntropyLoss()

    # 训练循环
    for epoch in range(config.epochs):
        sampler.set_epoch(epoch)

        loss, acc = train_epoch(model, loader, criterion, optimizer, scaler, config, rank)

        # 只在 rank 0 打印
        if rank == 0:
            print(f"Epoch {epoch+1}/{config.epochs}: Loss={loss:.4f}, Acc={acc:.4f}")

    dist.destroy_process_group()


# 启动方式:
# torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=192.168.1.1 --master_port=29500 train.py
# torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr=192.168.1.1 --master_port=29500 train.py

分布式训练的关键是:1)选择合适的并行模式(数据并行 vs 模型并行);2)使用 DDP 代替 DataParallel 以获得更好的性能;3)结合混合精度训练进一步加速;4)合理设置数据加载器和通信参数。


7. 分布式训练监控与调试

7.1 常用监控工具

实例

# 分布式训练监控代码

def monitor_training(rank, world_size):
    """监控分布式训练状态"""
    # 1. 检查进程组状态
    print(f"Rank: {rank}, World Size: {world_size}")
    print(f"Backend: {dist.get_backend()}")
    print(f"Rank: {dist.get_rank()}")
    print(f"CUDA Device: {torch.cuda.current_device()}")

    # 2. 监控显存使用
    if torch.cuda.is_available():
        for i in range(torch.cuda.device_count()):
            allocated = torch.cuda.memory_allocated(i) / 1024**2
            reserved = torch.cuda.memory_reserved(i) / 1024**2
            print(f"GPU {i}: 已分配 {allocated:.1f} MB, 预留 {reserved:.1f} MB")

    # 3. 监控梯度同步
    # DDP 会自动记录同步时间
    # 可以通过设置环境变量启用详细日志
    # os.environ["NCCL_DEBUG"] = "INFO"


# 在训练循环中定期调用
monitor_training(rank, dist.get_world_size())

7.2 调试技巧

  • 先在单机上测试,再扩展到多机
  • 使用 os.environ["NCCL_DEBUG"] = "INFO" 查看详细通信日志
  • 确保所有机器的时间同步(使用 NTP)
  • 检查防火墙是否允许 NCCL 通信端口

分布式训练虽然增加了复杂度,但带来的性能提升是显著的。对于大模型训练,几乎是必备的技术手段。