vLLM-06-Distributed模块-概览
模块职责
Distributed 模块负责 vLLM 的分布式通信和并行策略实现,包括:
- 初始化分布式进程组(torch.distributed)
- 实现张量并行(Tensor Parallelism, TP)
- 实现流水线并行(Pipeline Parallelism, PP)
- 实现数据并行(Data Parallelism, DP)
- 实现专家并行(Expert Parallelism, EP,用于 MoE 模型)
- 提供高效的集合通信算子(AllReduce、AllGather、ReduceScatter 等)
- 实现自定义通信优化(Custom AllReduce)
- 支持多种通信后端(NCCL、Gloo)
输入与输出
输入
- 初始化参数:world_size、rank、backend、master_addr、master_port
- 通信数据:torch.Tensor(需要通信的张量)
- 并行配置:ParallelConfig
输出
- 通信结果:torch.Tensor(通信后的张量)
- 进程组:torch.distributed.ProcessGroup
上下游依赖
上游(调用方)
ModelExecutor:模型并行切分和通信EngineCore:数据并行同步Worker:设备间通信
下游(被调用)
torch.distributed:PyTorch 分布式通信NCCL:NVIDIA 集合通信库Custom CUDA kernels:自定义高效通信实现
生命周期
初始化阶段
- 设置环境变量(MASTER_ADDR、MASTER_PORT、WORLD_SIZE、RANK)
- 初始化进程组(
torch.distributed.init_process_group) - 创建并行状态(TP、PP、DP 的进程组)
- (可选)初始化自定义 AllReduce
运行阶段
- 模型前向传播时进行 TP/PP 通信
- 梯度同步时进行 DP 通信(训练场景)
- KV Cache 传输时进行设备间通信
关闭阶段
- 销毁进程组(
torch.distributed.destroy_process_group) - 清理通信资源
架构图
flowchart TB
subgraph ParallelStrategies["并行策略"]
TP[Tensor Parallel<br/>模型切分]
PP[Pipeline Parallel<br/>Layer 切分]
DP[Data Parallel<br/>副本并行]
EP[Expert Parallel<br/>MoE 切分]
end
subgraph CommunicationOps["通信算子"]
AllReduce[AllReduce<br/>全局规约]
AllGather[AllGather<br/>全局收集]
ReduceScatter[ReduceScatter<br/>规约散布]
Send[Send/Recv<br/>点对点]
end
subgraph Backends["通信后端"]
NCCL[NCCL<br/>NVIDIA GPU]
Gloo[Gloo<br/>CPU]
Custom[Custom AllReduce<br/>优化实现]
end
TP -->|使用| AllReduce
TP -->|使用| AllGather
PP -->|使用| Send
DP -->|使用| AllReduce
EP -->|使用| AllReduce
EP -->|使用| ReduceScatter
AllReduce -->|GPU| NCCL
AllReduce -->|GPU| Custom
AllGather -->|GPU| NCCL
Send -->|GPU| NCCL
AllReduce -->|CPU| Gloo
style TP fill:#e1f5ff
style PP fill:#ffe1e1
style DP fill:#e1ffe1
style Custom fill:#fff5e1
架构说明
1. 图意概述
Distributed 模块实现四种并行策略(TP、PP、DP、EP),每种策略使用不同的集合通信算子,底层通过 NCCL/Gloo/Custom 后端执行实际通信。
2. 关键接口
初始化接口:
initialize_model_parallel(tp_size, pp_size): 初始化模型并行get_tensor_model_parallel_group(): 获取 TP 进程组get_pipeline_model_parallel_group(): 获取 PP 进程组
通信接口:
tensor_model_parallel_all_reduce(tensor): TP AllReducetensor_model_parallel_all_gather(tensor): TP AllGatherbroadcast(tensor, src): 广播
状态查询:
get_tensor_model_parallel_rank(): 获取 TP rankget_tensor_model_parallel_world_size(): 获取 TP world size
3. 边界(并发/超时/幂等/顺序)
并发:
- 集合通信是阻塞的(同步点)
- 点对点通信支持异步
超时:
- NCCL 默认超时:30 分钟
- 可通过
NCCL_TIMEOUT_S环境变量配置
幂等性:
- 通信操作非幂等(每次产生新结果)
顺序:
- 集合通信要求所有 rank 按相同顺序调用
4. 异常与回退
异常类型:
- 通信超时:NCCL timeout
- 进程崩溃:某个 rank 挂掉
- 网络故障:通信失败
回退策略:
- 通信失败通常无法恢复,需重启所有进程
- NCCL 错误会导致整个作业失败
5. 性能与容量假设
性能特征:
- AllReduce 延迟:~100 µs(小张量),~1 ms(大张量)
- 带宽:接近理论带宽(NVLink:300 GB/s,IB:100 GB/s)
- 扩展性:TP 2-8 GPU 接近线性
容量假设:
- TP:最多 8 GPU(受 NVLink 限制)
- PP:理论无限,实际 2-16 stage
- DP:理论无限
6. 版本兼容/演进
V0 → V1 变更:
- V1 简化了并行状态管理
- V1 支持更灵活的并行组合
兼容性:
- PyTorch ≥ 2.0
- NCCL ≥ 2.10
核心算法与流程
Tensor Parallelism(张量并行)
原理:将模型参数按列或行切分到多个 GPU。
实现:
# 列并行 Linear
class ColumnParallelLinear(nn.Module):
def forward(self, input: torch.Tensor) -> torch.Tensor:
# 1. input 在所有 TP rank 上相同(或已通过 AllGather 同步)
# 2. 权重 W 按列切分:W = [W0, W1, ..., W_{tp-1}]
# 3. 计算 output_parallel = input @ W_i(本地计算)
output_parallel = F.linear(input, self.weight)
# 4. (可选)根据需要进行 AllReduce 或延迟同步
return output_parallel
# 行并行 Linear
class RowParallelLinear(nn.Module):
def forward(self, input: torch.Tensor) -> torch.Tensor:
# 1. input 已被切分(来自上层 ColumnParallel)
# 2. 权重 W 按行切分:W = [W0; W1; ...; W_{tp-1}]
# 3. 计算 output_parallel = input_i @ W_i(本地计算)
output_parallel = F.linear(input, self.weight)
# 4. AllReduce 汇总结果
output = tensor_model_parallel_all_reduce(output_parallel)
return output
通信开销:
- ColumnParallel:通常无通信(延迟到 RowParallel)
- RowParallel:一次 AllReduce(size = batch_size × seq_len × hidden_dim)
Pipeline Parallelism(流水线并行)
原理:将模型层切分到多个 stage,每个 stage 在不同 GPU。
实现:
# Stage 0 (Rank 0)
hidden_states = embedding(input_ids)
hidden_states = layers[0:4](hidden_states)
send(hidden_states, dst=1) # 发送到 Stage 1
# Stage 1 (Rank 1)
hidden_states = recv(src=0) # 接收来自 Stage 0
hidden_states = layers[4:8](hidden_states)
send(hidden_states, dst=2) # 发送到 Stage 2
# Stage 2 (Rank 2)
hidden_states = recv(src=1)
hidden_states = layers[8:12](hidden_states)
logits = lm_head(hidden_states)
通信开销:
- 每个 micro-batch:PP_size - 1 次点对点通信
- Pipeline bubble:~(PP_size - 1) / num_micro_batches
Data Parallelism(数据并行)
原理:每个 GPU 持有完整模型副本,处理不同数据。
实现:
# 每个 DP rank 处理不同的请求
for dp_rank in range(dp_size):
if rank == dp_rank:
# 处理分配给本 rank 的请求
outputs = model(inputs[dp_rank])
# (训练场景)梯度同步
# all_reduce(grads) # vLLM 推理场景无需梯度同步
通信开销:
- 推理:无通信(完全独立)
- 训练:每个 backward 一次 AllReduce(模型参数大小)
Expert Parallelism(专家并行,用于 MoE)
原理:将 MoE 的专家(Experts)切分到多个 GPU。
实现:
# 假设 8 个 experts,EP=4,每个 GPU 持有 2 个 experts
class MoELayer(nn.Module):
def forward(self, hidden_states, routing_weights):
# 1. 根据 routing 分配 tokens 到 experts
# 2. AllToAll 通信:将 tokens 发送到对应的 GPU
expert_input = all_to_all(hidden_states, routing_weights)
# 3. 本地计算 expert 输出
expert_output = self.experts(expert_input)
# 4. AllToAll 通信:将输出发回原 GPU
output = all_to_all(expert_output, routing_weights)
return output
通信开销:
- 两次 AllToAll(size = batch_size × seq_len × hidden_dim)
自定义 AllReduce 优化
vLLM 实现了自定义 AllReduce,绕过 NCCL 的开销:
# 使用 IPC (Inter-Process Communication) 直接访问 GPU 内存
def custom_all_reduce(tensor: torch.Tensor) -> torch.Tensor:
# 1. 每个 GPU 将 tensor 写入共享内存
ipc_handle = cuda_ipc_get_handle(tensor)
# 2. 所有 GPU 读取其他 GPU 的 tensor
for rank in range(world_size):
remote_tensor = cuda_ipc_open_handle(ipc_handles[rank])
tensor += remote_tensor # 原地累加
# 3. 无需额外同步(已在共享内存中完成)
return tensor
性能提升:
- 小张量(< 1 MB):2-3x 速度提升
- 适用于频繁的小规模 AllReduce
通信模式
Ring AllReduce
graph LR
GPU0[GPU 0] -->|chunk 0| GPU1[GPU 1]
GPU1 -->|chunk 1| GPU2[GPU 2]
GPU2 -->|chunk 2| GPU3[GPU 3]
GPU3 -->|chunk 3| GPU0
步骤:
- Reduce-Scatter:每个 GPU 收集一部分数据的 reduce 结果
- AllGather:每个 GPU 广播其 reduce 结果
通信量:2 × (N-1)/N × data_size(N = GPU 数量)
Tree AllReduce
graph TB
GPU0[GPU 0]
GPU1[GPU 1]
GPU2[GPU 2]
GPU3[GPU 3]
GPU0 & GPU1 --> Reduce1[Reduce]
GPU2 & GPU3 --> Reduce2[Reduce]
Reduce1 & Reduce2 --> Root[Root]
Root --> GPU0 & GPU1 & GPU2 & GPU3
步骤:
- 树形 Reduce:层层汇总到 root
- 树形 Broadcast:从 root 广播到所有 GPU
通信量:2 × log(N) × data_size
配置示例
Tensor Parallelism
from vllm import LLM
llm = LLM(
model="meta-llama/Llama-2-70b-hf",
tensor_parallel_size=4, # 4-GPU TP
)
Pipeline Parallelism
llm = LLM(
model="meta-llama/Llama-2-70b-hf",
pipeline_parallel_size=2, # 2-stage PP
tensor_parallel_size=4, # 每 stage 4-GPU TP
# 总共 2 × 4 = 8 GPU
)
Data Parallelism
llm = LLM(
model="meta-llama/Llama-2-7b-hf",
data_parallel_size=4, # 4 个模型副本
)
性能对比
| 并行策略 | 扩展性 | 通信开销 | 内存效率 | 适用场景 |
|---|---|---|---|---|
| TP | 1.8-3.5x (2-4 GPU) | 中(每层 AllReduce) | 高(参数切分) | 大模型 |
| PP | 3.0-3.8x (4 GPU) | 低(只在 stage 边界) | 高(层切分) | 超大模型 |
| DP | 接近线性 | 无(推理) | 低(完整副本) | 高并发 |
| EP | 接近线性 | 高(AllToAll) | 高(expert 切分) | MoE 模型 |
总结
Distributed 模块实现了 vLLM 的并行策略,支持:
- 四种并行策略:TP、PP、DP、EP
- 高效通信:NCCL、Custom AllReduce
- 灵活组合:支持 TP+PP、TP+DP 等
- 良好扩展性:接近线性扩展(2-8 GPU)
通过合理选择并行策略,可以支持从 7B 到 70B+ 的各类模型规模。