PyTorch-09: Usage Examples and Best Practices#
Complete Training Pipeline Example#
Image Classification (ResNet-18)#
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
# ============ 1. Data preparation ============
transform_train = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
transform_val = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
train_dataset = datasets.ImageFolder('data/train', transform=transform_train)
val_dataset = datasets.ImageFolder('data/val', transform=transform_val)
train_loader = DataLoader(
train_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
    pin_memory=True  # CUDA optimization: pinned memory speeds up host-to-device copies
)
val_loader = DataLoader(
val_dataset,
batch_size=32,
shuffle=False,
num_workers=4,
pin_memory=True
)
# ============ 2. Model construction ============
model = models.resnet18(weights=None, num_classes=10)  # weights=None replaces the deprecated pretrained=False
model = model.cuda()
# Multi-GPU training
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)
# Or use DDP (recommended; see the distributed training section below)
# model = nn.parallel.DistributedDataParallel(model)
# ============ 3. Loss function and optimizer ============
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
model.parameters(),
lr=0.1,
momentum=0.9,
weight_decay=1e-4
)
# Learning-rate schedule
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)
# ============ 4. Mixed-precision training ============
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
# ============ 5. Training loop ============
def train_epoch(model, loader, criterion, optimizer, scaler):
model.train()
running_loss = 0.0
correct = 0
total = 0
for batch_idx, (inputs, targets) in enumerate(loader):
        inputs, targets = inputs.cuda(non_blocking=True), targets.cuda(non_blocking=True)  # non_blocking pairs with pin_memory=True
optimizer.zero_grad()
        # Mixed-precision forward pass
with autocast():
outputs = model(inputs)
loss = criterion(outputs, targets)
        # Scaled backward pass and optimizer step
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
        # Statistics
running_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
if batch_idx % 100 == 0:
print(f'Batch {batch_idx}/{len(loader)}, '
f'Loss: {running_loss/(batch_idx+1):.3f}, '
f'Acc: {100.*correct/total:.2f}%')
return running_loss / len(loader), 100. * correct / total
def validate(model, loader, criterion):
model.eval()
running_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in loader:
inputs, targets = inputs.cuda(), targets.cuda()
outputs = model(inputs)
loss = criterion(outputs, targets)
running_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
return running_loss / len(loader), 100. * correct / total
# ============ 6. Main training loop ============
num_epochs = 200
best_acc = 0
for epoch in range(num_epochs):
print(f'\nEpoch {epoch+1}/{num_epochs}')
print('-' * 60)
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, scaler)
val_loss, val_acc = validate(model, val_loader, criterion)
print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
    # Step the learning-rate scheduler
scheduler.step()
    # Save the best model
if val_acc > best_acc:
best_acc = val_acc
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'acc': val_acc,
}, 'best_model.pth')
print(f'Saved best model with acc: {best_acc:.2f}%')
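Because the checkpoint above stores the epoch, model, and optimizer state, an interrupted run can be resumed from it. A minimal sketch (note that a DataParallel/DDP-wrapped model saves its keys with a module. prefix, so load into the same wrapped model or strip the prefix first):
checkpoint = torch.load('best_model.pth', map_location='cuda')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch'] + 1  # continue with the next epoch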
Performance Optimization Best Practices#
1. Data Loading Optimization#
# ✅ Recommended: multi-process loading + pin_memory
train_loader = DataLoader(
dataset,
batch_size=32,
    num_workers=4,            # multiple worker processes
    pin_memory=True,          # pinned memory (CUDA optimization)
    prefetch_factor=2,        # number of batches prefetched per worker
    persistent_workers=True   # keep workers alive across epochs (avoids restart overhead)
)
# ❌ Avoid: single-process loading
train_loader = DataLoader(dataset, batch_size=32, num_workers=0)
2. Memory Optimization#
# ✅ Gradient accumulation (simulates a larger batch size)
accumulation_steps = 4
model.zero_grad()
for i, (inputs, targets) in enumerate(train_loader):
outputs = model(inputs)
loss = criterion(outputs, targets)
    loss = loss / accumulation_steps  # normalize so accumulated gradients match the large-batch average
loss.backward()
if (i + 1) % accumulation_steps == 0:
optimizer.step()
model.zero_grad()
# ✅ Gradient checkpointing (trades recomputation for memory)
from torch.utils.checkpoint import checkpoint
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(100, 100)
        self.layer2 = nn.Linear(100, 10)
    def forward(self, x):
        # Intermediate activations are not stored; they are recomputed during backward
        x = checkpoint(self.layer1, x, use_reentrant=False)
        x = checkpoint(self.layer2, x, use_reentrant=False)
        return x
# ✅ Release tensors you no longer need
tensor = tensor.detach()  # detach from the autograd graph
del tensor                # drop the Python reference
torch.cuda.empty_cache()  # return cached blocks to the CUDA allocator
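Whether these measures actually help can be checked with PyTorch's built-in CUDA memory counters; a minimal sketch:
print(f'allocated: {torch.cuda.memory_allocated() / 1024**2:.1f} MiB')      # live tensors
print(f'peak:      {torch.cuda.max_memory_allocated() / 1024**2:.1f} MiB')  # high-water mark
torch.cuda.reset_peak_memory_stats()  # reset the peak counter between experiments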
3. Compute Optimization#
# ✅ Use in-place operations (saves memory)
x.relu_()  # in-place ReLU
x.add_(y)  # in-place add
# ✅ Avoid unnecessary CPU-GPU transfers
# ❌ Avoid:
for i in range(len(tensor)):
    print(tensor[i].item())  # forces a GPU-to-CPU sync on every iteration
# ✅ Recommended:
print(tensor.tolist())  # a single transfer
# ✅ Use torch.no_grad() to disable autograd during evaluation
with torch.no_grad():
for inputs, targets in val_loader:
outputs = model(inputs)
        # inference does not need a computation graph
# ✅ Enable benchmark mode (cuDNN autotuning)
torch.backends.cudnn.benchmark = True       # enable when input shapes are fixed
torch.backends.cudnn.deterministic = False  # allow non-deterministic algorithms (faster)
4. Mixed-Precision Training#
from torch.cuda.amp import autocast, GradScaler
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()
for inputs, targets in train_loader:
optimizer.zero_grad()
    # Automatic mixed precision for the forward pass
with autocast():
outputs = model(inputs)
loss = criterion(outputs, targets)
    # Scale the loss to prevent gradient underflow in fp16
scaler.scale(loss).backward()
    # Gradient clipping (unscale first so the threshold applies to the true gradients)
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()
# Typical speedup: 1.5-2x (Tensor Core acceleration)
# Memory usage: roughly 50% lower
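On Ampere or newer GPUs, bfloat16 is an alternative autocast dtype worth knowing: its exponent range matches float32, so loss scaling is usually unnecessary. A sketch under that hardware assumption, using the autocast imported above:
with autocast(dtype=torch.bfloat16):  # bf16 instead of the default fp16
    outputs = model(inputs)
    loss = criterion(outputs, targets)
loss.backward()   # no GradScaler needed with bf16
optimizer.step()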
5. Distributed Training (DDP)#
import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def train_ddp(rank, world_size):
setup(rank, world_size)
    # Bind this process to its GPU
torch.cuda.set_device(rank)
    # Create the model and move it to this rank's GPU
model = MyModel().to(rank)
model = DDP(model, device_ids=[rank])
    # Distributed sampler (each process sees a distinct shard of the data)
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank
)
loader = DataLoader(
dataset,
batch_size=32,
sampler=sampler,
num_workers=4
)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle so each epoch sees a different ordering
for inputs, targets in loader:
inputs, targets = inputs.to(rank), targets.to(rank)
outputs = model(inputs)
loss = criterion(outputs, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
    # Save the model (rank 0 only)
if rank == 0:
torch.save(model.module.state_dict(), 'model.pth')
cleanup()
# Launch one process per GPU
import torch.multiprocessing as mp
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True)
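An alternative to mp.spawn is the torchrun launcher, which sets RANK/WORLD_SIZE/LOCAL_RANK in the environment so process-group setup simplifies. A sketch, assuming the script is saved as train.py and launched with torchrun --nproc_per_node=4 train.py:
import os
import torch
import torch.distributed as dist
def setup_from_env():
    dist.init_process_group("nccl")  # rank and world_size are read from the environment
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    return local_rank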
Common Pitfalls and Fixes#
1. GPU Memory Leaks#
# ❌ Pitfall: holding references to the computation graph
losses = []
for inputs, targets in train_loader:
loss = criterion(model(inputs), targets)
    losses.append(loss)  # loss keeps the entire graph alive!
loss.backward()
# ✅ Fix: store only the scalar value
losses = []
for inputs, targets in train_loader:
loss = criterion(model(inputs), targets)
    losses.append(loss.item())  # store the Python float only
loss.backward()
# ❌ Pitfall: unnecessary tensor accumulation
total = 0
for x in tensors:
    total = total + x  # creates a new tensor (and extends the graph) each step
# ✅ Fix: accumulate Python scalars via .item() (or use in-place ops)
total = 0.0
for x in tensors:
total += x.item()
2. Exploding/Vanishing Gradients#
# ✅ Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# ✅ Use BatchNorm/LayerNorm to stabilize training
model = nn.Sequential(
nn.Linear(100, 100),
    nn.BatchNorm1d(100),  # normalization
nn.ReLU(),
nn.Linear(100, 10)
)
# ✅ Detect gradient anomalies
torch.autograd.set_detect_anomaly(True)  # enable during development only (adds overhead)
3. Dtype Mismatches#
# ❌ Pitfall: mixing dtypes
x = torch.randn(10)                        # float32
w = torch.randn(10, dtype=torch.float64)   # float64
y = x @ w  # RuntimeError: dtype mismatch
# ✅ Fix: unify the dtype
y = x.double() @ w  # promote to float64
# or
y = x @ w.float()   # convert to float32
# ✅ Set the default dtype
torch.set_default_dtype(torch.float32)
4. Inference-Mode Mistakes#
# ❌ Pitfall: forgetting to switch to eval mode
model.train()  # training mode
with torch.no_grad():
    outputs = model(inputs)  # BatchNorm/Dropout still behave as in training!
# ✅ Fix: set eval mode explicitly
model.eval()
with torch.no_grad():
outputs = model(inputs)
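For pure inference, torch.inference_mode() (available since PyTorch 1.9) is a slightly faster alternative to no_grad, since it also disables view and version-counter tracking:
model.eval()
with torch.inference_mode():  # stricter and somewhat faster than torch.no_grad()
    outputs = model(inputs)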
Model Debugging Techniques#
1. Checking Gradient Flow#
def check_gradients(model):
for name, param in model.named_parameters():
if param.grad is not None:
grad_norm = param.grad.norm().item()
print(f'{name}: grad_norm={grad_norm:.4f}')
if grad_norm == 0:
print(f'Warning: {name} has zero gradient!')
# Usage
loss.backward()
check_gradients(model)
2. Visualizing Feature Maps#
import matplotlib.pyplot as plt
def visualize_feature_maps(model, input_image):
activation = {}
def hook_fn(name):
def hook(module, input, output):
activation[name] = output.detach()
return hook
    # Register hooks
model.layer1.register_forward_hook(hook_fn('layer1'))
model.layer2.register_forward_hook(hook_fn('layer2'))
    # Forward pass populates the activation dict
model(input_image)
    # Plot the first 16 channels of each captured feature map
for name, feat in activation.items():
fig, axes = plt.subplots(4, 4, figsize=(12, 12))
for i, ax in enumerate(axes.flat):
if i < feat.shape[1]:
ax.imshow(feat[0, i].cpu(), cmap='viridis')
ax.axis('off')
plt.suptitle(f'{name} Feature Maps')
plt.show()
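Example usage, with a random tensor standing in for a real preprocessed image (assumes a ResNet-style model exposing layer1/layer2):
model = models.resnet18(weights=None)
model.eval()
dummy = torch.randn(1, 3, 224, 224)  # placeholder for a preprocessed image batch
visualize_feature_maps(model, dummy)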
3. Performance Profiling#
from torch.profiler import profile, record_function, ProfilerActivity
with profile(
activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
record_shapes=True,
profile_memory=True,
with_stack=True
) as prof:
with record_function("model_forward"):
output = model(input)
with record_function("loss_backward"):
loss = criterion(output, target)
loss.backward()
# Print a summary table
print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))
# Export a Chrome trace (open in chrome://tracing or Perfetto)
prof.export_chrome_trace("trace.json")
Production Deployment Practices#
1. Model Quantization#
import torch.quantization
# Dynamic quantization (weights only)
model_fp32 = MyModel()
model_int8 = torch.quantization.quantize_dynamic(
    model_fp32,
    {nn.Linear},  # layer types to quantize (dynamic quantization supports Linear/RNN, not Conv2d)
    dtype=torch.qint8
)
# Model size shrinks roughly 4x; inference is typically 2-3x faster
# Static quantization (activations quantized too; requires eval mode,
# and eager-mode static quantization expects QuantStub/DeQuantStub around forward)
model_fp32.eval()
model_fp32.qconfig = torch.quantization.get_default_qconfig('fbgemm')
model_prepared = torch.quantization.prepare(model_fp32)
# Calibrate with representative data
for inputs, _ in calibration_loader:
model_prepared(inputs)
model_int8 = torch.quantization.convert(model_prepared)
2. Model Pruning#
import torch.nn.utils.prune as prune
# Unstructured pruning
for name, module in model.named_modules():
if isinstance(module, nn.Linear):
        prune.l1_unstructured(module, name='weight', amount=0.3)  # prune 30% of weights by L1 magnitude
# Structured pruning
prune.ln_structured(module, name='weight', amount=0.5, n=2, dim=0)  # prune 50% of output channels by L2 norm
# Make pruning permanent (remove the mask and reparametrization)
for module in model.modules():
if isinstance(module, nn.Linear):
prune.remove(module, 'weight')
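A quick sanity check of the sparsity actually achieved (a sketch that only inspects Linear layers):
total, zeros = 0, 0
for module in model.modules():
    if isinstance(module, nn.Linear):
        total += module.weight.numel()
        zeros += (module.weight == 0).sum().item()
print(f'Linear-weight sparsity: {100.0 * zeros / total:.1f}%')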
3. ONNX Export and Optimization#
import onnx
import onnxruntime
# Export to ONNX
torch.onnx.export(
model,
dummy_input,
"model.onnx",
opset_version=13,
    do_constant_folding=True,  # constant-folding optimization
input_names=['input'],
output_names=['output'],
    dynamic_axes={'input': {0: 'batch'}}  # dynamic batch size
)
# Optimize the ONNX model (the onnx.optimizer module was removed from the onnx
# package; the standalone onnxoptimizer package provides the same passes)
import onnxoptimizer
model_onnx = onnx.load("model.onnx")
model_onnx = onnxoptimizer.optimize(model_onnx)
# Inference with ONNX Runtime
session = onnxruntime.InferenceSession("model.onnx", providers=['CUDAExecutionProvider'])
outputs = session.run(None, {'input': input_array})
4. TorchServe Deployment#
# Package the model
torch-model-archiver \
--model-name resnet18 \
--version 1.0 \
--serialized-file model.pt \
--handler image_classifier \
--export-path model_store
# Start the server
torchserve --start \
--model-store model_store \
--models resnet18=resnet18.mar \
--ncs
# Send an inference request
curl -X POST http://localhost:8080/predictions/resnet18 \
-T image.jpg
Summary Recommendations#
Training#
- Data: multi-process DataLoader + pin_memory
- Model: prefer DDP over DataParallel
- Precision: enable mixed-precision training (AMP)
- Memory: gradient accumulation or gradient checkpointing
- Debugging: enable anomaly detection (detect_anomaly)
Inference#
- Mode: model.eval() + torch.no_grad()
- Optimization: TorchScript compilation + freeze (see the sketch below)
- Quantization: dynamic or static quantization (INT8)
- Batching: use larger batch sizes
- Hardware: CUDA Graphs + TensorRT
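The "TorchScript compilation + freeze" item can look like the following sketch (assumes a trained model in eval mode):
model.eval()
scripted = torch.jit.script(model)   # compile to TorchScript
frozen = torch.jit.freeze(scripted)  # inline parameters and fold constants
frozen.save('model_frozen.pt')       # loadable from C++ via torch::jit::load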
Deployment#
- Format: TorchScript (cross-language) or ONNX (cross-framework)
- Serving: TorchServe or Triton Inference Server
- Monitoring: Prometheus + Grafana integration
- Elasticity: Kubernetes + HPA
- Security: model encryption + API authentication