模型压缩与加速
大约 14 分钟约 4297 字
模型压缩与加速
简介
模型压缩的目标,是在尽量保留效果的前提下,让模型更小、更快、更省资源。它通常出现在训练完成之后、模型准备进入生产环境之前,尤其适用于移动端、边缘设备、GPU 紧张的在线服务以及对延迟敏感的推理系统。
模型压缩是一个系统工程,涉及量化(Quantization)、剪枝(Pruning)、知识蒸馏(Knowledge Distillation)、低秩分解(Low-Rank Decomposition)等多种技术。这些技术可以单独使用,也可以组合使用以达到更好的压缩效果。选择哪种技术取决于目标硬件、精度容忍度、延迟要求和工程复杂度。
特点
量化(Quantization)
量化基本原理
量化的核心思想是用更低精度的数值表示来近似原始的高精度参数。最常见的转换路径是 FP32 → FP16 → INT8。量化可分为两类:
- 训练后量化(Post-Training Quantization, PTQ):训练完成后直接量化,无需重新训练,实现简单但可能有精度损失
- 量化感知训练(Quantization-Aware Training, QAT):在训练过程中模拟量化误差,精度保持更好但需要重新训练
import torch
import torch.nn as nn
# 量化基本概念演示
def demonstrate_quantization():
"""
量化过程示意:
1. 计算 FP32 张量的最小值和最大值
2. 根据 INT8 范围 [-128, 127] 计算缩放因子(scale)和零点(zero_point)
3. 将 FP32 值映射到 INT8
"""
original = torch.tensor([0.1, 0.5, -0.3, 0.8, -0.9, 1.2])
qmin, qmax = -128, 127
fmin, fmax = original.min().item(), original.max().item()
scale = (fmax - fmin) / (qmax - qmin)
zero_point = qmin - fmin / scale
# 量化
quantized = torch.clamp(torch.round(original / scale + zero_point), qmin, qmax).to(torch.int8)
# 反量化
dequantized = (quantized.float() - zero_point) * scale
print(f"原始值: {original.tolist()}")
print(f"量化值: {quantized.tolist()}")
print(f"反量化值: {[f'{v:.4f}' for v in dequantized.tolist()]}")
print(f"量化误差: {(original - dequantized).abs().tolist()}")
print(f"Scale: {scale:.6f}, Zero Point: {zero_point:.2f}")
demonstrate_quantization()动态量化(Dynamic Quantization)
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(128, 256),
nn.ReLU(),
nn.Linear(256, 64),
nn.ReLU(),
nn.Linear(64, 10)
)
def forward(self, x):
return self.net(x)
model = MLP().eval()
sample = torch.randn(1, 128)
quantized_model = torch.quantization.quantize_dynamic(
model,
{nn.Linear},
dtype=torch.qint8
)
with torch.no_grad():
out = quantized_model(sample)
print(out.shape)# 导出 ONNX,便于跨框架推理部署
onnx_path = "mlp_quant.onnx"
torch.onnx.export(
quantized_model,
sample,
onnx_path,
input_names=["input"],
output_names=["output"],
dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
opset_version=17
)
print(f"exported: {onnx_path}")# 延迟对比(简化示意)
import time
def benchmark(model, input_tensor, n=200):
start = time.time()
with torch.no_grad():
for _ in range(n):
model(input_tensor)
return (time.time() - start) * 1000 / n
print("fp32 ms:", benchmark(model, sample))
print("int8 ms:", benchmark(quantized_model, sample))静态量化(Static Quantization)
import torch
import torch.nn as nn
import copy
class ConvModel(nn.Module):
"""包含卷积层的模型——适合静态量化"""
def __init__(self):
super().__init__()
self.features = nn.Sequential(
nn.Conv2d(3, 16, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(16, 32, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
)
self.classifier = nn.Sequential(
nn.Linear(32 * 8 * 8, 128),
nn.ReLU(),
nn.Linear(128, 10),
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
return self.classifier(x)
# 静态量化需要:1. 插入量化桩 → 2. 校准 → 3. 转换
model_fp32 = ConvModel().eval()
# 第一步:添加 QuantStub 和 DeQuantStub
class QuantizableConvModel(nn.Module):
def __init__(self):
super().__init__()
self.quant = torch.quantization.QuantStub()
self.features = nn.Sequential(
nn.Conv2d(3, 16, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(16, 32, 3, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
)
self.classifier = nn.Sequential(
nn.Linear(32 * 8 * 8, 128),
nn.ReLU(),
nn.Linear(128, 10),
)
self.dequant = torch.quantization.DeQuantStub()
def forward(self, x):
x = self.quant(x)
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
x = self.dequant(x)
return x
model_to_quant = QuantizableConvModel().eval()
# 第二步:设置量化配置
model_to_quant.qconfig = torch.quantization.get_default_qconfig('fbgemm')
# 第三步:准备模型(插入 Observer)
torch.quantization.prepare(model_to_quant, inplace=True)
# 第四步:校准——用代表性数据运行前向传播
print("正在进行校准...")
with torch.no_grad():
for i in range(100):
dummy_input = torch.randn(1, 3, 32, 32)
model_to_quant(dummy_input)
# 第五步:转换
model_int8 = torch.quantization.convert(model_to_quant, inplace=False)
print(f"静态量化完成,模型类型: {type(model_int8)}")
# 对比模型大小
fp32_size = sum(p.numel() * p.element_size() for p in model_fp32.parameters())
print(f"FP32 模型参数大小: {fp32_size / 1024:.1f} KB")量化感知训练(QAT)
def quantization_aware_training(model, train_loader, epochs=3):
"""
量化感知训练(QAT):
在训练过程中插入伪量化节点(FakeQuantize),
让模型提前适应量化带来的精度损失
"""
model.train()
# 设置 QAT 配置
model.qconfig = torch.quantization.get_default_qat_qconfig('fbgemm')
torch.quantization.prepare_qat(model, inplace=True)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
# 训练后期冻结量化参数
if epoch >= epochs - 1:
model.apply(torch.quantization.disable_observer)
if epoch == epochs - 1:
model.apply(torch.nn.intrinsic.QAT.convert)
print(f"QAT Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")
# 最终转换
model_int8 = torch.quantization.convert(model.eval())
return model_int8
print("QAT 训练流程已定义")混合精度训练与推理
import torch
def mixed_precision_inference(model, input_tensor):
"""
混合精度推理:
大部分计算使用 FP16,关键操作保持 FP32
在支持 Tensor Core 的 GPU 上可显著加速
"""
model = model.half().eval() # 转换为半精度
input_tensor = input_tensor.half()
with torch.cuda.amp.autocast():
with torch.no_grad():
output = model(input_tensor)
return output.float() # 输出转回 FP32
# 混合精度训练示意
def mixed_precision_training(model, train_loader, epochs=3):
"""
使用 PyTorch AMP(Automatic Mixed Precision)进行混合精度训练
- 自动选择哪些操作用 FP16,哪些保持 FP32
- 使用 GradScaler 防止 FP16 下梯度下溢
"""
model.train().cuda()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()
scaler = torch.cuda.amp.GradScaler()
for epoch in range(epochs):
for data, target in train_loader:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# 自动混合精度前向传播
with torch.cuda.amp.autocast():
output = model(data)
loss = criterion(output, target)
# 缩放损失 + 反向传播
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")
print("混合精度训练流程已定义")剪枝(Pruning)
非结构化剪枝
import torch.nn.utils.prune as prune
conv = nn.Conv2d(3, 16, kernel_size=3)
# 对卷积层权重做非结构化剪枝
prune.l1_unstructured(conv, name="weight", amount=0.3)
print(conv.weight_mask)
# 永久应用剪枝结果
prune.remove(conv, "weight")# 对多个层统一剪枝
model2 = nn.Sequential(
nn.Linear(128, 256),
nn.ReLU(),
nn.Linear(256, 64),
nn.ReLU(),
nn.Linear(64, 10)
)
for module in model2.modules():
if isinstance(module, nn.Linear):
prune.l1_unstructured(module, name="weight", amount=0.2)
prune.remove(module, "weight")结构化剪枝
def structured_pruning_example():
"""
结构化剪枝:直接移除整个通道或层
相比非结构化剪枝,不需要特殊硬件支持就能获得实际加速
"""
conv = nn.Conv2d(64, 128, kernel_size=3, padding=1)
# L1 范数通道剪枝:移除权重 L1 范数最小的 30% 通道
prune.ln_structured(conv, name="weight", amount=0.3, n=1, dim=0)
print(f"原始输出通道数: 128")
print(f"剪枝后非零通道数: {(conv.weight_mask.sum(dim=[1,2,3]) > 0).sum().item()}")
# 查看每个通道的重要性分数(L1 范数)
channel_importance = conv.weight.data.abs().sum(dim=[1, 2, 3])
print(f"通道重要性分数(前5个): {channel_importance[:5].tolist()}")
structured_pruning_example()渐进式剪枝策略
import torch.nn.utils.prune as prune
class ProgressivePruner:
"""
渐进式剪枝:
不是一次性剪掉 50%,而是分多个 epoch 逐步增加剪枝率
给模型更多时间适应稀疏结构,减少精度损失
"""
def __init__(self, model, target_sparsity=0.8, num_steps=5):
self.model = model
self.target_sparsity = target_sparsity
self.num_steps = num_steps
self.current_step = 0
def step(self):
"""每调用一次增加剪枝强度"""
self.current_step += 1
# 线性增加剪枝率:20% → 40% → 60% → 80%
current_sparsity = self.target_sparsity * (self.current_step / self.num_steps)
for module in self.model.modules():
if isinstance(module, nn.Linear):
prune.l1_unstructured(module, name="weight", amount=current_sparsity)
actual_sparsity = self._compute_sparsity()
print(f"剪枝步骤 {self.current_step}/{self.num_steps}: "
f"目标稀疏率={current_sparsity:.1%}, 实际稀疏率={actual_sparsity:.1%}")
def _compute_sparsity(self):
total = 0
zeros = 0
for module in self.model.modules():
if isinstance(module, nn.Linear):
total += module.weight.numel()
zeros += (module.weight == 0).sum().item()
return zeros / total if total > 0 else 0
# 使用示例
model_to_prune = nn.Sequential(
nn.Linear(256, 512),
nn.ReLU(),
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 10),
)
pruner = ProgressivePruner(model_to_prune, target_sparsity=0.8, num_steps=4)
for _ in range(4):
pruner.step()注意:
- 非结构化剪枝减少参数数量,不一定自动降低实际延迟
- 真正部署提速通常还需要推理框架支持稀疏计算
- 结构化剪枝可以直接带来推理加速,但精度损失可能更大
- 剪枝后通常需要微调(Fine-tuning)恢复精度知识蒸馏(Knowledge Distillation)
基础蒸馏
import torch.nn.functional as F
teacher = MLP().eval()
student = nn.Sequential(
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 10)
)
optimizer = torch.optim.Adam(student.parameters(), lr=1e-3)
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature=3.0, alpha=0.7):
hard = ce_loss(student_logits, labels)
soft = F.kl_div(
F.log_softmax(student_logits / temperature, dim=1),
F.softmax(teacher_logits / temperature, dim=1),
reduction="batchmean"
) * (temperature ** 2)
return alpha * soft + (1 - alpha) * hard# 蒸馏训练示意
x = torch.randn(32, 128)
y = torch.randint(0, 10, (32,))
student.train()
with torch.no_grad():
teacher_logits = teacher(x)
student_logits = student(x)
loss = distillation_loss(student_logits, teacher_logits, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print("distill loss:", float(loss))特征蒸馏
class FeatureDistillation(nn.Module):
"""
特征蒸馏:不仅让学生模仿教师的输出(logits),
还让学生模仿教师的中间层特征表示
通常比纯 logits 蒸馏效果更好
"""
def __init__(self, teacher, student):
super().__init__()
self.teacher = teacher
self.student = student
# 匹配特征维度的适配器
self.adapter = nn.AdaptiveAvgPool1d(student_feature_dim)
def feature_loss(self, student_features, teacher_features):
"""特征层对齐损失"""
# 将教师特征维度适配到学生维度
teacher_adapted = F.adaptive_avg_pool1d(
teacher_features.unsqueeze(1),
student_features.shape[-1]
).squeeze(1)
return F.mse_loss(student_features, teacher_adapted)
def total_loss(self, x, labels, alpha=0.5, beta=0.3, temperature=3.0):
# Hard loss:与真实标签的交叉熵
with torch.no_grad():
teacher_logits = self.teacher(x)
student_logits = self.student(x)
hard_loss = F.cross_entropy(student_logits, labels)
# Soft loss:与教师输出的 KL 散度
soft_loss = F.kl_div(
F.log_softmax(student_logits / temperature, dim=1),
F.softmax(teacher_logits / temperature, dim=1),
reduction="batchmean"
) * (temperature ** 2)
# Feature loss:中间特征对齐
feat_loss = self.feature_loss(
self.student.get_features(x),
self.teacher.get_features(x)
)
return alpha * soft_loss + beta * feat_loss + (1 - alpha - beta) * hard_loss
print("特征蒸馏框架已定义")大语言模型蒸馏
def llm_distillation_example():
"""
大语言模型蒸馏策略:
1. Logit-based:让学生模型模仿教师模型的输出分布
2. Feature-based:让学生模型的隐藏状态对齐教师模型
3. Task-specific:在特定任务上蒸馏,而非通用能力
"""
# 蒸馏配置
distill_config = {
"teacher_model": "llama-7b",
"student_model": "llama-1.5b",
"strategy": "logit_based",
"temperature": 2.0,
"alpha": 0.8, # 蒸馏损失权重
"learning_rate": 1e-5,
"batch_size": 4,
"max_length": 2048,
"num_epochs": 3,
}
# 不同蒸馏策略的适用场景
strategies = {
"logit_based": {
"desc": "学生模仿教师的输出概率分布",
"pros": "实现简单,效果稳定",
"cons": "需要教师和学生共享词汇表",
"best_for": "同架构不同规模的模型蒸馏"
},
"feature_based": {
"desc": "学生对齐教师的隐藏层表示",
"pros": "不要求词汇表一致",
"cons": "需要设计适配层,工程复杂度高",
"best_for": "跨架构蒸馏(如 Transformer → RNN)"
},
"task_specific": {
"desc": "在特定任务数据上蒸馏",
"pros": "目标明确,效率高",
"cons": "通用能力可能下降",
"best_for": "特定领域或任务场景"
}
}
for name, info in strategies.items():
print(f"\n{name}:")
print(f" 描述: {info['desc']}")
print(f" 优势: {info['pros']}")
print(f" 局限: {info['cons']}")
print(f" 适用: {info['best_for']}")
llm_distillation_example()低秩分解(Low-Rank Decomposition)
import torch
import torch.nn as nn
class LowRankLinear(nn.Module):
"""
低秩分解:将一个大矩阵 W(d_in, d_out) 分解为两个小矩阵 A(d_in, r) 和 B(r, d_out)
参数量从 d_in * d_out 降低到 d_in * r + r * d_out
当 r << min(d_in, d_out) 时,压缩效果显著
"""
def __init__(self, original_linear, rank=None):
super().__init__()
d_in = original_linear.in_features
d_out = original_linear.out_features
if rank is None:
rank = min(d_in, d_out) // 4 # 默认压缩到 1/4
self.down_proj = nn.Linear(d_in, rank, bias=False)
self.up_proj = nn.Linear(rank, d_out, bias=original_linear.bias is not None)
self.activation = nn.ReLU()
# SVD 初始化:用原始权重的奇异值分解初始化低秩矩阵
U, S, Vh = torch.linalg.svd(original_linear.weight.data, full_matrices=False)
self.down_proj.weight.data = (U[:, :rank] * S[:rank].sqrt()).T
self.up_proj.weight.data = Vh[:rank, :] * S[:rank].sqrt().unsqueeze(1)
if original_linear.bias is not None:
self.up_proj.bias.data = original_linear.bias.data.clone()
def forward(self, x):
return self.up_proj(self.activation(self.down_proj(x)))
# 演示低秩分解
original = nn.Linear(512, 512)
print(f"原始参数量: {sum(p.numel() for p in original.parameters())}")
for rank in [16, 32, 64, 128]:
low_rank = LowRankLinear(original, rank=rank)
params = sum(p.numel() for p in low_rank.parameters())
print(f"Rank={rank:3d} 参数量: {params} ({params/262144*100:.1f}%)")部署前统一评估
evaluation = {
"model_name": "student_int8",
"accuracy": 0.913,
"latency_ms": 8.7,
"memory_mb": 42,
"throughput_qps": 320,
"package_size_mb": 18
}
print(evaluation)def comprehensive_evaluation(model, eval_loader, input_tensor, num_runs=200):
"""
全面的压缩模型评估:
不仅看精度,还要看延迟、吞吐量、内存占用等
"""
import time
import torch
results = {}
# 1. 精度评估
correct = 0
total = 0
model.eval()
with torch.no_grad():
for data, target in eval_loader:
output = model(data)
pred = output.argmax(dim=1)
correct += (pred == target).sum().item()
total += target.size(0)
results["accuracy"] = correct / total
# 2. 延迟评估(P50, P95, P99)
latencies = []
model.eval()
with torch.no_grad():
# 预热
for _ in range(10):
model(input_tensor)
# 正式测量
for _ in range(num_runs):
start = time.perf_counter()
model(input_tensor)
latencies.append((time.perf_counter() - start) * 1000)
latencies.sort()
results["latency_p50_ms"] = latencies[len(latencies) // 2]
results["latency_p95_ms"] = latencies[int(len(latencies) * 0.95)]
results["latency_p99_ms"] = latencies[int(len(latencies) * 0.99)]
# 3. 吞吐量评估
batch_sizes = [1, 4, 8, 16]
results["throughput"] = {}
for bs in batch_sizes:
batch_input = input_tensor.repeat(bs, 1)
start = time.perf_counter()
with torch.no_grad():
for _ in range(50):
model(batch_input)
elapsed = time.perf_counter() - start
results["throughput"][bs] = (50 * bs) / elapsed
# 4. 内存评估
if torch.cuda.is_available():
torch.cuda.reset_peak_memory_stats()
model(input_tensor.cuda())
results["peak_memory_mb"] = torch.cuda.max_memory_allocated() / 1024 / 1024
# 5. 模型大小
torch.save(model.state_dict(), "/tmp/temp_model.pt")
import os
results["model_size_mb"] = os.path.getsize("/tmp/temp_model.pt") / 1024 / 1024
os.remove("/tmp/temp_model.pt")
return results
print("综合评估函数已定义")压缩评估不能只看 accuracy:
1. 延迟
2. 吞吐
3. 内存
4. 包大小
5. 硬件兼容性
6. 稳定性(冷启动/长时间运行)不同压缩策略对比
| 策略 | 精度损失 | 加速效果 | 实现难度 | 硬件要求 | 适用场景 |
|---|---|---|---|---|---|
| 动态量化(INT8) | 小 | 中 | 低 | 无特殊要求 | CPU 推理、快速部署 |
| 静态量化(INT8) | 中 | 高 | 中 | 需要 CPU/GPU INT8 支持 | 高吞吐推理服务 |
| QAT | 小 | 高 | 高 | 同上 | 精度敏感场景 |
| 非结构化剪枝 | 小 | 低(需稀疏支持) | 低 | 稀疏计算支持 | 模型存储压缩 |
| 结构化剪枝 | 中 | 高 | 中 | 无特殊要求 | 通用推理加速 |
| 知识蒸馏 | 中 | 高 | 中 | 无特殊要求 | 模型架构迁移 |
| 低秩分解 | 中 | 中 | 中 | 无特殊要求 | 大规模全连接层 |
优点
缺点
总结
模型压缩不是把模型"变小"那么简单,而是围绕部署目标做系统性权衡:是为了缩小包体、降低延迟、节省显存,还是为了跑在特定硬件上。只有把效果、性能和部署环境一起评估,压缩才真正有价值。建议从最简单的动态量化开始,逐步尝试更复杂的方案,并在目标硬件上做真实性能测试。
关键知识点
- 量化是最常见也最容易落地的压缩方式。
- 剪枝减少参数不一定等于实际推理更快。
- 蒸馏常用于把大模型能力迁移到更小模型。
- 部署平台支持情况会直接决定压缩收益。
- QAT 在精度敏感场景中通常优于 PTQ。
- 低秩分解适合压缩大规模全连接层和注意力矩阵。
项目落地视角
- 边缘设备常优先考虑 INT8 量化和 ONNX/TFLite 导出。
- 在线服务常结合 FP16、TensorRT、批处理优化降低时延。
- 资源紧张场景可用蒸馏替代直接上线大模型。
- 生产评估必须同时覆盖离线精度和真实流量延迟。
- 大模型推理常使用 GPTQ、AWQ、GGUF 等专用量化方案。
常见误区
- 没有统一评估集和基线,就盲目做压缩。
- 以为压缩后文件更小,就一定更快。
- 忽略目标硬件是否支持相关量化算子。
- 只看实验室延迟,不看生产长时间运行稳定性。
- 量化精度下降后直接放弃,没有尝试 QAT 或校准数据优化。
进阶路线
- 学习静态量化、QAT(量化感知训练)与混合精度策略。
- 研究 TensorRT、OpenVINO、CoreML、TFLite 等部署引擎。
- 结合蒸馏、剪枝、低秩分解等多策略联合压缩。
- 为大模型推理建立端到端性能 profiling 流程。
- 学习 GPTQ、AWQ、GGUF 等大模型专用量化方法。
适用场景
- 边缘设备、移动端、IoT 推理部署。
- 高并发在线推理服务。
- GPU/CPU 资源紧张的生产环境。
- 模型体积或冷启动受限的场景。
落地建议
- 先明确瓶颈是模型大小、显存、延迟还是吞吐。
- 压缩前后用同一套验证集与基准脚本做对比。
- 在目标硬件上测真实收益,不只看本地实验结果。
- 为每个压缩版本记录精度、时延、兼容性和回滚方案。
排错清单
- 检查压缩后精度下降是否超出业务容忍范围。
- 检查目标部署框架是否支持相应量化/剪枝算子。
- 检查延迟瓶颈究竟在模型计算还是数据预处理/后处理。
- 检查压缩模型导出后是否存在数值不一致或推理异常。
- 检查量化校准数据是否具有代表性。
复盘问题
- 你做压缩,是为了减小体积、降低延迟,还是节约硬件成本?
- 当前压缩收益,是否真的来自模型本身而不是别的瓶颈优化?
- 哪种压缩策略最适合你的目标硬件?
- 如果效果下降 1%,换来 40% 时延下降,这个权衡值不值得?
- 压缩后的模型是否有回滚到原始模型的方案?
