import time
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

class ComprehensiveOptimizationPipeline:
    """End-to-end optimization pipeline (combining best practices)."""
    def __init__(self, model, dataset, target_latency_ms=100, target_memory_gb=4):
        self.original_model = model
        self.dataset = dataset
        self.target_latency = target_latency_ms / 1000.0  # convert to seconds
        self.target_memory = target_memory_gb * 1e9  # convert to bytes
        self.optimization_history = []
    def run_full_optimization(self):
        """Run the complete optimization pipeline."""
        current_model = self.original_model
        print("=== PyTorch Model Optimization Pipeline ===")
        # Stage 1: basic model optimization
        print("\n1. Basic Model Optimization...")
        current_model = self._basic_model_optimization(current_model)
        # Stage 2: memory optimization
        print("\n2. Memory Optimization...")
        current_model = self._memory_optimization(current_model)
        # Stage 3: computation optimization
        print("\n3. Computation Optimization...")
        current_model = self._computation_optimization(current_model)
        # Stage 4: GPU optimization
        print("\n4. GPU Optimization...")
        current_model = self._gpu_optimization(current_model)
        # Stage 5: compilation optimization
        print("\n5. Compilation Optimization...")
        current_model = self._compilation_optimization(current_model)
        # Stage 6: deployment optimization
        print("\n6. Deployment Optimization...")
        current_model = self._deployment_optimization(current_model)
        # Final validation
        print("\n7. Final Validation...")
        self._validate_optimized_model(current_model)
        return current_model
    def _basic_model_optimization(self, model):
        """Basic model optimization."""
        # 1. Layer fusion
        if hasattr(model, 'fuse_for_inference'):
            model.fuse_for_inference()
        # 2. Parameter-initialization tweaks.
        # Caution: this overwrites weights, so it only makes sense on a
        # freshly constructed model, never on an already trained one.
        def optimize_initialization():
            for module in model.modules():
                if isinstance(module, nn.Linear):
                    if hasattr(module, 'reset_parameters'):
                        # Use a better-conditioned initialization
                        nn.init.kaiming_normal_(module.weight, mode='fan_out')
        optimize_initialization()
        # 3. Activation-function optimization
        model = self._replace_activations(model)
        return model
    def _memory_optimization(self, model):
        """Memory optimization."""
        # 1. Enable gradient checkpointing (training only)
        if model.training:
            model = self._add_gradient_checkpointing(model)
        # 2. Prefer the channels_last memory format (helps conv nets on GPU)
        if torch.cuda.is_available():
            model = model.to(memory_format=torch.channels_last)
        # 3. Parameter sharing
        model = self._apply_parameter_sharing(model)
        return model
    def _computation_optimization(self, model):
        """Computation optimization."""
        # 1. Operator replacement
        model = self._replace_inefficient_operators(model)
        # 2. Parallelism configuration: cap intra-op CPU threads
        torch.set_num_threads(min(8, torch.get_num_threads()))
        return model
    def _gpu_optimization(self, model):
        """GPU optimization."""
        if torch.cuda.is_available():
            # 1. Move to GPU and use half precision (alternatively, use
            # automatic mixed precision); inputs must match this dtype.
            model = model.cuda().half()
            # 2. cuDNN tuning
            torch.backends.cudnn.benchmark = True
            torch.backends.cudnn.enabled = True
            # 3. TensorRT integration. Torch-TensorRT ships as a separate
            # package, so probe for it rather than for an attribute on torch.
            try:
                import torch_tensorrt  # noqa: F401
                model = self._apply_tensorrt_optimization(model)
            except ImportError:
                pass
        return model
    def _compilation_optimization(self, model):
        """Compilation optimization."""
        # 1. TorchScript compilation (optimize_for_inference expects eval mode)
        try:
            model.eval()
            scripted_model = torch.jit.script(model)
            # 2. Graph-level optimization
            scripted_model = torch.jit.optimize_for_inference(scripted_model)
            return scripted_model
        except Exception as e:
            print(f"TorchScript compilation failed: {e}")
            # Fall back to torch.compile (PyTorch 2.0+)
            if hasattr(torch, 'compile'):
                try:
                    compiled_model = torch.compile(
                        model,
                        mode='max-autotune',  # maximize optimization effort
                        dynamic=False,  # assume static input shapes
                    )
                    return compiled_model
                except Exception as e2:
                    print(f"torch.compile failed: {e2}")
                    return model
            return model
    def _deployment_optimization(self, model):
        """Deployment optimization."""
        # 1. Quantization
        if self._should_quantize(model):
            model = self._apply_quantization(model)
        # 2. Pruning
        if self._should_prune(model):
            model = self._apply_pruning(model)
        # 3. Knowledge distillation (if a teacher model is available)
        # model = self._apply_knowledge_distillation(model)
        return model
    def _validate_optimized_model(self, optimized_model):
        """Validate the optimized model."""
        sample_input = torch.randn(1, 3, 224, 224)
        if torch.cuda.is_available():
            optimized_model = optimized_model.cuda()
            sample_input = sample_input.cuda()
        # Match the input dtype to the model's (it may have been halved)
        param = next(optimized_model.parameters(), None)
        if param is not None:
            sample_input = sample_input.to(dtype=param.dtype)
        # Performance tests
        latency = self._measure_latency(optimized_model, sample_input)
        memory_usage = self._measure_memory_usage(optimized_model, sample_input)
        print("\nOptimized Model Performance:")
        print(f"  Latency: {latency*1000:.2f} ms (Target: {self.target_latency*1000:.2f} ms)")
        print(f"  Memory: {memory_usage/1e9:.2f} GB (Target: {self.target_memory/1e9:.2f} GB)")
        # Check whether the targets were met
        latency_ok = latency <= self.target_latency
        memory_ok = memory_usage <= self.target_memory
        print(f"  Latency Target: {'✓' if latency_ok else '✗'}")
        print(f"  Memory Target: {'✓' if memory_ok else '✗'}")
        if not (latency_ok and memory_ok):
            print("\n⚠️ Warning: Optimization targets not met. Consider:")
            if not latency_ok:
                print("  - Further model compression")
                print("  - More aggressive quantization")
                print("  - Hardware-specific optimization")
            if not memory_ok:
                print("  - Gradient checkpointing")
                print("  - Model sharding")
                print("  - Smaller batch sizes")
    def _measure_latency(self, model, sample_input, num_runs=100):
        """Measure inference latency."""
        # Frozen/scripted modules may not support eval(); switch best-effort
        try:
            model.eval()
        except Exception:
            pass
        # Warm-up
        with torch.no_grad():
            for _ in range(10):
                _ = model(sample_input)
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        # Timed runs
        start_time = time.perf_counter()
        with torch.no_grad():
            for _ in range(num_runs):
                _ = model(sample_input)
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        end_time = time.perf_counter()
        return (end_time - start_time) / num_runs
    def _measure_memory_usage(self, model, sample_input):
        """Measure memory usage."""
        if torch.cuda.is_available():
            torch.cuda.reset_peak_memory_stats()
            with torch.no_grad():
                _ = model(sample_input)
            return torch.cuda.max_memory_allocated()
        else:
            # CPU memory measurement (simplified: whole-process RSS)
            import psutil
            process = psutil.Process()
            return process.memory_info().rss
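    # --- Placeholder helpers (assumptions, not defined in the original listing) ---
    # The pipeline above calls several helpers the listing never defines. The
    # minimal sketches below keep the class runnable end to end: the
    # quantization and pruning bodies use the real torch.ao.quantization and
    # torch.nn.utils.prune APIs, the rest are pass-throughs to replace with
    # implementations suited to your model.
    def _replace_activations(self, model):
        # Sketch: switch ReLU to its in-place variant to cut allocations.
        for name, child in model.named_children():
            if isinstance(child, nn.ReLU):
                setattr(model, name, nn.ReLU(inplace=True))
            else:
                self._replace_activations(child)
        return model
    def _add_gradient_checkpointing(self, model):
        # Sketch: pass-through; wire torch.utils.checkpoint into your blocks.
        return model
    def _apply_parameter_sharing(self, model):
        # Sketch: pass-through placeholder for weight-tying schemes.
        return model
    def _replace_inefficient_operators(self, model):
        # Sketch: pass-through placeholder for operator-level rewrites.
        return model
    def _apply_tensorrt_optimization(self, model):
        # Sketch: pass-through; see torch_tensorrt.compile for real lowering.
        return model
    def _should_quantize(self, model):
        # Sketch heuristic: dynamic int8 quantization targets eager-mode CPU
        # inference, so skip GPU deployments and TorchScript modules.
        return (not torch.cuda.is_available()
                and not isinstance(model, torch.jit.ScriptModule))
    def _apply_quantization(self, model):
        # Dynamic int8 quantization of Linear layers.
        return torch.ao.quantization.quantize_dynamic(
            model, {nn.Linear}, dtype=torch.qint8)
    def _should_prune(self, model):
        # Sketch heuristic: pruning is opt-in, disabled by default.
        return False
    def _apply_pruning(self, model):
        # L1 unstructured pruning of Linear weights (20%), then made permanent.
        import torch.nn.utils.prune as prune
        for module in model.modules():
            if isinstance(module, nn.Linear):
                prune.l1_unstructured(module, name='weight', amount=0.2)
                prune.remove(module, 'weight')
        return model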
def create_production_training_pipeline():
    """Production-grade training pipeline."""
    class ProductionTrainer:
        def __init__(self, model, train_loader, val_loader, optimizer):
            self.model = model
            self.train_loader = train_loader
            self.val_loader = val_loader
            self.optimizer = optimizer
            # Performance monitoring
            self.performance_monitor = PerformanceMonitor()
            # Automatic optimization setup
            self._setup_automatic_optimizations()
        def _setup_automatic_optimizations(self):
            """Configure automatic optimizations."""
            # 1. Automatic mixed precision
            if torch.cuda.is_available():
                self.scaler = torch.cuda.amp.GradScaler()
                self.use_amp = True
            else:
                self.use_amp = False
            # 2. Compilation
            if hasattr(torch, 'compile'):
                self.model = torch.compile(self.model, mode='reduce-overhead')
            # 3. cuDNN tuning
            if torch.cuda.is_available():
                torch.backends.cudnn.benchmark = True
        def train_epoch(self, epoch):
            """Optimized training epoch."""
            self.model.train()
            epoch_loss = 0.0
            num_batches = len(self.train_loader)
            for batch_idx, (data, target) in enumerate(self.train_loader):
                batch_start_time = time.time()
                # Host-to-device transfer (overlaps with compute when the
                # DataLoader uses pinned memory)
                if torch.cuda.is_available():
                    data = data.cuda(non_blocking=True)
                    target = target.cuda(non_blocking=True)
                # Zero gradients (set_to_none=True is faster than zeroing)
                self.optimizer.zero_grad(set_to_none=True)
                # Forward pass (mixed precision)
                if self.use_amp:
                    with torch.cuda.amp.autocast():
                        output = self.model(data)
                        loss = F.cross_entropy(output, target)
                    # Backward pass with gradient scaling
                    self.scaler.scale(loss).backward()
                    self.scaler.step(self.optimizer)
                    self.scaler.update()
                else:
                    output = self.model(data)
                    loss = F.cross_entropy(output, target)
                    loss.backward()
                    self.optimizer.step()
                epoch_loss += loss.item()
                # Performance monitoring
                batch_time = time.time() - batch_start_time
                self.performance_monitor.record_batch(batch_time, loss.item())
                # Dynamic optimization adjustment
                if batch_idx % 100 == 0:
                    self._dynamic_optimization_adjustment(batch_idx, num_batches)
            return epoch_loss / num_batches
        def _dynamic_optimization_adjustment(self, batch_idx, total_batches):
            """Dynamically adjust optimizations based on observed performance."""
            avg_batch_time = self.performance_monitor.get_average_batch_time()
            target_batch_time = 0.1  # 100 ms target
            if avg_batch_time > target_batch_time * 1.2:
                # Behind target: enable more optimizations
                if not self.use_amp and torch.cuda.is_available():
                    print("Enabling automatic mixed precision...")
                    self.scaler = torch.cuda.amp.GradScaler()
                    self.use_amp = True
                # Reduce DataLoader workers to free CPU resources for the GPU;
                # this only takes effect when the next epoch's iterator starts
                if hasattr(self.train_loader, 'num_workers'):
                    self.train_loader.num_workers = max(1, self.train_loader.num_workers - 1)
    # Return the trainer class so callers can instantiate it
    return ProductionTrainer
class PerformanceMonitor:
    """Sliding-window performance monitor."""
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.batch_times = []
        self.losses = []
    def record_batch(self, batch_time, loss):
        self.batch_times.append(batch_time)
        self.losses.append(loss)
        # Keep only the most recent window
        if len(self.batch_times) > self.window_size:
            self.batch_times.pop(0)
            self.losses.pop(0)
    def get_average_batch_time(self):
        return np.mean(self.batch_times) if self.batch_times else 0.0
    def get_average_loss(self):
        return np.mean(self.losses) if self.losses else 0.0
    def get_performance_summary(self):
        return {
            'avg_batch_time': self.get_average_batch_time(),
            'avg_loss': self.get_average_loss(),
            'batch_time_std': np.std(self.batch_times) if self.batch_times else 0.0,
            'loss_std': np.std(self.losses) if self.losses else 0.0
        }
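# --- Usage sketch (hypothetical driver code, not from the original listing) ---
# `demo_model` is a stand-in; substitute your own model, DataLoaders, and
# optimizer. The pipeline mutates and may recompile the model, so keep a
# copy of the original weights if you still need them.
if __name__ == '__main__':
    demo_model = nn.Sequential(
        nn.Conv2d(3, 8, 3, padding=1), nn.ReLU(),
        nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(8, 10))
    pipeline = ComprehensiveOptimizationPipeline(
        demo_model, dataset=None, target_latency_ms=100, target_memory_gb=4)
    optimized = pipeline.run_full_optimization()
    # Training side (requires real data loaders):
    # Trainer = create_production_training_pipeline()
    # trainer = Trainer(model, train_loader, val_loader,
    #                   torch.optim.AdamW(model.parameters()))
    # avg_loss = trainer.train_epoch(epoch=0)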