PyTorch-03-Autograd Automatic Differentiation
Module Overview
torch.autograd is PyTorch's automatic differentiation engine, implementing reverse-mode automatic differentiation. During the forward pass it builds a dynamic computation graph that records tensor operations; during the backward pass it applies the chain rule over that graph to compute gradients automatically.
Core Concepts
- Dynamic computation graph: built at runtime; every forward pass creates a fresh graph
- Function nodes: represent differentiable operations and save the forward context needed for the backward pass
- Gradient computation: traverses the graph backward from the outputs, applying the chain rule
- Gradient accumulation: repeated backward calls are supported; gradients are accumulated into the .grad attribute (see the sketch after this list)
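To make the last two points concrete, here is a minimal sketch using only the public torch API: a fresh graph is recorded on every forward pass, and repeated backward calls add into .grad rather than overwrite it.
import torch

x = torch.tensor([2.0], requires_grad=True)
for step in range(2):
    y = (x * x).sum()   # each forward pass records a new graph
    y.backward()        # gradients accumulate: d(x^2)/dx = 2x = 4 per call
    print(x.grad)       # tensor([4.]) then tensor([8.])

x.grad = None           # typical reset before the next optimization step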
Architecture Diagram
flowchart TB
subgraph Python API
A1[torch.autograd.backward]
A2[torch.autograd.grad]
A3[Tensor.backward]
A4[torch.autograd.Function]
end
subgraph Graph Construction
B1[AutogradMeta]
B2[grad_fn_]
B3[next_edges_]
B4[saved_tensors]
end
subgraph Backward Engine
C1[GraphTask]
C2[Engine::execute]
C3[ReadyQueue]
C4[Worker thread pool]
end
subgraph Function Implementations
D1[AddBackward]
D2[MulBackward]
D3[MatmulBackward]
D4[Custom Function]
end
subgraph Gradient Accumulation
E1[AccumulateGrad]
E2[grad_accumulator_]
E3[Tensor.grad]
end
A1 --> C1
A2 --> C1
A3 --> C1
A4 --> D4
B1 --> B2
B2 --> B3
B2 --> B4
C1 --> C2
C2 --> C3
C3 --> C4
C4 --> D1
C4 --> D2
C4 --> D3
C4 --> D4
D1 --> E1
D2 --> E1
E1 --> E2
E2 --> E3
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style D1 fill:#e1f5ff
style E1 fill:#fff4e1
Core Data Structures
AutogradMeta
struct AutogradMeta : public AutogradMetaInterface {
  // Gradient tensor (exposed in Python as .grad)
  Variable grad_;
  // Gradient function (the operation whose output this tensor is); null for leaf tensors
  std::shared_ptr<Node> grad_fn_;
  // Gradient accumulator (leaf tensors only)
  std::weak_ptr<Node> grad_accumulator_;
  // Hook lists
  std::vector<std::shared_ptr<FunctionPreHook>> hooks_;
  std::vector<std::shared_ptr<FunctionPostHook>> post_hooks_;
  // requires_grad flag
  bool requires_grad_ = false;
  // Whether this tensor is a view of another tensor
  bool is_view_ = false;
  // Forward gradient (for forward-mode AD)
  std::shared_ptr<ForwardGrad> fw_grad_;
};
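These fields surface in Python as tensor attributes; a quick illustrative check (nothing beyond the public API is assumed):
import torch

a = torch.tensor([2.0], requires_grad=True)   # leaf tensor
b = a * 3                                     # non-leaf, produced by an op

print(a.is_leaf, a.grad_fn)              # True None   (leaves have no grad_fn; they use grad_accumulator_)
print(b.is_leaf, b.grad_fn)              # False <MulBackward0 object ...>
print(a.requires_grad, b.requires_grad)  # True True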
Node (grad_fn)
struct Node : std::enable_shared_from_this<Node> {
  // Edges to the next nodes (the grad_fn of each input)
  edge_list next_edges_;
  // Backward function
  virtual variable_list apply(variable_list&& inputs) = 0;
  // Saved tensors (used by the backward computation)
  std::vector<SavedVariable> saved_variables_;
  // Hook lists
  std::vector<std::unique_ptr<FunctionPreHook>> pre_hooks_;
  std::vector<std::unique_ptr<FunctionPostHook>> post_hooks_;
  // Node metadata
  std::string name() const;
  uint64_t sequence_nr() const;
};
Important subclasses:
- AccumulateGrad: gradient accumulator for leaf tensors
- AddBackward0, MulBackward0: backward functions for built-in operators
- PythonFunction: user-defined autograd.Function
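The Node/Edge graph can be inspected from Python through grad_fn and next_functions; a short sketch for the c = a * b + a example used later in this document:
import torch

a = torch.tensor([2.0], requires_grad=True)
b = torch.tensor([3.0], requires_grad=True)
c = a * b + a

print(c.grad_fn)                 # <AddBackward0 object ...>
print(c.grad_fn.next_functions)  # ((<MulBackward0 ...>, 0), (<AccumulateGrad ...>, 0))
print(c.grad_fn.next_functions[0][0].next_functions)
# ((<AccumulateGrad ...>, 0), (<AccumulateGrad ...>, 0))  -- the leaves a and b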
Edge
struct Edge {
  // The Node this edge points to
  std::shared_ptr<Node> function;
  // Input index (which input of `function` this edge feeds)
  uint32_t input_nr;
};
SavedVariable
struct SavedVariable {
  // Saved tensor data
  Variable data_;
  // Version counter captured when the tensor was saved
  uint32_t version_counter_;
  // Whether this SavedVariable was default-constructed (holds no tensor)
  bool was_default_constructed_ = true;
};
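The saved version counter is what lets autograd detect that a tensor needed for backward was modified in place; a small demonstration of the resulting error (error message paraphrased in the comment):
import torch

a = torch.tensor([2.0], requires_grad=True)
b = a * 3            # non-leaf tensor
t = a * b            # MulBackward0 saves a and b for its backward
b.add_(1)            # in-place update bumps b's version counter

try:
    t.sum().backward()
except RuntimeError as e:
    print(e)         # "... has been modified by an inplace operation ..."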
Building the Computation Graph in the Forward Pass
Example: c = a * b + a
import torch
a = torch.tensor([2.0], requires_grad=True)
b = torch.tensor([3.0], requires_grad=True)
# Forward pass
t1 = a * b # t1 = 6.0
c = t1 + a # c = 8.0
Computation Graph Structure
graph TB
a[a: Variable<br/>data=2.0<br/>requires_grad=True<br/>grad_fn=None<br/>is_leaf=True]
b[b: Variable<br/>data=3.0<br/>requires_grad=True<br/>grad_fn=None<br/>is_leaf=True]
mul[MulBackward0<br/>saved: a=2.0, b=3.0]
t1[t1: Variable<br/>data=6.0<br/>grad_fn=MulBackward0]
add[AddBackward0<br/>saved: None]
c[c: Variable<br/>data=8.0<br/>grad_fn=AddBackward0]
a --> mul
b --> mul
mul --> t1
t1 --> add
a --> add
add --> c
style a fill:#e8f5e9
style b fill:#e8f5e9
style c fill:#fff4e1
Construction Process
The snippets below are a simplified sketch of what the autograd layer does around each operator (in the actual codebase this logic lives in generated VariableType wrappers rather than in the ATen kernels themselves):
// 1. a * b
Tensor at::mul(const Tensor& self, const Tensor& other) {
  // Compute the result
  Tensor result = at::empty_like(self);
  mul_out(result, self, other);  // result = 6.0
  // Build autograd metadata
  if (self.requires_grad() || other.requires_grad()) {
    // Create the MulBackward0 node
    auto grad_fn = std::make_shared<MulBackward0>();
    // Save the forward context
    grad_fn->set_next_edges({
        Edge{self.grad_fn(), 0},   // edge to a (for a leaf this resolves to its AccumulateGrad node)
        Edge{other.grad_fn(), 0}   // edge to b
    });
    grad_fn->saved_self = SavedVariable(self);
    grad_fn->saved_other = SavedVariable(other);
    // Set grad_fn on the result
    result.set_requires_grad(true);
    result.set_grad_fn(grad_fn);
  }
  return result;
}
// 2. t1 + a
Tensor at::add(const Tensor& self, const Tensor& other) {
  Tensor result = at::empty_like(self);
  add_out(result, self, other);  // result = 8.0
  if (self.requires_grad() || other.requires_grad()) {
    auto grad_fn = std::make_shared<AddBackward0>();
    grad_fn->set_next_edges({
        Edge{self.grad_fn(), 0},   // edge to t1 (MulBackward0)
        Edge{other.grad_fn(), 0}   // edge to a
    });
    // Add does not need to save its inputs (its local gradients are constant 1)
    result.set_requires_grad(true);
    result.set_grad_fn(grad_fn);
  }
  return result;
}
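The requires_grad guard in the sketches above is why graph construction is conditional; this can be observed directly from Python (public API only):
import torch

a = torch.tensor([2.0], requires_grad=True)
b = torch.tensor([3.0])                # requires_grad=False

t1 = a * b
print(t1.requires_grad, t1.grad_fn)    # True <MulBackward0 ...>  (one input requires grad)

with torch.no_grad():                  # grad mode off: no node is created at all
    t2 = a * b
print(t2.requires_grad, t2.grad_fn)    # False None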
Backward Engine
Engine Architecture
struct Engine {
  // Execute the backward pass
  void execute(
      const edge_list& roots,          // root edges (grad_fns of the outputs)
      const variable_list& inputs,     // initial gradients for the roots (grad_outputs)
      bool keep_graph,                 // keep the graph after backward
      bool create_graph,               // build a graph of the gradient computation itself
      bool accumulate_grad,            // accumulate results into .grad
      const edge_list& outputs = {}    // output edges (used by autograd.grad())
  );
  // Worker thread pool
  std::vector<std::thread> workers_;
  // Ready queues (one per device)
  std::vector<std::shared_ptr<ReadyQueue>> ready_queues_;
  // Global singleton
  static Engine& get_default_engine();
};
Backward Pass Flow
sequenceDiagram
autonumber
participant User as User code
participant Backward as c.backward()
participant Engine as Engine
participant GraphTask as GraphTask
participant Queue as ReadyQueue
participant Worker as Worker thread
participant Func as grad_fn
participant Accum as AccumulateGrad
User->>Backward: c.backward()
Backward->>Engine: execute({c.grad_fn()}, ...)
Engine->>GraphTask: Create GraphTask
Note over GraphTask: Records every node whose<br/>gradient must be computed
Engine->>GraphTask: Topological sort
GraphTask->>GraphTask: Compute dependency counts<br/>(how many edges point to each node)
Engine->>Queue: Enqueue the root node (c.grad_fn)
Queue->>Queue: grad_output = torch.ones_like(c)
Engine->>Worker: Start worker threads
loop Until the queue is empty
Worker->>Queue: Pop a ready node
Queue-->>Worker: AddBackward0, grad_out=[1.0]
Worker->>Func: apply([grad_out])
Note over Func: AddBackward0::apply<br/>grad_inputs = [1.0, 1.0]
Func-->>Worker: [1.0, 1.0]
Worker->>Worker: Iterate over next_edges
loop For each input
alt Leaf tensor
Worker->>Accum: AccumulateGrad::apply([1.0])
Accum->>Accum: a.grad += 1.0
else Intermediate node
Worker->>Queue: Decrement its dependency count
alt Dependency count == 0
Worker->>Queue: Add to the ready queue<br/>MulBackward0, grad_out=[1.0]
end
end
end
end
Engine-->>Backward: Done
Backward-->>User: Gradients accumulated into a.grad, b.grad
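For the running example c = a * b + a, the flow above yields dc/da = b + 1 and dc/db = a; a direct numerical check:
import torch

a = torch.tensor([2.0], requires_grad=True)
b = torch.tensor([3.0], requires_grad=True)
c = a * b + a

c.backward()      # AddBackward0 -> MulBackward0 -> AccumulateGrad
print(a.grad)     # tensor([4.])  = b + 1
print(b.grad)     # tensor([2.])  = a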
Concrete Function Implementations
AddBackward0
struct AddBackward0 : public TraceableFunction {
  variable_list apply(variable_list&& grads) override {
    // Gradient of add: d(a+b)/da = 1, d(a+b)/db = 1,
    // so grad_output is passed through unchanged
    return {grads[0], grads[0]};
  }
  std::string name() const override {
    return "AddBackward0";
  }
};
MulBackward0
struct MulBackward0 : public TraceableFunction {
  SavedVariable saved_self;    // saves a
  SavedVariable saved_other;   // saves b
  variable_list apply(variable_list&& grads) override {
    auto& grad_output = grads[0];
    // d(a*b)/da = b
    // d(a*b)/db = a
    variable_list grad_inputs(2);
    if (should_compute_output(0)) {
      grad_inputs[0] = grad_output * saved_other.unpack();
    }
    if (should_compute_output(1)) {
      grad_inputs[1] = grad_output * saved_self.unpack();
    }
    return grad_inputs;
  }
  std::string name() const override {
    return "MulBackward0";
  }
};
MatmulBackward0
struct MatmulBackward0 : public TraceableFunction {
  SavedVariable saved_self;   // matrix A
  SavedVariable saved_mat2;   // matrix B
  variable_list apply(variable_list&& grads) override {
    auto& grad_output = grads[0];  // dL/d(AB)
    variable_list grad_inputs(2);
    // dL/dA = dL/d(AB) @ B^T
    if (should_compute_output(0)) {
      grad_inputs[0] = grad_output.mm(saved_mat2.unpack().t());
    }
    // dL/dB = A^T @ dL/d(AB)
    if (should_compute_output(1)) {
      grad_inputs[1] = saved_self.unpack().t().mm(grad_output);
    }
    return grad_inputs;
  }
  std::string name() const override {
    return "MatmulBackward0";
  }
};
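The two matmul rules can be cross-checked numerically against autograd; a plain verification sketch:
import torch

A = torch.randn(3, 4, requires_grad=True)
B = torch.randn(4, 5, requires_grad=True)
grad_out = torch.randn(3, 5)

(A @ B).backward(grad_out)
print(torch.allclose(A.grad, grad_out @ B.detach().t()))   # True: dL/dA = dL/d(AB) @ B^T
print(torch.allclose(B.grad, A.detach().t() @ grad_out))   # True: dL/dB = A^T @ dL/d(AB)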
Custom Autograd Functions
Python API
import torch

class MyReLU(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        # ctx is used to save forward context for the backward pass
        ctx.save_for_backward(input)
        return input.clamp(min=0)

    @staticmethod
    def backward(ctx, grad_output):
        # Retrieve the saved tensors
        input, = ctx.saved_tensors
        # Compute the gradient: pass grad_output through only where input >= 0
        grad_input = grad_output.clone()
        grad_input[input < 0] = 0
        return grad_input

# Usage
x = torch.randn(10, requires_grad=True)
y = MyReLU.apply(x)   # call .apply(); never call forward/backward directly
loss = y.sum()
loss.backward()
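A custom backward can be validated with torch.autograd.gradcheck, which compares it against finite-difference gradients (double-precision inputs are recommended; random doubles almost surely avoid the kink at 0):
import torch

x = torch.randn(6, dtype=torch.double, requires_grad=True)
print(torch.autograd.gradcheck(MyReLU.apply, (x,)))   # True if backward matches numerical gradients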
C++ Implementation
#include <torch/torch.h>
using namespace torch::autograd;
using torch::Tensor;

// Define the Function
struct MyReluFunction : public Function<MyReluFunction> {
  static Tensor forward(
      AutogradContext* ctx,
      const Tensor& input) {
    // Save the input
    ctx->save_for_backward({input});
    // Forward computation
    return input.clamp_min(0);
  }

  static variable_list backward(
      AutogradContext* ctx,
      variable_list grad_outputs) {
    // Retrieve the saved tensors
    auto saved = ctx->get_saved_variables();
    auto input = saved[0];
    auto grad_output = grad_outputs[0];
    // Compute the gradient
    auto grad_input = grad_output.clone();
    grad_input.masked_fill_(input < 0, 0);
    return {grad_input};
  }
};

// Usage
Tensor my_relu(const Tensor& input) {
  return MyReluFunction::apply(input);
}
Gradient Computation APIs
torch.autograd.backward
# Basic usage
loss = model(x)
loss.backward()   # equivalent to torch.autograd.backward([loss])

# backward with multiple outputs
out1, out2 = model(x)
torch.autograd.backward(
    [out1, out2],
    [torch.ones_like(out1), torch.ones_like(out2)]
)

# Retain the graph (allows calling backward more than once)
loss.backward(retain_graph=True)
loss.backward()   # second backward call

# Build a graph of the gradient computation (for second derivatives)
loss.backward(create_graph=True)
grad = x.grad
grad_grad = torch.autograd.grad(grad.sum(), x, create_graph=True)[0]
torch.autograd.grad
# Compute gradients without accumulating into .grad
x = torch.randn(10, requires_grad=True)
y = x ** 2
grad_x, = torch.autograd.grad(y.sum(), x, retain_graph=True)   # grad_x is a new tensor
# vs. backward (accumulates into .grad)
y.sum().backward()   # x.grad += grad_x

# Multiple inputs
x1 = torch.randn(3, requires_grad=True)
x2 = torch.randn(3, requires_grad=True)
y = (x1 ** 2 + x2 ** 3).sum()
grad_x1, grad_x2 = torch.autograd.grad(y, [x1, x2])
# grad_x1 = dy/dx1 = 2*x1
# grad_x2 = dy/dx2 = 3*x2^2

# allow_unused: some inputs do not affect the output
z = x1.sum()
grad_x1, grad_x2 = torch.autograd.grad(
    z, [x1, x2],
    allow_unused=True
)
# grad_x1 = ones(3), grad_x2 = None
Higher-Order Derivatives
Second Derivatives
x = torch.tensor([1.0, 2.0, 3.0], requires_grad=True)
y = (x ** 3).sum() # y = x1^3 + x2^3 + x3^3
# First derivative
grad_y = torch.autograd.grad(y, x, create_graph=True)[0]
# grad_y = [3*x1^2, 3*x2^2, 3*x3^2] = [3, 12, 27]
# Second derivative
grad2_y = torch.autograd.grad(grad_y.sum(), x)[0]
# grad2_y = [6*x1, 6*x2, 6*x3] = [6, 12, 18]
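For full second-order objects rather than repeated grad calls, torch.autograd.functional provides convenience wrappers; a brief sketch with torch.autograd.functional.hessian:
import torch

def f(x):
    return (x ** 3).sum()

x = torch.tensor([1.0, 2.0, 3.0])
H = torch.autograd.functional.hessian(f, x)
print(H)   # diagonal matrix with entries 6*x = diag(6, 12, 18)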
Jacobian-Vector Products
# Forward-mode AD (torch.func.jvp)
def f(x):
    return x ** 2

x = torch.randn(3, requires_grad=True)
v = torch.randn(3)   # tangent vector

# JVP: J(f)(x) @ v
y, jvp_result = torch.func.jvp(f, (x,), (v,))
# jvp_result = 2*x * v (the Jacobian is diagonal with entries 2*x)

# Reverse-mode AD (torch.autograd.grad) computes the VJP
y = f(x)
vjp_result = torch.autograd.grad(y, x, v)[0]
# vjp_result = v^T @ J(f)(x) = v * 2*x (same values here because the Jacobian is diagonal, but computed differently)
Performance Optimization
1. Disable Gradient Computation
# Disable autograd during inference
with torch.no_grad():
    output = model(input)   # no computation graph is built

# Or use the decorator form
@torch.no_grad()
def inference(model, input):
    return model(input)
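A stricter alternative is torch.inference_mode(), which additionally skips view tracking and version counting and so can be slightly faster; note that tensors created inside it can never participate in autograd later:
import torch

model = torch.nn.Linear(4, 2)
x = torch.randn(1, 4)

with torch.inference_mode():
    out = model(x)
print(out.requires_grad)   # False (and out cannot be recorded in any future graph)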
2. Memory Optimization
# Gradient checkpointing:
# trades memory for compute by recomputing the forward pass during backward
import torch.nn as nn
from torch.utils.checkpoint import checkpoint

class MyModel(nn.Module):
    def forward(self, x):
        # intermediate activations are not stored; they are recomputed during backward
        x = checkpoint(self.layer1, x)
        x = checkpoint(self.layer2, x)
        return x

# Release gradients that are no longer needed
tensor.grad = None   # frees the memory; cheaper than tensor.grad.zero_()
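The same set-to-None idea applies when clearing gradients through an optimizer: optimizer.zero_grad accepts a set_to_none flag (newer PyTorch releases default it to True):
import torch

model = torch.nn.Linear(4, 2)
opt = torch.optim.SGD(model.parameters(), lr=0.1)

model(torch.randn(8, 4)).sum().backward()
opt.zero_grad(set_to_none=True)   # drops the .grad tensors instead of filling them with zeros
print(model.weight.grad)          # None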
3. Mixed-Precision Training
from torch.cuda.amp import autocast, GradScaler
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())
scaler = GradScaler()
for x, y in dataloader:
    optimizer.zero_grad(set_to_none=True)   # clear gradients from the previous step
    with autocast():   # run the forward pass in FP16 where it is safe to do so
        output = model(x)
        loss = criterion(output, y)
    scaler.scale(loss).backward()   # scale the loss so small gradients do not underflow in FP16
    scaler.step(optimizer)
    scaler.update()