Saga 模式
大约 10 分钟约 3123 字
Saga 模式
简介
Saga 是一种管理分布式事务的模式。它将长事务拆分为一系列本地事务,每个本地事务完成后发布事件触发下一个步骤。如果某步骤失败,Saga 执行补偿事务来撤销已完成的步骤。
Saga 模式起源于 1987 年 Hector Garcia-Molina 和 Kenneth Salem 的论文"Sagas"。它的核心思想是:在微服务架构中,传统的分布式事务(如两阶段提交 2PC)由于性能和可用性问题难以适用,因此将长事务拆分为多个独立的本地事务,每个步骤都有对应的补偿操作,失败时逐步回滚。
Saga 模式有两种实现方式:编排式(Choreography)和协调式(Orchestration)。编排式中各服务通过事件总线直接通信,协调式中由一个中心协调器管理所有步骤的执行顺序。
特点
结构分析
编排式 vs 协调式
编排式 (Choreography) 协调式 (Orchestration)
+------+ +------+ +------+ +------+ +------+ +------+
| Order| -> | Pay | -> | Stock| | Saga | -> | Pay | -> | Stock|
+------+ +------+ +------+ | Coord| +------+ +------+
| | | | inat | | |
+--<---------+<-----------+ | or | +--<---------+
+------+ |
| 失败 | v
| 补偿 | [Shipping]
+------+Saga 生命周期
成功路径:
[创建订单] -> [扣款] -> [锁定库存] -> [安排发货] -> 完成
失败路径 (发货失败):
[创建订单] -> [扣款] -> [锁定库存] -> [发货失败]
|
v
[释放库存] -> [退款] -> 订单取消实现
编排式 Saga(Choreography)
// 事件定义
public record OrderCreatedEvent(Guid OrderId, Guid CustomerId, decimal Amount);
public record PaymentCompletedEvent(Guid OrderId, string TransactionId);
public record StockReservedEvent(Guid OrderId, List<string> Items);
public record ShippingArrangedEvent(Guid OrderId, string TrackingNumber);
public record PaymentFailedEvent(Guid OrderId, string Reason);
public record OrderCancelledEvent(Guid OrderId, string Reason);
// 事件总线
public class EventBus
{
private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();
public void Subscribe<T>(Func<T, Task> handler)
{
if (!_handlers.TryGetValue(typeof(T), out var list)) _handlers[typeof(T)] = list = new();
list.Add(msg => handler((T)msg));
}
public async Task PublishAsync<T>(T message)
{
if (_handlers.TryGetValue(typeof(T), out var handlers))
foreach (var handler in handlers) await handler(message!);
}
}
// 各服务独立监听事件
public class PaymentService
{
public PaymentService(EventBus bus)
{
bus.Subscribe<OrderCreatedEvent>(async e =>
{
var success = await ProcessPayment(e.OrderId, e.Amount);
if (success) await bus.PublishAsync(new PaymentCompletedEvent(e.OrderId, Guid.NewGuid().ToString("N")[..8]));
else await bus.PublishAsync(new PaymentFailedEvent(e.OrderId, "余额不足"));
});
// 补偿: 收到取消事件则退款
bus.Subscribe<OrderCancelledEvent>(async e =>
{
Console.WriteLine($"[支付] 退款: {e.OrderId}");
await Task.CompletedTask;
});
}
private async Task<bool> ProcessPayment(Guid orderId, decimal amount)
{
Console.WriteLine($"[支付] 扣款: 订单 {orderId}, Y{amount}");
await Task.Delay(100);
return true;
}
}
public class InventoryService
{
public InventoryService(EventBus bus)
{
bus.Subscribe<PaymentCompletedEvent>(async e =>
{
Console.WriteLine($"[库存] 锁定库存: {e.OrderId}");
await bus.PublishAsync(new StockReservedEvent(e.OrderId, new() { "商品A" }));
});
bus.Subscribe<OrderCancelledEvent>(async e =>
{
Console.WriteLine($"[库存] 释放库存: {e.OrderId}");
await Task.CompletedTask;
});
}
}
public class ShippingService
{
public ShippingService(EventBus bus)
{
bus.Subscribe<StockReservedEvent>(async e =>
{
Console.WriteLine($"[物流] 安排发货: {e.OrderId}");
await bus.PublishAsync(new ShippingArrangedEvent(e.OrderId, "SF123456"));
});
}
}协调式 Saga(Orchestration)
// Saga 步骤定义
public interface ISagaStep<TContext>
{
string Name { get; }
Task ExecuteAsync(TContext context);
Task CompensateAsync(TContext context);
}
// Saga 上下文
public class OrderSagaContext
{
public Guid OrderId { get; init; }
public decimal Amount { get; init; }
public string? PaymentId { get; set; }
public bool StockReserved { get; set; }
public string? TrackingNumber { get; set; }
public string? FailureReason { get; set; }
}
// Saga 协调器
public class SagaOrchestrator<TContext> where TContext : class
{
private readonly List<(ISagaStep<TContext> Step, bool Completed)> _steps = new();
public SagaOrchestrator<TContext> AddStep(ISagaStep<TContext> step)
{
_steps.Add((step, false));
return this;
}
public async Task ExecuteAsync(TContext context)
{
for (int i = 0; i < _steps.Count; i++)
{
try
{
await _steps[i].Step.ExecuteAsync(context);
_steps[i] = (_steps[i].Step, true);
Console.WriteLine($"* {_steps[i].Step.Name}");
}
catch (Exception ex)
{
Console.WriteLine($"X {_steps[i].Step.Name}: {ex.Message}");
// 补偿: 逆向执行已完成的步骤
for (int j = i - 1; j >= 0; j--)
{
if (_steps[j].Completed)
{
Console.WriteLine($"<- 补偿: {_steps[j].Step.Name}");
await _steps[j].Step.CompensateAsync(context);
}
}
throw;
}
}
}
}
// 具体步骤
public class ProcessPaymentStep : ISagaStep<OrderSagaContext>
{
public string Name => "处理支付";
public async Task ExecuteAsync(OrderSagaContext ctx)
{
ctx.PaymentId = Guid.NewGuid().ToString("N")[..8];
Console.WriteLine($" 扣款 Y{ctx.Amount}, 交易号: {ctx.PaymentId}");
await Task.CompletedTask;
}
public async Task CompensateAsync(OrderSagaContext ctx)
{
Console.WriteLine($" 退款: {ctx.PaymentId}");
await Task.CompletedTask;
}
}
public class ReserveStockStep : ISagaStep<OrderSagaContext>
{
public string Name => "锁定库存";
public async Task ExecuteAsync(OrderSagaContext ctx)
{
ctx.StockReserved = true;
Console.WriteLine(" 库存已锁定");
await Task.CompletedTask;
}
public async Task CompensateAsync(OrderSagaContext ctx)
{
ctx.StockReserved = false;
Console.WriteLine(" 库存已释放");
await Task.CompletedTask;
}
}
public class ArrangeShippingStep : ISagaStep<OrderSagaContext>
{
public string Name => "安排发货";
public async Task ExecuteAsync(OrderSagaContext ctx)
{
ctx.TrackingNumber = "SF" + Random.Shared.Next(100000, 999999);
Console.WriteLine($" 物流单号: {ctx.TrackingNumber}");
await Task.CompletedTask;
}
public async Task CompensateAsync(OrderSagaContext ctx)
{
Console.WriteLine($" 取消发货: {ctx.TrackingNumber}");
await Task.CompletedTask;
}
}
// 使用
var saga = new SagaOrchestrator<OrderSagaContext>()
.AddStep(new ProcessPaymentStep())
.AddStep(new ReserveStockStep())
.AddStep(new ArrangeShippingStep());
await saga.ExecuteAsync(new OrderSagaContext { OrderId = Guid.NewGuid(), Amount = 999.99m });实战:带幂等性的 Saga 步骤
public interface ISagaStepWithIdempotency<TContext> : ISagaStep<TContext>
{
Task<bool> IsAlreadyExecutedAsync(TContext context);
Task MarkAsExecutedAsync(TContext context);
Task MarkAsCompensatedAsync(TContext context);
}
public abstract class IdempotentSagaStep<TContext> : ISagaStepWithIdempotency<TContext>
{
public abstract string StepName { get; }
public async Task ExecuteAsync(TContext context)
{
if (await IsAlreadyExecutedAsync(context)) return;
await DoExecuteAsync(context);
await MarkAsExecutedAsync(context);
}
public async Task CompensateAsync(TContext context)
{
if (!await IsAlreadyExecutedAsync(context)) return;
await DoCompensateAsync(context);
await MarkAsCompensatedAsync(context);
}
public string Name => StepName;
protected abstract Task DoExecuteAsync(TContext context);
protected abstract Task DoCompensateAsync(TContext context);
public abstract Task<bool> IsAlreadyExecutedAsync(TContext context);
public abstract Task MarkAsExecutedAsync(TContext context);
public abstract Task MarkAsCompensatedAsync(TContext context);
}Saga 持久化与故障恢复
Saga 状态持久化
// Saga 状态存储 — 用于故障恢复
public class SagaState
{
public Guid SagaId { get; set; }
public string SagaType { get; set; }
public string CurrentStep { get; set; }
public int CompletedStepCount { get; set; }
public string Status { get; set; } // Pending, Running, Completed, Failed, Compensating
public string ContextJson { get; set; }
public DateTime StartedAt { get; set; }
public DateTime? CompletedAt { get; set; }
public string? FailureReason { get; set; }
}
public interface ISagaStateRepository
{
Task<SagaState?> GetAsync(Guid sagaId);
Task SaveAsync(SagaState state);
Task<List<SagaState>> GetPendingSagasAsync(TimeSpan timeout);
}
// 带持久化的 Saga 协调器
public class PersistentSagaOrchestrator<TContext> where TContext : class
{
private readonly ISagaStateRepository _stateRepo;
private readonly List<ISagaStep<TContext>> _steps;
private readonly ILogger _logger;
public async Task ExecuteAsync(Guid sagaId, TContext context)
{
var state = await _stateRepo.GetAsync(sagaId);
// 如果是新的 Saga
if (state == null)
{
state = new SagaState
{
SagaId = sagaId,
SagaType = typeof(TContext).Name,
Status = "Running",
StartedAt = DateTime.UtcNow,
ContextJson = JsonSerializer.Serialize(context)
};
await _stateRepo.SaveAsync(state);
}
// 从上次中断的步骤继续执行
var startStep = state.CompletedStepCount;
for (int i = startStep; i < _steps.Count; i++)
{
try
{
state.CurrentStep = _steps[i].Name;
await _stateRepo.SaveAsync(state);
await _steps[i].ExecuteAsync(context);
state.CompletedStepCount = i + 1;
await _stateRepo.SaveAsync(state);
}
catch (Exception ex)
{
state.Status = "Failed";
state.FailureReason = ex.Message;
await _stateRepo.SaveAsync(state);
_logger.LogError(ex, "Saga {SagaId} 在步骤 {Step} 失败", sagaId, _steps[i].Name);
// 执行补偿
state.Status = "Compensating";
await _stateRepo.SaveAsync(state);
for (int j = i - 1; j >= 0; j--)
{
try
{
await _steps[j].CompensateAsync(context);
_logger.LogInformation("补偿步骤 {Step} 完成", _steps[j].Name);
}
catch (Exception compEx)
{
_logger.LogError(compEx, "补偿步骤 {Step} 失败", _steps[j].Name);
// 记录失败但继续补偿其他步骤
}
}
state.Status = "Compensated";
state.CompletedAt = DateTime.UtcNow;
await _stateRepo.SaveAsync(state);
throw;
}
}
state.Status = "Completed";
state.CompletedAt = DateTime.UtcNow;
await _stateRepo.SaveAsync(state);
}
}Outbox 模式 — 保证事件可靠投递
// Outbox 模式确保 Saga 事件不丢失
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; }
public string Payload { get; set; }
public string Status { get; set; } // Pending, Published, Failed
public DateTime CreatedAt { get; set; }
public int RetryCount { get; set; }
public DateTime? NextRetryAt { get; set; }
}
// Outbox 表 — 与业务数据在同一事务中写入
public class OutboxRepository
{
private readonly AppDbContext _dbContext;
public OutboxRepository(AppDbContext dbContext) => _dbContext = dbContext;
// 在业务事务中同时写入 Outbox
public async Task SaveMessageAsync(string eventType, object payload)
{
_dbContext.OutboxMessages.Add(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = eventType,
Payload = JsonSerializer.Serialize(payload),
Status = "Pending",
CreatedAt = DateTime.UtcNow
});
}
// 获取待发送消息
public async Task<List<OutboxMessage>> GetPendingMessagesAsync(int batchSize = 100)
{
return await _dbContext.OutboxMessages
.Where(m => m.Status == "Pending" || m.Status == "Failed")
.Where(m => m.NextRetryAt == null || m.NextRetryAt <= DateTime.UtcNow)
.OrderBy(m => m.CreatedAt)
.Take(batchSize)
.ToListAsync();
}
// 标记消息已发送
public async Task MarkPublishedAsync(Guid messageId)
{
var msg = await _dbContext.OutboxMessages.FindAsync(messageId);
if (msg != null)
{
msg.Status = "Published";
await _dbContext.SaveChangesAsync();
}
}
}
// 后台任务 — 定期轮询 Outbox 发送消息
public class OutboxProcessor : BackgroundService
{
private readonly OutboxRepository _outbox;
private readonly EventBus _eventBus;
private readonly ILogger<OutboxProcessor> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var messages = await _outbox.GetPendingMessagesAsync();
foreach (var msg in messages)
{
try
{
var eventType = Type.GetType(msg.EventType);
var payload = JsonSerializer.Deserialize(msg.Payload, eventType!);
await _eventBus.PublishAsync(payload);
await _outbox.MarkPublishedAsync(msg.Id);
}
catch (Exception ex)
{
msg.RetryCount++;
msg.Status = "Failed";
msg.NextRetryAt = DateTime.UtcNow.AddSeconds(Math.Pow(2, msg.RetryCount));
_logger.LogError(ex, "Outbox 消息发送失败: {MessageId}", msg.Id);
}
}
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}Saga 超时与自动补偿
public class SagaTimeoutManager
{
private readonly ISagaStateRepository _stateRepo;
private readonly SagaOrchestratorFactory _factory;
private readonly ILogger _logger;
// 定期检查超时的 Saga
public async Task CheckTimeoutsAsync(TimeSpan timeout)
{
var cutoff = DateTime.UtcNow - timeout;
var pendingSagas = await _stateRepo.GetPendingSagasAsync(timeout);
foreach (var saga in pendingSagas)
{
if (saga.Status == "Running" && saga.StartedAt < cutoff)
{
_logger.LogWarning(
"Saga {SagaId} 超时 (已运行 {Minutes} 分钟),触发自动补偿",
saga.SagaId,
(DateTime.UtcNow - saga.StartedAt).TotalMinutes);
// 标记为超时
saga.Status = "TimedOut";
saga.FailureReason = $"Saga 执行超时 ({timeout.TotalMinutes} 分钟)";
await _stateRepo.SaveAsync(saga);
// 触发补偿(由恢复机制执行)
}
}
}
}编排式 vs 协调式对比
| 特性 | 编排式 (Choreography) | 协调式 (Orchestration) |
|---|---|---|
| 耦合度 | 低(服务间直接通信) | 中(协调器管理所有步骤) |
| 复杂度 | 分散在各服务中 | 集中在协调器中 |
| 可观测性 | 难以追踪全局流程 | 协调器提供全局视图 |
| 适用场景 | 步骤少、流程简单 | 步骤多、流程复杂 |
| 扩展性 | 新增服务需修改已有服务 | 新增步骤只改协调器 |
| 测试 | 集成测试困难 | 可单元测试协调器 |
最佳实践
- 每个步骤必须幂等:Saga 中的消息可能被重复投递,每个步骤和补偿操作必须是幂等的。
- 补偿操作也必须幂等:补偿可能被多次调用,重复补偿不应产生副作用。
- 使用 Outbox 模式:确保事件可靠投递,避免消息丢失导致 Saga 卡住。
- 设置超时机制:为每个步骤和整个 Saga 设置超时,避免无限等待。
- 记录 Saga 状态:持久化每个步骤的执行状态,便于故障恢复和调试。
优点
缺点
总结
Saga 模式将分布式事务拆分为多个本地事务,失败时执行补偿操作。编排式通过事件总线解耦各服务,协调式通过 Saga 协调器集中管理步骤顺序。每个步骤需要定义正向执行和逆向补偿操作。建议在微服务间需要事务一致性但无法使用分布式锁的场景使用 Saga 模式,配合 Outbox 模式保证消息可靠投递。
Saga 模式的本质价值在于:在微服务架构中,当你需要跨多个服务保证数据一致性,但传统的分布式事务(2PC)不可行时,Saga 提供了一种通过"补偿"实现最终一致性的方案。每个本地事务独立提交,失败时通过逆向补偿回滚,虽然过程复杂但保证了系统的可用性和性能。
关键知识点
- 模式不是目标,降低耦合和控制变化才是目标。
- 先找变化点、稳定点和协作边界,再决定是否引入模式。
- 同一个模式在不同规模下的收益和代价差异很大。
项目落地视角
- 优先画出参与对象、依赖方向和调用链,再落到代码。
- 把模式放到一个真实场景里,比如支付、规则引擎、工作流或插件扩展。
- 配合单元测试或契约测试,保证重构后的行为没有漂移。
常见误区
- 为了看起来"高级"而套模式。
- 把简单问题拆成过多抽象层,导致阅读和排障都变难。
- 只会背 UML,不会解释为什么这里需要这个模式。
进阶路线
- 继续关注模式之间的组合用法,而不是孤立记忆。
- 从业务建模、演进策略和团队协作角度看模式的适用性。
- 把模式结论沉淀为项目模板、基类或约束文档。
适用场景
- 当你准备把《Saga 模式》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在业务规则频繁变化、分支增多或对象协作复杂时引入。
- 当你希望提高扩展性,但又不想把系统拆得过度抽象时,这类主题很有参考价值。
落地建议
- 先识别变化点,再决定是否引入模式,而不是反过来套模板。
- 优先为模式的边界、依赖和调用路径画出简单结构图。
- 把模式落到一个明确场景,例如支付、规则计算、插件扩展或工作流。
排错清单
- 检查抽象层是否过多,导致调用路径和责任不清晰。
- 确认引入模式后是否真的减少了条件分支和重复代码。
- 警惕"为了模式而模式",尤其是在简单业务里。
复盘问题
- 如果把《Saga 模式》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Saga 模式》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Saga 模式》最大的收益和代价分别是什么?
