PyTorch 深度学习框架
大约 11 分钟约 3155 字
PyTorch 深度学习框架
简介
PyTorch 是 Facebook 开源的深度学习框架,以动态计算图和 Python 风格 API 著称。相比 TensorFlow,PyTorch 调试更直观,API 更简洁,是学术界和工业界最流行的深度学习框架之一。PyTorch 2.0 引入了 torch.compile 和 Dynamo 编译器,进一步提升了性能。
PyTorch 的核心设计理念是"Pythonic":张量操作就像 NumPy 一样直观,自动求导通过 autograd 模块无缝集成,模型定义通过面向对象的 nn.Module 自然表达。这种设计让研究者能够快速验证想法,让工程师能够高效地构建和部署模型。PyTorch 的生态系统包括 torchvision(计算机视觉)、torchaudio(语音)、torchtext(文本)和 Hugging Face Transformers(NLP)等丰富的扩展库。
特点
Tensor 基础
创建和操作
import torch
import numpy as np
# 创建 Tensor
a = torch.tensor([1.0, 2.0, 3.0])
b = torch.zeros(3, 4) # 全零
c = torch.ones(2, 3) # 全一
d = torch.randn(3, 3) # 标准正态分布
e = torch.arange(0, 10, 2) # [0, 2, 4, 6, 8]
# 从 NumPy 转换
np_array = np.array([1, 2, 3])
tensor = torch.from_numpy(np_array)
back_to_numpy = tensor.numpy()
# GPU 支持
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
x = torch.randn(3, 3).to(device)
# 基本运算
x = torch.tensor([1.0, 2.0, 3.0])
y = torch.tensor([4.0, 5.0, 6.0])
print(x + y) # 加法
print(torch.dot(x, y)) # 点积
print(x * y) # 逐元素乘法
# 形状操作
matrix = torch.randn(2, 3, 4)
print(matrix.shape) # torch.Size([2, 3, 4])
print(matrix.view(2, 12).shape) # torch.Size([2, 12])
print(matrix.permute(2, 0, 1).shape) # torch.Size([4, 2, 3])
print(matrix.unsqueeze(1).shape) # torch.Size([2, 1, 3, 4])广播机制
# 广播机制:不同形状的 Tensor 可以自动扩展进行运算
a = torch.randn(3, 1) # (3, 1)
b = torch.randn(1, 4) # (1, 4)
c = a + b # (3, 4) 自动广播
print(f"a: {a.shape}, b: {b.shape}, a+b: {c.shape}")
# 常见广播规则:
# 1. 从最右边的维度开始逐维度比较
# 2. 两个维度相等,或其中一个为1,或其中一个不存在
# 3. 缺失的维度视为1Tensor 性能优化
# 内存布局和连续性
x = torch.randn(100, 100)
y = x.t() # 转置后不连续
print(f"x.is_contiguous(): {x.is_contiguous()}")
print(f"y.is_contiguous(): {y.is_contiguous()}")
# make_contiguous: 确保内存连续
y_cont = y.contiguous()
print(f"y_cont.is_contiguous(): {y_cont.is_contiguous()}")
# in-place 操作节省内存(但有风险)
x = torch.randn(1000, 1000)
x.add_(1) # in-place 加法
x.mul_(2) # in-place 乘法
x.zero_() # in-place 清零
print(f"in-place 操作修改原 Tensor: {x.sum().item():.1f}")
# 梯度相关
x = torch.randn(10, requires_grad=True)
y = x * 2
z = y.sum()
z.backward()
print(f"梯度: {x.grad}")自动求导
autograd 基础
# autograd 自动求导
x = torch.tensor([2.0, 3.0], requires_grad=True)
# 前向计算
y = x ** 2 # [4, 9]
z = y.sum() # 13
# 反向传播
z.backward()
print(x.grad) # 梯度: tensor([4., 6.]) 即 dz/dx = 2x计算图与梯度管理
# 梯度累积问题
x = torch.ones(5, requires_grad=True)
for epoch in range(3):
y = (x * 2).sum()
y.backward()
print(f"Epoch {epoch}: grad = {x.grad}")
# 解决方案:每次反向传播前清零梯度
x.grad.zero_()
for epoch in range(3):
y = (x * 2).sum()
y.backward()
print(f"Epoch {epoch}: grad (清零后) = {x.grad}")
x.grad.zero_()线性回归示例
# 线性回归示例
w = torch.randn(1, requires_grad=True)
b = torch.randn(1, requires_grad=True)
x_data = torch.tensor([1.0, 2.0, 3.0, 4.0])
y_data = torch.tensor([2.1, 3.9, 6.1, 8.0])
for epoch in range(100):
y_pred = w * x_data + b
loss = ((y_pred - y_data) ** 2).mean()
loss.backward()
with torch.no_grad():
w -= 0.01 * w.grad
b -= 0.01 * b.grad
w.grad.zero_()
b.grad.zero_()
print(f"训练后 w={w.item():.4f}, b={b.item():.4f}")自定义 autograd 函数
class MyReLU(torch.autograd.Function):
"""
自定义 autograd 函数:
实现 ReLU 的前向和反向传播
"""
@staticmethod
def forward(ctx, input_tensor):
# 保存前向传播中需要的信息
ctx.save_for_backward(input_tensor)
output = input_tensor.clamp(min=0)
return output
@staticmethod
def backward(ctx, grad_output):
# 反向传播:计算梯度
input_tensor, = ctx.saved_tensors
grad_input = grad_output.clone()
grad_input[input_tensor < 0] = 0
return grad_input
# 使用自定义函数
x = torch.randn(5, requires_grad=True)
relu = MyReLU.apply
y = relu(x)
y.sum().backward()
print(f"ReLU 梯度: {x.grad}")模型定义
nn.Module
import torch.nn as nn
# 方式1:继承 nn.Module
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(512, 256),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(256, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
model = NeuralNetwork()
print(model)
# 参数统计
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}, 可训练: {trainable_params:,}")参数管理
# 参数管理
for name, param in model.named_parameters():
print(f"{name}: {param.shape}, requires_grad={param.requires_grad}")
# 冻结/解冻参数
for name, param in model.named_parameters():
if "layer3" in name:
param.requires_grad = False
# 查看模型各层输出形状
def print_model_shapes(model, input_shape):
"""打印模型各层输出的形状"""
x = torch.randn(input_shape)
print(f"输入: {x.shape}")
for name, module in model.named_children():
x = module(x)
print(f" {name}: {x.shape}")
print_model_shapes(model, (1, 28, 28))自定义层
# 自定义注意力层
class SelfAttention(nn.Module):
def __init__(self, embed_dim, num_heads):
super().__init__()
self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
self.norm = nn.LayerNorm(embed_dim)
def forward(self, x):
attn_output, _ = self.attention(x, x, x)
return self.norm(x + attn_output) # 残差连接 + 归一化
# 自定义残差块
class ResidualBlock(nn.Module):
def __init__(self, dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(dim, dim),
nn.GELU(),
nn.Linear(dim, dim)
)
self.norm = nn.LayerNorm(dim)
def forward(self, x):
return self.norm(x + self.net(x)) # Pre-Norm 残差
# 多头注意力手动实现
class ManualMultiHeadAttention(nn.Module):
"""手动实现多头注意力,帮助理解内部机制"""
def __init__(self, embed_dim=512, num_heads=8):
super().__init__()
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.q_proj = nn.Linear(embed_dim, embed_dim)
self.k_proj = nn.Linear(embed_dim, embed_dim)
self.v_proj = nn.Linear(embed_dim, embed_dim)
self.out_proj = nn.Linear(embed_dim, embed_dim)
self.scale = self.head_dim ** -0.5
def forward(self, x, mask=None):
B, N, C = x.shape
q = self.q_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2)
k = self.k_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2)
v = self.v_proj(x).reshape(B, N, self.num_heads, self.head_dim).transpose(1, 2)
attn = (q @ k.transpose(-2, -1)) * self.scale
if mask is not None:
attn = attn.masked_fill(mask == 0, float('-inf'))
attn = attn.softmax(dim=-1)
out = (attn @ v).transpose(1, 2).reshape(B, N, C)
return self.out_proj(out)
attn = ManualMultiHeadAttention()
x = torch.randn(2, 10, 512)
out = attn(x)
print(f"多头注意力输出: {out.shape}")数据加载
DataLoader
from torch.utils.data import Dataset, DataLoader
# 自定义数据集
class TextDataset(Dataset):
def __init__(self, texts, labels, tokenizer, max_len=128):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_len = max_len
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
encoding = self.tokenizer(
text, max_length=self.max_len,
padding='max_length', truncation=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].squeeze(),
'attention_mask': encoding['attention_mask'].squeeze(),
'label': torch.tensor(label)
}
# 使用
# dataset = TextDataset(texts, labels, tokenizer)
# loader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)数据采样策略
from torch.utils.data import WeightedRandomSampler
# 处理类别不平衡
def create_balanced_sampler(labels):
"""为类别不平衡的数据集创建加权采样器"""
from collections import Counter
class_counts = Counter(labels)
total = len(labels)
class_weights = {cls: total / count for cls, count in class_counts.items()}
sample_weights = [class_weights[label] for label in labels]
return WeightedRandomSampler(sample_weights, len(sample_weights))
print("加权采样器已定义——解决类别不平衡问题")训练循环
完整训练循环
def train_model(model, train_loader, val_loader, epochs=10, lr=1e-3):
"""
完整的训练循环:
包含训练、验证、早停和最佳模型保存
"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
criterion = nn.CrossEntropyLoss()
best_val_loss = float('inf')
patience = 3
patience_counter = 0
for epoch in range(epochs):
# 训练阶段
model.train()
train_loss = 0
train_correct = 0
for batch in train_loader:
data, target = batch['input_ids'].to(device), batch['label'].to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
train_loss += loss.item()
train_correct += (output.argmax(1) == target).sum().item()
# 验证阶段
model.eval()
val_loss = 0
val_correct = 0
with torch.no_grad():
for batch in val_loader:
data, target = batch['input_ids'].to(device), batch['label'].to(device)
output = model(data)
loss = criterion(output, target)
val_loss += loss.item()
val_correct += (output.argmax(1) == target).sum().item()
scheduler.step()
# 日志
print(f"Epoch {epoch+1}/{epochs}")
print(f" Train Loss: {train_loss/len(train_loader):.4f}, Acc: {train_correct/len(train_loader.dataset):.4f}")
print(f" Val Loss: {val_loss/len(val_loader):.4f}, Acc: {val_correct/len(val_loader.dataset):.4f}")
# 早停
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
torch.save(model.state_dict(), 'best_model.pth')
else:
patience_counter += 1
if patience_counter >= patience:
print(f"Early stopping at epoch {epoch+1}")
break
return model
print("完整训练循环已定义")学习率调度策略
import matplotlib
import matplotlib.pyplot as plt
# 不同学习率调度策略对比
def compare_schedulers():
"""对比不同学习率调度策略"""
epochs = 100
lr = 0.01
# 1. StepLR
scheduler_step = torch.optim.lr_scheduler.StepLR(
torch.optim.SGD([torch.tensor(1.0)], lr=lr), step_size=30, gamma=0.1
)
lrs_step = []
for _ in range(epochs):
lrs_step.append(scheduler_step.get_last_lr()[0])
scheduler_step.step()
# 2. CosineAnnealingLR
scheduler_cos = torch.optim.lr_scheduler.CosineAnnealingLR(
torch.optim.SGD([torch.tensor(1.0)], lr=lr), T_max=epochs
)
lrs_cos = []
for _ in range(epochs):
lrs_cos.append(scheduler_cos.get_last_lr()[0])
scheduler_cos.step()
# 3. OneCycleLR(推荐)
scheduler_oc = torch.optim.lr_scheduler.OneCycleLR(
torch.optim.SGD([torch.tensor(1.0)], lr=lr),
max_lr=0.1, total_steps=epochs
)
lrs_oc = []
for _ in range(epochs):
lrs_oc.append(scheduler_oc.get_last_lr()[0])
scheduler_oc.step()
print(f"StepLR: 初始={lrs_step[0]:.4f}, 最终={lrs_step[-1]:.6f}")
print(f"Cosine: 初始={lrs_cos[0]:.4f}, 最终={lrs_cos[-1]:.6f}")
print(f"OneCycle: 初始={lrs_oc[0]:.4f}, 最终={lrs_oc[-1]:.6f}")
compare_schedulers()模型保存与导出
保存和加载
# 保存整个模型
torch.save(model.state_dict(), 'model_weights.pth')
# 加载权重
model = NeuralNetwork()
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()
# 导出为 ONNX
dummy_input = torch.randn(1, 28, 28)
torch.onnx.export(
model, dummy_input, 'model.onnx',
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)TorchScript
# TorchScript 导出(比 ONNX 更适合纯 PyTorch 部署)
scripted_model = torch.jit.script(model)
traced_model = torch.jit.trace(model, torch.randn(1, 28, 28))
scripted_model.save("model_scripted.pt")
loaded_model = torch.jit.load("model_scripted.pt")
print("TorchScript 模型已保存和加载")混合精度训练
from torch.cuda.amp import GradScaler, autocast
def train_mixed_precision(model, loader, epochs=5):
"""混合精度训练:使用 FP16 加速,FP32 保持精度"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
scaler = GradScaler()
criterion = nn.CrossEntropyLoss()
for epoch in range(epochs):
model.train()
for batch in loader:
data, target = batch['input_ids'].to(device), batch['label'].to(device)
optimizer.zero_grad()
with autocast():
output = model(data)
loss = criterion(output, target)
scaler.scale(loss).backward()
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
scaler.step(optimizer)
scaler.update()
print(f"Epoch {epoch+1} 完成")
print("混合精度训练函数已定义")常见损失函数
# 常用损失函数
losses = {
"MSE": nn.MSELoss(), # 回归任务
"CrossEntropy": nn.CrossEntropyLoss(), # 多分类
"BCEWithLogits": nn.BCEWithLogitsLoss(), # 二分类(带 sigmoid)
"NLLLoss": nn.NLLLoss(), # 负对数似然(配合 log_softmax)
"KLDiv": nn.KLDivLoss(), # KL 散度(知识蒸馏)
"SmoothL1": nn.SmoothL1Loss(), # Smooth L1(目标检测)
"CTC": nn.CTCLoss(), # 序列标注(OCR、ASR)
}
for name, loss_fn in losses.items():
print(f"{name}: {loss_fn}")优点
缺点
总结
PyTorch 核心:Tensor(张量运算)、autograd(自动求导)、nn.Module(模型定义)。模型用 class 继承 nn.Module,forward 方法定义前向传播。训练循环:optimizer.zero_grad() → loss.backward() → optimizer.step()。数据加载用 Dataset + DataLoader。保存用 state_dict,部署用 ONNX 导出。建议从 torchvision 预训练模型入手做迁移学习。
关键知识点
- 先分清模型能力边界、数据边界和工程边界。
- 任何 AI 主题都不只看效果,还要看延迟、成本、可解释性和安全性。
- 评估方式和失败样例往往比"换哪个模型"更重要。
- 梯度管理(清零、裁剪)是训练稳定性的关键。
- 混合精度训练可以在支持 Tensor Core 的 GPU 上显著加速。
项目落地视角
- 给数据来源、Prompt 模板、Embedding 版本、评估集和实验结果做版本管理。
- 上线前准备兜底策略,例如拒答、回退、人工审核或缓存降级。
- 观察错误类型时,区分数据问题、召回问题、提示词问题和模型问题。
常见误区
- 只关注 Demo 效果,不考虑线上稳定性和可复现性。
- 没有评估集就频繁调参,最后无法解释为什么变好或变差。
- 忽略权限、审计、隐私和模型输出的安全边界。
- 忘记在训练循环中清零梯度或设置 model.eval()。
进阶路线
- 继续补齐训练、推理、评估、MLOps 和治理链路。
- 把主题放回真实业务流程,思考谁提供数据、谁消费结果、谁负责兜底。
- 把 PoC 逐步升级到可观测、可回滚、可演进的生产方案。
- 学习分布式训练(DDP、FSDP)和大规模模型训练技术。
适用场景
- 当你准备把《PyTorch 深度学习框架》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合企业知识问答、内容生成、分类抽取和智能助手等场景。
- 当需求同时关注效果、时延、成本和安全边界时,这类主题最有价值。
落地建议
- 先定义评估集、成功标准和失败样例,再开始调模型或调提示。
- 把数据来源、分块方式、Embedding 版本和 Prompt 模板纳入版本管理。
- 上线前准备兜底策略,例如拒答、回退、人工审核或检索降级。
排错清单
- 先判断问题出在数据、检索、Prompt、模型还是后处理。
- 检查上下文是否过长、分块是否过碎或召回是否偏题。
- 对错误回答做分类,区分幻觉、事实过时、指令误解和格式错误。
复盘问题
- 如果把《PyTorch 深度学习框架》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《PyTorch 深度学习框架》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《PyTorch 深度学习框架》最大的收益和代价分别是什么?
