RNN 与 LSTM
RNN 与 LSTM
简介
循环神经网络(Recurrent Neural Network,RNN)及其变体 LSTM、GRU 是处理序列数据的基础架构。通过隐状态的循环传递,RNN 能够建模序列中的时序依赖关系。虽然 Transformer 在多数任务上已取代 RNN,但 RNN/LSTM 在资源受限的在线推理、流式处理等场景中仍有实际价值。
RNN 的核心思想源自对序列数据的建模需求:与前馈网络假设输入独立同分布不同,序列数据(文本、语音、时序信号)具有强时序依赖关系,当前时刻的状态不仅取决于当前输入,还取决于历史状态。RNN 通过引入隐状态(hidden state)来维护这种历史信息。
LSTM(Long Short-Term Memory)由 Hochreiter 和 Schmidhuber 在 1997 年提出,专门解决原始 RNN 的梯度消失问题。GRU(Gated Recurrent Unit)由 Cho 等人在 2014 年提出,是 LSTM 的简化版本。在 2017 年 Transformer 出现之前,LSTM 和 GRU 几乎统治了所有序列建模任务。
从理论角度看,RNN 的计算过程可以看作一个动态系统(Dynamical System):隐状态在状态空间中的轨迹由输入序列和模型参数共同决定。LSTM 通过门控机制实现了对这个动态系统的更精细控制,使其能够学习长期依赖而不会受到短期噪声的干扰。
特点
序列建模的本质
序列建模的核心挑战在于如何有效利用历史信息。RNN 通过递归公式 h_t = f(h_{t-1}, x_t) 实现这一点,其中 f 是一个非线性函数。这个递归结构使得 RNN 具有理论上的图灵完备性——给定足够大的隐状态维度,RNN 可以模拟任何计算过程。
然而,实际训练中的梯度传播是有限的。对于 T 步的序列,梯度需要经过 T 次矩阵乘法才能从最后一步传回第一步。如果隐状态到隐状态的转移矩阵的最大特征值大于 1,梯度会指数增长(梯度爆炸);如果小于 1,梯度会指数衰减(梯度消失)。LSTM 的加法更新机制(而非 RNN 的纯乘法更新)为梯度提供了一条"高速公路",使得梯度可以在较长的序列中传播。
实现
# 示例1:手动实现 RNN 前向传播
import torch
class SimpleRNN:
def __init__(self, input_size, hidden_size):
self.W_ih = torch.randn(hidden_size, input_size) * 0.01
self.W_hh = torch.randn(hidden_size, hidden_size) * 0.01
self.b = torch.zeros(hidden_size)
def forward(self, x_seq):
"""x_seq: (seq_len, input_size)"""
h = torch.zeros(self.W_hh.shape[0])
outputs = []
for t in range(x_seq.shape[0]):
h = torch.tanh(x_seq[t] @ self.W_ih.T + h @ self.W_hh.T + self.b)
outputs.append(h)
return torch.stack(outputs), h
rnn = SimpleRNN(input_size=10, hidden_size=32)
x = torch.randn(20, 10) # 序列长度20,特征维度10
outputs, last_h = rnn.forward(x)
print(f"每个时间步输出: {outputs.shape}, 最后隐状态: {last_h.shape}")# 示例2:使用 PyTorch LSTM 进行文本分类
import torch
import torch.nn as nn
class LSTMClassifier(nn.Module):
def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, num_layers=2, dropout=0.3):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
self.lstm = nn.LSTM(
embed_dim, hidden_dim,
num_layers=num_layers,
batch_first=True,
bidirectional=True,
dropout=dropout,
)
self.classifier = nn.Sequential(
nn.Dropout(dropout),
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, num_classes),
)
def forward(self, x):
embeds = self.embedding(x) # (B, L, E)
output, (h_n, c_n) = self.lstm(embeds) # output: (B, L, 2H)
# 取双向最后一个隐状态拼接
h_forward = h_n[-2] # 正向最后一层
h_backward = h_n[-1] # 反向最后一层
h_cat = torch.cat([h_forward, h_backward], dim=-1) # (B, 2H)
return self.classifier(h_cat)
model = LSTMClassifier(vocab_size=5000, embed_dim=128, hidden_dim=256, num_classes=2)
dummy_input = torch.randint(0, 5000, (4, 50))
print(f"分类输出: {model(dummy_input).shape}")
print(f"参数量: {sum(p.numel() for p in model.parameters()):,}")# 示例3:LSTM 门控机制详解
class LSTMCellManual(nn.Module):
"""手动实现 LSTM 单元,理解门控的作用"""
def __init__(self, input_size, hidden_size):
super().__init__()
self.hidden_size = hidden_size
# 遗忘门
self.W_f = nn.Linear(input_size + hidden_size, hidden_size)
# 输入门
self.W_i = nn.Linear(input_size + hidden_size, hidden_size)
# 候选记忆
self.W_g = nn.Linear(input_size + hidden_size, hidden_size)
# 输出门
self.W_o = nn.Linear(input_size + hidden_size, hidden_size)
def forward(self, x, state):
h, c = state
combined = torch.cat([x, h], dim=-1)
f = torch.sigmoid(self.W_f(combined)) # 遗忘门:决定遗忘多少旧记忆
i = torch.sigmoid(self.W_i(combined)) # 输入门:决定写入多少新信息
g = torch.tanh(self.W_g(combined)) # 候选新记忆
o = torch.sigmoid(self.W_o(combined)) # 输出门:决定输出多少记忆
c_new = f * c + i * g # 更新细胞状态
h_new = o * torch.tanh(c_new) # 更新隐状态
return h_new, c_new
cell = LSTMCellManual(input_size=32, hidden_size=64)
x_t = torch.randn(2, 32)
h = torch.zeros(2, 64)
c = torch.zeros(2, 64)
h_new, c_new = cell(x_t, (h, c))
print(f"新隐状态: {h_new.shape}, 新细胞状态: {c_new.shape}")# 示例4:梯度消失对比实验(RNN vs LSTM)
import torch.nn as nn
def measure_gradient_flow(model_class, seq_len=100, input_size=32, hidden_size=64):
"""对比 RNN 和 LSTM 在长序列上的梯度传播"""
x = torch.randn(1, seq_len, input_size)
model = model_class(input_size, hidden_size, batch_first=True)
output, _ = model(x)
output.sum().backward()
grad_norms = []
for name, param in model.named_parameters():
if param.grad is not None:
grad_norms.append((name, param.grad.norm().item()))
return grad_norms
rnn_grads = measure_gradient_flow(nn.RNN, seq_len=100)
lstm_grads = measure_gradient_flow(nn.LSTM, seq_len=100)
print("=== RNN 梯度范数 ===")
for name, norm in rnn_grads:
print(f" {name}: {norm:.6f}")
print("=== LSTM 梯度范数 ===")
for name, norm in lstm_grads:
print(f" {name}: {norm:.6f}")深入理解:RNN 的梯度问题
梯度消失与爆炸的数学分析
import torch
import torch.nn as nn
def analyze_gradient_flow(seq_lengths=[10, 50, 100, 200, 500]):
"""分析不同序列长度下 RNN 和 LSTM 的梯度流动情况
RNN 的梯度传播公式:
∂L/∂h_0 = ∏_{t=1}^{T} ∂h_t/∂h_{t-1} × ∂L/∂h_T
其中 ∂h_t/∂h_{t-1} = W_hh × diag(tanh'(·))
如果 W_hh 的最大奇异值 > 1,梯度指数增长(爆炸)
如果 W_hh 的最大奇异值 < 1,梯度指数衰减(消失)
LSTM 的梯度通过加法路径传播:
∂c_T/∂c_0 = ∏_{t=1}^{T} f_t
其中 f_t 是遗忘门(sigmoid 输出),值在 (0, 1) 之间
即使大多数 f_t < 1,只要某些 f_t ≈ 1,梯度就能在对应路径上保持
"""
print(f"{'序列长度':>8s} | {'RNN梯度范数':>12s} | {'LSTM梯度范数':>12s} | {'GRU梯度范数':>12s}")
print("-" * 60)
for seq_len in seq_lengths:
# RNN
rnn = nn.RNN(32, 64, batch_first=True)
x = torch.randn(1, seq_len, 32)
out, _ = rnn(x)
out.sum().backward()
rnn_grad = max(p.grad.norm().item() for p in rnn.parameters() if p.grad is not None)
# LSTM
lstm = nn.LSTM(32, 64, batch_first=True)
x = torch.randn(1, seq_len, 32)
out, _ = lstm(x)
out.sum().backward()
lstm_grad = max(p.grad.norm().item() for p in lstm.parameters() if p.grad is not None)
# GRU
gru = nn.GRU(32, 64, batch_first=True)
x = torch.randn(1, seq_len, 32)
out, _ = gru(x)
out.sum().backward()
gru_grad = max(p.grad.norm().item() for p in gru.parameters() if p.grad is not None)
print(f"{seq_len:>8d} | {rnn_grad:>12.6f} | {lstm_grad:>12.6f} | {gru_grad:>12.6f}")
analyze_gradient_flow()梯度裁剪的正确使用
import torch
import torch.nn as nn
class TrainingWithGradientClipping:
"""梯度裁剪是训练 RNN 的标配技巧
三种梯度裁剪方式:
1. 按范数裁剪(推荐):如果梯度范数超过阈值,按比例缩放
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
2. 按值裁剪:将每个梯度元素限制在 [-clip_value, clip_value]
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value)
3. 自适应裁剪:根据历史梯度统计动态调整裁剪阈值
max_norm 的典型取值:
- 1.0: 比较激进,适合训练不稳定的情况
- 5.0: 适中,大多数情况下的默认选择
- 10.0: 比较宽松,适合已经比较稳定的训练
"""
@staticmethod
def train_step(model, x, y, optimizer, criterion, max_norm=5.0):
model.train()
optimizer.zero_grad()
output, _ = model(x)
loss = criterion(output, y)
loss.backward()
# 训练前检查梯度范数
total_norm = torch.sqrt(
sum(p.grad.norm().item() ** 2 for p in model.parameters() if p.grad is not None)
)
print(f"裁剪前梯度范数: {total_norm:.4f}")
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
# 裁剪后检查梯度范数
clipped_norm = torch.sqrt(
sum(p.grad.norm().item() ** 2 for p in model.parameters() if p.grad is not None)
)
print(f"裁剪后梯度范数: {clipped_norm:.4f}")
optimizer.step()
return loss.item()
# 实际使用示例
model = nn.LSTM(32, 64, num_layers=2, batch_first=True)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
x = torch.randn(8, 50, 32) # batch=8, seq_len=50, features=32
y = torch.randint(0, 10, (8,)) # 分类标签
# step = TrainingWithGradientClipping.train_step(model, x, y, optimizer, criterion)深入理解:GRU 与 LSTM 的对比
import torch
import torch.nn as nn
class GRUCellManual(nn.Module):
"""GRU 的手动实现,与 LSTM 对比
GRU 只有两个门:
1. 重置门 (reset gate, r):决定是否忽略之前的隐状态
2. 更新门 (update gate, z):决定更新多少新信息(类似 LSTM 的遗忘门+输入门的合并)
GRU 的关键简化:
- 没有独立的细胞状态(cell state),只用隐状态
- 遗忘门和输入门合并为更新门
- 重置门应用于候选隐状态的计算
GRU vs LSTM 选择建议:
- 数据量少、需要快速迭代:GRU(参数更少)
- 序列较长、需要精确记忆控制:LSTM(独立的 cell state)
- 实际上在很多任务上两者性能接近,可以都试一下
"""
def __init__(self, input_size, hidden_size):
super().__init__()
# 更新门
self.W_z = nn.Linear(input_size + hidden_size, hidden_size)
# 重置门
self.W_r = nn.Linear(input_size + hidden_size, hidden_size)
# 候选隐状态
self.W_h = nn.Linear(input_size + hidden_size, hidden_size)
def forward(self, x, h):
combined = torch.cat([x, h], dim=-1)
z = torch.sigmoid(self.W_z(combined)) # 更新门
r = torch.sigmoid(self.W_r(combined)) # 重置门
h_candidate = torch.tanh(self.W_h(torch.cat([x, r * h], dim=-1)))
h_new = (1 - z) * h + z * h_candidate # 更新隐状态
return h_new
# 参数量对比
def compare_params(input_size=128, hidden_size=256):
"""对比 LSTM 和 GRU 的参数量"""
lstm = nn.LSTMCell(input_size, hidden_size)
gru = nn.GRUCell(input_size, hidden_size)
lstm_params = sum(p.numel() for p in lstm.parameters())
gru_params = sum(p.numel() for p in gru.parameters())
print(f"输入维度: {input_size}, 隐状态维度: {hidden_size}")
print(f"LSTM 单元参数量: {lstm_params:,}")
print(f"GRU 单元参数量: {gru_params:,}")
print(f"GRU 比 LSTM 少: {lstm_params - gru_params:,} 参数 ({(1 - gru_params/lstm_params):.1%})")
compare_params()深入理解:变长序列处理
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
class VariableLengthSequenceHandler:
"""处理变长序列的标准方法
在实际 NLP 任务中,每个 batch 中的句子长度不同。
需要填充(padding)到相同长度,但这会带来问题:
1. padding token 参与计算,浪费算力
2. LSTM 的隐状态会受 padding 的影响
3. 池化操作可能被 padding 值干扰
PyTorch 的解决方案:pack_padded_sequence
1. 先对序列按长度降序排列
2. 将填充后的序列"打包"(pack),记录真实长度
3. LSTM 直接处理 packed 序列,自动跳过 padding
4. 解包(unpack)恢复为填充后的输出
"""
@staticmethod
def demo():
# 模拟 3 个不同长度的句子(已转为词向量)
sentences = [
torch.randn(10, 128), # 句子1: 10个词
torch.randn(5, 128), # 句子2: 5个词
torch.randn(8, 128), # 句子3: 8个词
]
lengths = torch.tensor([10, 5, 8])
# 填充到相同长度
padded = pad_sequence(sentences, batch_first=True, padding_value=0)
print(f"填充后形状: {padded.shape}") # (3, 10, 128)
# 打包序列
packed = pack_padded_sequence(padded, lengths.cpu(), batch_first=True, enforce_sorted=True)
print(f"打包后数据形状: {packed.data.shape}") # 总词数 × 128
print(f"batch_sizes: {packed.batch_sizes}") # 每个时间步的有效样本数
# LSTM 处理打包序列
lstm = nn.LSTM(128, 256, batch_first=True, bidirectional=True)
packed_output, (h_n, c_n) = lstm(packed)
# 解包恢复形状
output, _ = pad_packed_sequence(packed_output, batch_first=True)
print(f"解包后输出形状: {output.shape}") # (3, 10, 512)
VariableLengthSequenceHandler.demo()正确获取变长序列的隐状态
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
class CorrectHiddenStateExtraction(nn.Module):
"""正确处理变长序列的隐状态提取
常见错误:直接取 output[:, -1, :] 作为最后一个隐状态
这在有 padding 的情况下是错误的,因为 -1 位置可能是 padding
正确方法:
1. 使用 gather 根据实际长度索引
2. 使用 pack_padded_sequence 后的 h_n(已经是真实的最后隐状态)
"""
def __init__(self, input_size, hidden_size):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
def forward(self, padded_input, lengths):
# 方法1:使用 pack/unpack(推荐)
packed = pack_padded_sequence(padded_input, lengths.cpu(), batch_first=True, enforce_sorted=False)
packed_output, (h_n, c_n) = self.lstm(packed)
output, _ = pad_packed_sequence(packed_output, batch_first=True)
# h_n 已经是真实的最后有效隐状态
return output, h_n.squeeze(0)
@staticmethod
def method_gather(output, lengths):
"""方法2:使用 gather 根据实际长度提取最后一个有效位置"""
batch_size = output.shape[0]
# lengths: [10, 5, 8] -> last_indices: [9, 4, 7]
last_indices = lengths - 1
# 扩展为 (batch, 1, hidden)
last_indices = last_indices.view(-1, 1, 1).expand(-1, 1, output.shape[-1])
# 从 output 中收集对应位置的隐状态
last_hidden = output.gather(1, last_indices).squeeze(1) # (batch, hidden)
return last_hidden
model = CorrectHiddenStateExtraction(128, 256)
sentences = [torch.randn(l, 128) for l in [10, 5, 8]]
lengths = torch.tensor([10, 5, 8])
padded = pad_sequence(sentences, batch_first=True, padding_value=0)
output, h_n = model(padded, lengths)
print(f"输出形状: {output.shape}, 最后隐状态: {h_n.shape}")深入理解:多层 LSTM 与双向 LSTM
import torch
import torch.nn as nn
class DeepBidirectionalLSTM(nn.Module):
"""多层双向 LSTM 的详解
多层 LSTM:堆叠多个 LSTM 层
- 第1层的输出作为第2层的输入
- 每层可以捕获不同层次的特征
- 通常 2-3 层就足够,更多层收益递减
双向 LSTM:同时从前向后和从后向前处理序列
- 正向 LSTM: h_forward[t] = f(h_forward[t-1], x[t])
- 反向 LSTM: h_backward[t] = g(h_backward[t+1], x[t])
- 最终输出: h[t] = [h_forward[t]; h_backward[t]](拼接)
双向 LSTM 的局限:
- 需要看到完整序列才能计算,不适合在线推理
- 在实时语音识别等场景中只能用单向
"""
def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
self.lstm = nn.LSTM(
embed_dim, hidden_dim,
num_layers=3, # 3层 LSTM
batch_first=True,
bidirectional=True, # 双向
dropout=0.3, # 层间 dropout
)
# 多种池化策略
self.attention_pool = nn.Sequential(
nn.Linear(hidden_dim * 2, 1),
)
def forward(self, x, lengths=None):
embeds = self.embedding(x)
output, (h_n, c_n) = self.lstm(embeds)
# output: (B, L, 2*H), h_n: (2*num_layers, B, H)
# 策略1:取最后一层双向隐状态拼接
h_forward = h_n[-2] # 正向第3层
h_backward = h_n[-1] # 反向第3层
h_last = torch.cat([h_forward, h_backward], dim=-1)
# 策略2:注意力加权池化
attn_weights = torch.softmax(self.attention_pool(output), dim=1) # (B, L, 1)
h_attn = (output * attn_weights).sum(dim=1) # (B, 2*H)
# 策略3:最大池化(适合分类任务)
h_max = output.max(dim=1)[0] # (B, 2*H)
return h_last, h_attn, h_max
model = DeepBidirectionalLSTM(5000, 128, 256, 10)
x = torch.randint(0, 5000, (4, 50))
h_last, h_attn, h_max = model(x)
print(f"最后隐状态: {h_last.shape}")
print(f"注意力池化: {h_attn.shape}")
print(f"最大池化: {h_max.shape}")
# 参数量分析
def analyze_lstm_params():
configs = [
("1层单向", 1, False),
("2层单向", 2, False),
("1层双向", 1, True),
("2层双向", 2, True),
("3层双向", 3, True),
]
for name, layers, bidirectional in configs:
lstm = nn.LSTM(128, 256, num_layers=layers, bidirectional=bidirectional)
params = sum(p.numel() for p in lstm.parameters())
direction_factor = 2 if bidirectional else 1
print(f"{name:>8s}: {params:>12,} 参数")
analyze_lstm_params()深入理解:LSTM 的实际应用场景
时间序列预测
import torch
import torch.nn as nn
class TimeSeriesForecaster(nn.Module):
"""基于 LSTM 的时间序列预测模型
应用场景:
- 股票价格预测
- 电力负荷预测
- 服务器流量预测
- 天气预报
关键设计考虑:
1. 输入归一化至关重要(时序数据通常量纲差异大)
2. 使用滑动窗口构建训练样本
3. 多变量输入时可以用不同的编码维度
"""
def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2):
super().__init__()
self.encoder = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True)
self.decoder = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, output_dim)
)
def forward(self, x):
# x: (B, seq_len, input_dim)
_, (h_n, _) = self.encoder(x)
# 用最后一层的隐状态做预测
h_last = h_n[-1] # (B, hidden_dim)
return self.decoder(h_last) # (B, output_dim)
# 使用示例
model = TimeSeriesForecaster(input_dim=5, hidden_dim=128, output_dim=1)
# 假设输入5个特征(温度、湿度、风速等),预测未来1个值
x = torch.randn(16, 72, 5) # batch=16, 过去72小时, 5个特征
pred = model(x)
print(f"预测输出: {pred.shape}") # (16, 1)
# 滑动窗口数据构建
def create_windows(data, window_size, horizon=1):
"""将时序数据切分为滑动窗口样本
data: (total_len, features)
window_size: 输入窗口长度
horizon: 预测步数
"""
X, y = [], []
for i in range(len(data) - window_size - horizon + 1):
X.append(data[i:i+window_size])
y.append(data[i+window_size:i+window_size+horizon])
return torch.stack(X), torch.stack(y)
# 多步预测:预测未来多个时间步
class MultiStepForecaster(nn.Module):
"""多步预测:自回归方式逐步预测未来多个时间步"""
def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, pred_steps=12):
super().__init__()
self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True)
self.decoder = nn.Linear(hidden_dim, output_dim)
self.pred_steps = pred_steps
def forward(self, x):
# 编码阶段
_, (h_n, c_n) = self.lstm(x)
# 自回归解码
predictions = []
h, c = h_n[-1:], c_n[-1:] # 取最后一层
for _ in range(self.pred_steps):
pred = self.decoder(h[-1]) # (B, output_dim)
predictions.append(pred)
# 将预测结果作为下一步的输入
lstm_out, (h, c) = self.lstm(pred.unsqueeze(1), (h, c))
return torch.stack(predictions, dim=1) # (B, pred_steps, output_dim)
model = MultiStepForecaster(5, 128, 5, pred_steps=12)
x = torch.randn(16, 72, 5)
pred = model(x)
print(f"多步预测输出: {pred.shape}") # (16, 12, 5)文本生成
import torch
import torch.nn as nn
class LSTMTextGenerator(nn.Module):
"""基于 LSTM 的文本生成模型
工作流程:
1. 训练:给定一段文本,预测下一个字符/词
2. 生成:给定一个起始文本,自回归地生成后续文本
注意事项:
- 温度(temperature)控制生成文本的随机性
- Top-k 和 Top-p 采样可以避免生成低质量文本
- LSTM 生成容易陷入重复循环,需要注意多样性控制
"""
def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True)
self.fc = nn.Linear(hidden_dim, vocab_size)
def forward(self, x, hidden=None):
embeds = self.embedding(x)
output, hidden = self.lstm(embeds, hidden)
logits = self.fc(output)
return logits, hidden
def generate(self, start_tokens, max_length=100, temperature=1.0, top_k=50):
"""带温度和 top-k 采样的文本生成"""
self.eval()
tokens = start_tokens.clone()
hidden = None
with torch.no_grad():
for _ in range(max_length):
logits, hidden = self.forward(tokens[:, -1:], hidden)
logits = logits[:, -1, :] / temperature # 温度缩放
# Top-k 采样
if top_k > 0:
top_k_values, top_k_indices = torch.topk(logits, top_k)
probs = torch.softmax(top_k_values, dim=-1)
next_idx = torch.multinomial(probs, 1)
next_token = top_k_indices.gather(-1, next_idx)
else:
next_token = torch.multinomial(torch.softmax(logits, dim=-1), 1)
tokens = torch.cat([tokens, next_token], dim=1)
return tokens
generator = LSTMTextGenerator(vocab_size=10000, embed_dim=128, hidden_dim=256)
start = torch.randint(0, 10000, (1, 5)) # 起始 5 个 token
# generated = generator.generate(start, max_length=50, temperature=0.8, top_k=40)
print(f"生成器参数量: {sum(p.numel() for p in generator.parameters()):,}")深入理解:注意力机制与 RNN 的结合
import torch
import torch.nn as nn
class BahdanauAttention(nn.Module):
"""Bahdanau 注意力机制(加性注意力)
在 RNN 中引入注意力的经典方法。
与全局池化(取最后一个隐状态)不同,注意力机制在解码的每一步
都动态地关注编码器输出的不同位置。
公式:
score(h_t, h_s) = v^T tanh(W_1 h_t + W_2 h_s)
α_{t,s} = softmax(score(h_t, h_s))
context_t = Σ_s α_{t,s} × h_s
其中 h_t 是解码器的隐状态,h_s 是编码器的隐状态。
"""
def __init__(self, hidden_dim):
super().__init__()
self.W_query = nn.Linear(hidden_dim, hidden_dim)
self.W_key = nn.Linear(hidden_dim, hidden_dim)
self.v = nn.Linear(hidden_dim, 1)
def forward(self, query, keys, mask=None):
# query: (B, 1, H) 解码器隐状态
# keys: (B, L, H) 编码器输出序列
# mask: (B, L) padding mask
scores = self.v(torch.tanh(self.W_query(query) + self.W_key(keys)))
scores = scores.squeeze(-1) # (B, L)
if mask is not None:
scores = scores.masked_fill(mask == 0, float('-inf'))
attn_weights = torch.softmax(scores, dim=-1) # (B, L)
context = torch.bmm(attn_weights.unsqueeze(1), keys) # (B, 1, H)
return context, attn_weights
class Seq2SeqWithAttention(nn.Module):
"""带注意力的序列到序列模型
典型应用:机器翻译、文本摘要、对话系统
"""
def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
self.encoder = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers, batch_first=True, bidirectional=True)
self.decoder = nn.LSTM(embed_dim + hidden_dim * 2, hidden_dim, num_layers=num_layers, batch_first=True)
self.attention = BahdanauAttention(hidden_dim * 2)
self.output_proj = nn.Linear(hidden_dim, vocab_size)
def forward(self, src, tgt, src_lengths=None):
# 编码
src_embeds = self.embedding(src)
encoder_outputs, (h_n, c_n) = self.encoder(src_embeds)
# 初始化解码器状态
decoder_h = h_n[-2:].transpose(0, 1) # (B, 2, H) 取最后一层双向
decoder_c = c_n[-2:].transpose(0, 1)
# 解码(自回归)
tgt_embeds = self.embedding(tgt)
decoder_outputs = []
hidden = (decoder_h, decoder_c)
for t in range(tgt.shape[1] - 1):
query = hidden[0][:, -1:, :] # (B, 1, H)
context, attn_weights = self.attention(query, encoder_outputs)
decoder_input = torch.cat([tgt_embeds[:, t:t+1, :], context], dim=-1)
output, hidden = self.decoder(decoder_input, hidden)
decoder_outputs.append(self.output_proj(output))
return torch.cat(decoder_outputs, dim=1)
model = Seq2SeqWithAttention(vocab_size=5000, embed_dim=128, hidden_dim=256)
src = torch.randint(0, 5000, (4, 20))
tgt = torch.randint(0, 5000, (4, 15))
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")深入理解:RNN 的优化技巧
正则化策略
import torch
import torch.nn as nn
class RNNRegularization:
"""RNN 的正则化方法
1. Dropout: 在 RNN 中需要特殊处理
- 标准 Dropout 不能直接应用于时间步之间
- PyTorch 的 dropout 参数只在层与层之间应用
- 变分 Dropout (Variational Dropout): 同一序列内使用相同的 dropout mask
实现:使用相同的 dropout mask 应用于所有时间步
2. Embedding Dropout: 直接对词向量做 dropout
比 word dropout 更简单有效
3. Lockdrop / Zoneout: 更高级的 RNN 正则化
- Zoneout: 随机将隐状态替换为前一步的隐状态(而非零)
- 效果比标准 Dropout 更好,因为它保持了隐状态的连续性
4. 权重正则化: L2 正则化(weight decay)仍然有效
"""
@staticmethod
def variational_dropout(rnn_output, dropout_rate=0.3):
"""变分 Dropout:同一序列内使用相同的 mask
标准 Dropout 每个时间步使用不同的 mask,这可能导致隐状态的不连续。
变分 Dropout 使用同一个 mask 应用于所有时间步,保持一致性。
"""
if not rnn_output.requires_grad or dropout_rate == 0:
return rnn_output
# 生成 (B, H) 的 mask,然后广播到 (B, T, H)
mask = (torch.rand(rnn_output.size(0), rnn_output.size(2),
device=rnn_output.device) > dropout_rate).float()
mask = mask.unsqueeze(1) # (B, 1, H)
return rnn_output * mask / (1 - dropout_rate)
@staticmethod
def embedding_dropout(embedding, dropout_rate=0.1):
"""Embedding Dropout: 随机将词向量置零
在 embedding 层之后应用,简单有效
"""
if dropout_rate == 0:
return embedding
mask = (torch.rand(embedding.shape[:2], device=embedding.device) > dropout_rate).float()
return embedding * mask.unsqueeze(-1)学习率调度
import torch
import torch.nn as nn
def get_rnn_training_config():
"""RNN 训练的推荐配置
优化器选择:
- Adam: 最常用,对学习率不太敏感
- AdamW: 带 weight decay 的 Adam,推荐
- SGD + Momentum: 在某些任务上收敛更好,但需要仔细调参
学习率调度:
- ReduceLROnPlateau: 监控验证集 loss,不下降时降低学习率
- CosineAnnealing: 周期性调整学习率
- OneCycleLR: 先升后降,训练速度更快
"""
configs = {
"优化器": "AdamW (lr=1e-3, weight_decay=0.01)",
"梯度裁剪": "max_norm=5.0(必须开启)",
"学习率调度": "ReduceLROnPlateau (patience=3, factor=0.5)",
"Dropout": "0.3-0.5(LSTM 层间 + 分类器前)",
"Embedding": "维度 128-300,预训练词向量优先",
"Batch Size": "32-128(RNN 训练比 Transformer 更吃显存,注意序列长度)",
"最大序列长度": "根据任务裁剪(如 512),超长序列截断",
"Epochs": "10-50(取决于数据量)",
"早停": "patience=5-10,监控验证集指标",
}
print("RNN 训练推荐配置:")
for key, value in configs.items():
print(f" {key}: {value}")
get_rnn_training_config()优点
缺点
总结
RNN 和 LSTM 是序列建模的经典方案,理解门控机制和梯度流动对深入学习 Transformer 有直接帮助。在新项目中优先考虑 Transformer,但在资源受限、在线推理或短序列场景中,LSTM/GRU 仍然是实用的选择。
RNN 的核心价值不仅在于其本身的应用,更在于它为理解序列建模提供了基础框架。从 RNN 的梯度问题出发,可以更好地理解为什么 Transformer 的自注意力机制能在长距离依赖上取得更好的效果——自注意力提供了 O(1) 步的路径连接任意两个位置,而 RNN 需要 O(n) 步。
关键知识点
- LSTM 通过遗忘门、输入门、输出门三个门控机制来控制记忆的读写和遗忘
- 双向 LSTM 同时从前向后和从后向前处理序列,能捕获前后文信息
- GRU 是 LSTM 的简化版本,合并了遗忘门和输入门,参数更少但效果接近
- 梯度裁剪(gradient clipping)是训练 RNN 时的标配技巧,防止梯度爆炸
- pack_padded_sequence 是处理变长序列的标准工具,避免 padding 浪费计算
- LSTM 的细胞状态通过加法更新(而非乘法),为梯度提供了通畅的传播路径
项目落地视角
- 如果序列长度短(<200)且推理延迟要求高,LSTM/GRU 比 Transformer 更合适
- 用 pack_padded_sequence 处理变长序列,避免 padding token 浪费计算
- 实际项目中建议从 nn.LSTM 而非 nn.RNN 开始,训练稳定性差异显著
RNN vs Transformer 选型决策
def model_selection_for_sequence():
"""RNN vs Transformer 选型决策树"""
print("=== 序列模型选型决策 ===\n")
print("场景1: 实时流式语音识别")
print(" -> 推荐: 单向 LSTM/GRU(逐 token 处理,低延迟)\n")
print("场景2: 大规模文本分类")
print(" -> 推荐: Transformer / 预训练模型(BERT, RoBERTa)\n")
print("场景3: 边缘设备上的时序预测")
print(" -> 推荐: GRU(参数少,推理快)\n")
print("场景4: 机器翻译")
print(" -> 推荐: Transformer(双向注意力,并行训练快)\n")
print("场景5: 键盘输入预测")
print(" -> 推荐: LSTM(在线推理,低延迟)\n")
print("场景6: 超长文档理解")
print(" -> 推荐: Transformer + 稀疏注意力(长距离依赖)\n")
# 性能对比参考
print("=== 典型性能对比 ===")
comparisons = [
("推理延迟 (单步)", "LSTM: ~0.1ms", "Transformer: ~1-10ms (序列级)"),
("训练并行度", "低(串行)", "高(完全并行)"),
("长距离依赖", "弱(>200步衰减)", "强(O(1) 路径)"),
("参数量 (同等能力)", "少", "多"),
("显存占用", "较低", "较高(注意力矩阵)"),
]
for metric, rnn_perf, trans_perf in comparisons:
print(f" {metric}: {rnn_perf} vs {trans_perf}")
model_selection_for_sequence()常见误区
- 在长文本任务上强行用 LSTM——Transformer 的自注意力在长距离依赖上明显更优
- 不做梯度裁剪就训练 RNN,导致训练不稳定或 loss 突然飙升
- 忽略 padding 对隐状态的影响,用最后一个位置的隐状态做分类而不区分是否是 padding
- 双向 LSTM 用于在线推理——双向需要完整序列,无法用于流式场景
- 过度堆叠 LSTM 层——3 层以上通常收益很小,反而增加过拟合风险
- 忘记设置 padding_idx——会导致 padding token 的 embedding 也被更新
进阶路线
- 深入对比 LSTM 和 GRU 的门控差异和适用场景
- 学习注意力机制如何与 RNN 结合(Bahdanau Attention、Luong Attention)
- 理解 Transformer 如何通过自注意力并行化解决 RNN 的序列瓶颈
- 探索 SRU(Simple Recurrent Unit)、Mamba 等现代序列模型的创新方向
- 学习 RWKV 等结合 RNN 线性复杂度和 Transformer 性能的新型架构
适用场景
- 流式语音识别、实时翻译等需要逐 token 在线处理的任务
- 时间序列预测(股票、天气、传感器数据)
- 资源受限的边缘设备上的文本或序列处理
- 短文本的情感分析、意图识别等轻量级 NLP 任务
- 音乐生成、手写体识别等序列生成任务
落地建议
- 先在小数据集上验证 LSTM 基线效果,再决定是否升级到 Transformer
- 对时序任务重视数据归一化和窗口设计,它们对效果的影响可能比模型结构更大
- 训练时始终开启梯度裁剪(max_norm=1.0~5.0),避免训练崩溃
- 使用预训练词向量(Word2Vec、GloVe)可以显著提升小数据集上的效果
- 监控训练过程中的梯度范数,及时发现梯度爆炸/消失的苗头
排错清单
- loss 震荡或飙升:检查是否做了梯度裁剪,学习率是否过大
- 训练速度极慢:确认是否忘记使用 pack_padded_sequence,或 batch_size 过小
- 短序列效果好但长序列差:这是 RNN 的固有局限,考虑换成 Transformer 或分层结构
- 验证集 loss 不下降:检查数据归一化、学习率、模型容量是否匹配任务难度
- 推理结果不稳定:确认设置了 model.eval(),且推理时的预处理与训练一致
- 内存泄漏:检查是否正确使用了 detach() 隔离计算图
复盘问题
- 你的序列数据平均长度是多少?LSTM 是否能有效覆盖这个长度?
- 是否对比了单向和双向 LSTM 的效果差异?
- 在线推理场景中,逐 token 输出的延迟是否满足业务要求?
- 是否尝试过注意力池化替代简单的最后隐状态?
- 梯度裁剪的阈值是否经过调优?能否通过梯度范数分布来验证?
