分布式事务与一致性
大约 11 分钟约 3234 字
分布式事务与一致性
简介
分布式系统中,跨服务的数据一致性是核心挑战。理解 CAP 定理与 BASE 理论、Saga 编排模式与 TCC 模式的实现、补偿事务与幂等性的设计,以及 Outbox 模式,有助于在一致性和可用性之间做出正确的权衡。
特点
CAP 定理与 BASE
理论基础
// CAP 定理:
// C (Consistency) — 一致性:所有节点看到相同数据
// A (Availability) — 可用性:每个请求都得到响应
// P (Partition Tolerance) — 分区容错:网络分区时系统继续运行
//
// 网络分区不可避免,因此发生分区时只能在 C 和 A 之间取舍:
// CP 系统 — 强一致性(如 ZooKeeper、etcd)
// AP 系统 — 高可用性(如 Cassandra、DynamoDB)
// BASE 理论(AP 系统的妥协):
// BA (Basically Available) — 基本可用
// S (Soft State) — 软状态(允许中间状态)
// E (Eventual Consistency) — 最终一致性
// 微服务中的数据一致性策略:
// 1. 强一致性 — 单服务内事务(EF Core SaveChanges)
// 2. 最终一致性 — 跨服务事件驱动(Saga / Event Sourcing)
// 3. 弱一致性 — 允许短暂不一致(缓存 + TTL)
// 单服务事务(ACID)
/// <summary>
/// Order creation inside a single service: the order insert and the stock deduction
/// go to the same database, so one local transaction covers both.
/// </summary>
public class OrderService
{
    private readonly AppDbContext _db;

    /// <summary>Creates the service around the context used for orders and inventory.</summary>
    /// <param name="db">Database context; without this constructor the field was never assigned.</param>
    public OrderService(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>
    /// Creates an order and deducts stock for every item in one database transaction.
    /// </summary>
    /// <param name="command">User and items of the order to create.</param>
    /// <param name="ct">Cancels the database calls.</param>
    /// <returns>The id of the newly created order.</returns>
    /// <exception cref="InvalidOperationException">
    /// An inventory row is missing (thrown by <c>FirstAsync</c>) or holds less stock than requested.
    /// </exception>
    public async Task<Guid> CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        await using var transaction = await _db.Database.BeginTransactionAsync(ct);
        try
        {
            var order = new Order
            {
                Id = Guid.NewGuid(),
                UserId = command.UserId,
                Status = OrderStatus.Created,
                Items = command.Items.Select(i => new OrderItem
                {
                    ProductId = i.ProductId,
                    ProductName = i.ProductName,
                    Price = i.Price,
                    Quantity = i.Quantity
                }).ToList()
            };
            order.TotalAmount = order.Items.Sum(i => i.Price * i.Quantity);
            _db.Orders.Add(order);

            // Deduct stock (same database, so it joins the same transaction).
            foreach (var item in command.Items)
            {
                var inventory = await _db.Inventory
                    .FirstAsync(i => i.ProductId == item.ProductId, ct);

                // Refuse to oversell: without this check the quantity silently went negative.
                if (inventory.Quantity < item.Quantity)
                {
                    throw new InvalidOperationException($"库存不足: {item.ProductId}");
                }

                inventory.Quantity -= item.Quantity;
            }

            await _db.SaveChangesAsync(ct);
            await transaction.CommitAsync(ct);
            return order.Id;
        }
        catch
        {
            // Roll back even when the caller's token is already cancelled, so the
            // rollback itself cannot throw and hide the original exception.
            await transaction.RollbackAsync(CancellationToken.None);
            throw;
        }
    }
}
Saga 编排模式
集中式 Saga 协调器
// Saga 编排器(Orchestrator)— 集中控制分布式事务流程
// 优点:流程清晰,易于监控和调试
// 缺点:协调器可能成为单点
// Saga 步骤定义
/// <summary>
/// One step of a saga: a forward action plus the action that undoes it.
/// </summary>
/// <typeparam name="TData">Data object handed to both actions.</typeparam>
public interface ISagaStep<TData>
{
/// <summary>Name of the step; the orchestrator in this file writes it to logs and step records.</summary>
string Name { get; }
/// <summary>Runs the forward action of the step.</summary>
Task ExecuteAsync(TData data, CancellationToken ct);
/// <summary>Undoes the forward action; the orchestrator calls it for steps that had succeeded when a later step fails.</summary>
Task CompensateAsync(TData data, CancellationToken ct);
}
// Saga 上下文
/// <summary>
/// Running state of one saga execution: its id, the shared data, the index of the
/// step being executed, the overall status and the per-step outcome records.
/// </summary>
/// <typeparam name="TData">Data shared by the saga steps.</typeparam>
public class SagaContext<TData>
{
    /// <summary>Identifier of this execution; a fresh GUID unless set in the initializer.</summary>
    public Guid SagaId { get; init; } = Guid.NewGuid();

    /// <summary>Data passed to the steps.</summary>
    public TData Data { get; init; } = default!;

    /// <summary>Index of the step being executed.</summary>
    public int CurrentStep { get; set; }

    /// <summary>Current status of the saga.</summary>
    public SagaStatus Status { get; set; }

    /// <summary>Outcome records of the steps run so far, in execution order.</summary>
    public List<SagaStepRecord> CompletedSteps { get; } = new List<SagaStepRecord>();
}
/// <summary>Outcome of one executed saga step.</summary>
/// <param name="StepName">Name of the step.</param>
/// <param name="Success">Whether the step's forward action completed.</param>
/// <param name="Error">Error text for a failed step; null otherwise.</param>
public record SagaStepRecord(
    string StepName,
    bool Success,
    string? Error = null);
/// <summary>Lifecycle states of a saga execution.</summary>
public enum SagaStatus
{
    Pending = 0,
    Executing = 1,
    Compensating = 2,
    Completed = 3,
    Failed = 4
}
// Saga 编排器
/// <summary>
/// Saga orchestrator: runs the registered steps in order and, when a step throws,
/// compensates the steps that had already succeeded, in reverse order.
/// </summary>
/// <typeparam name="TData">Data shared by every step of the saga.</typeparam>
public class SagaOrchestrator<TData>
{
    private readonly List<ISagaStep<TData>> _steps = new();
    private readonly ILogger _logger;

    /// <summary>Creates an orchestrator with no steps.</summary>
    /// <param name="logger">Receives progress, failure and compensation messages.</param>
    public SagaOrchestrator(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>Appends a step; steps execute in the order they were added.</summary>
    /// <returns>This orchestrator, so calls can be chained.</returns>
    public SagaOrchestrator<TData> AddStep(ISagaStep<TData> step)
    {
        _steps.Add(step);
        return this;
    }

    /// <summary>
    /// Executes all steps. A step failure does not propagate as an exception: the saga is
    /// compensated and the returned context has status <see cref="SagaStatus.Failed"/>.
    /// </summary>
    /// <param name="data">Data handed to every step.</param>
    /// <param name="ct">Cancels the forward steps only; compensation always runs to the end.</param>
    /// <returns>The context with the final status and the per-step records.</returns>
    public async Task<SagaContext<TData>> ExecuteAsync(TData data, CancellationToken ct)
    {
        var context = new SagaContext<TData> { Data = data, Status = SagaStatus.Executing };

        // Forward execution.
        for (int i = 0; i < _steps.Count; i++)
        {
            var step = _steps[i];
            context.CurrentStep = i;
            try
            {
                _logger.LogInformation("Saga {SagaId}: 执行步骤 {Step}", context.SagaId, step.Name);
                await step.ExecuteAsync(data, ct);
                context.CompletedSteps.Add(new SagaStepRecord(step.Name, true));
                _logger.LogInformation("Saga {SagaId}: 步骤 {Step} 完成", context.SagaId, step.Name);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Saga {SagaId}: 步骤 {Step} 失败", context.SagaId, step.Name);
                context.CompletedSteps.Add(new SagaStepRecord(step.Name, false, ex.Message));
                context.Status = SagaStatus.Compensating;

                // Steps 0..i-1 succeeded; undo them in reverse order.
                await CompensateAsync(context, i);

                context.Status = SagaStatus.Failed;
                return context;
            }
        }

        context.Status = SagaStatus.Completed;
        return context;
    }

    /// <summary>
    /// Compensates the steps before <paramref name="failedStepIndex"/>, last one first.
    /// A failing compensation is logged as critical and the remaining ones still run.
    /// </summary>
    private async Task CompensateAsync(SagaContext<TData> context, int failedStepIndex)
    {
        // Walk the step list by position instead of looking steps up by name:
        // a name lookup compensates the wrong step when two steps share a name.
        for (int i = failedStepIndex - 1; i >= 0; i--)
        {
            var step = _steps[i];
            try
            {
                _logger.LogInformation("Saga {SagaId}: 补偿步骤 {Step}", context.SagaId, step.Name);

                // Deliberately not the caller's token: if the saga failed because of
                // cancellation, that token would abort every compensation as well.
                await step.CompensateAsync(context.Data, CancellationToken.None);
            }
            catch (Exception ex)
            {
                // A failed compensation needs manual intervention.
                _logger.LogCritical(ex, "Saga {SagaId}: 补偿步骤 {Step} 失败!", context.SagaId, step.Name);
            }
        }
    }
}
// 创建订单 Saga 实现
/// <summary>
/// Data shared by the steps of the create-order saga.
/// </summary>
/// <param name="OrderId">Id of the order being created.</param>
/// <param name="UserId">Id of the user placing the order.</param>
/// <param name="Items">Items of the order.</param>
/// <param name="TotalAmount">Amount to charge.</param>
/// <param name="PaymentTransactionId">Payment transaction id, if a payment already exists.</param>
public record CreateOrderSagaData(
    Guid OrderId,
    Guid UserId,
    List<OrderItemData> Items,
    decimal TotalAmount,
    string? PaymentTransactionId = null)
{
    /// <summary>
    /// Payment transaction id. Declared explicitly with a setter because the payment step
    /// assigns it after construction; the compiler-generated positional property is
    /// init-only and would reject that assignment.
    /// </summary>
    public string? PaymentTransactionId { get; set; } = PaymentTransactionId;
}
// Step 1: 创建订单
/// <summary>
/// Saga step 1: creates the order in Pending status; compensation cancels it.
/// </summary>
public class CreateOrderStep : ISagaStep<CreateOrderSagaData>
{
    private readonly IOrderService _orderService;

    public CreateOrderStep(IOrderService orderService)
    {
        _orderService = orderService;
    }

    /// <inheritdoc />
    public string Name => "CreateOrder";

    /// <summary>Creates a pending order from the saga data.</summary>
    public async Task ExecuteAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        var pendingOrder = new OrderDto
        {
            Id = data.OrderId,
            UserId = data.UserId,
            Items = data.Items,
            Status = OrderStatus.Pending
        };

        await _orderService.CreateAsync(pendingOrder, ct);
    }

    /// <summary>Cancels the order created by <see cref="ExecuteAsync"/>.</summary>
    public async Task CompensateAsync(CreateOrderSagaData data, CancellationToken ct)
        => await _orderService.CancelAsync(data.OrderId, "Saga 补偿取消", ct);
}
// Step 2: 预留库存
/// <summary>
/// Saga step 2: reserves inventory for the order; compensation releases it.
/// </summary>
public class ReserveInventoryStep : ISagaStep<CreateOrderSagaData>
{
    private readonly IInventoryService _inventoryService;

    public ReserveInventoryStep(IInventoryService inventoryService)
    {
        _inventoryService = inventoryService;
    }

    /// <inheritdoc />
    public string Name => "ReserveInventory";

    /// <summary>Reserves the order's items under the order id.</summary>
    public async Task ExecuteAsync(CreateOrderSagaData data, CancellationToken ct)
        => await _inventoryService.ReserveAsync(data.OrderId, data.Items, ct);

    /// <summary>Releases the items reserved for the order id.</summary>
    public async Task CompensateAsync(CreateOrderSagaData data, CancellationToken ct)
        => await _inventoryService.ReleaseAsync(data.OrderId, data.Items, ct);
}
// Step 3: 支付扣款
/// <summary>
/// Saga step 3: charges the user for the order; compensation refunds the charge.
/// </summary>
public class ProcessPaymentStep : ISagaStep<CreateOrderSagaData>
{
    private readonly IPaymentService _paymentService;

    public ProcessPaymentStep(IPaymentService paymentService)
    {
        _paymentService = paymentService;
    }

    /// <inheritdoc />
    public string Name => "ProcessPayment";

    /// <summary>Charges the total amount and remembers the transaction id on the saga data.</summary>
    public async Task ExecuteAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        var charge = await _paymentService.ChargeAsync(data.UserId, data.TotalAmount, data.OrderId, ct);

        // Kept on the saga data so CompensateAsync knows which transaction to refund.
        // NOTE(review): this assignment only compiles if PaymentTransactionId has a setter.
        data.PaymentTransactionId = charge.TransactionId;
    }

    /// <summary>Refunds the recorded transaction; does nothing when no transaction id was recorded.</summary>
    public async Task CompensateAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        if (data.PaymentTransactionId == null)
        {
            return;
        }

        await _paymentService.RefundAsync(data.PaymentTransactionId, ct);
    }
}
// 使用 Saga
/// <summary>
/// Entry point that assembles and runs the create-order saga
/// (create order, reserve inventory, process payment).
/// </summary>
public class OrderSagaService
{
    private readonly IServiceProvider _sp;
    private readonly ILogger<SagaOrchestrator<CreateOrderSagaData>> _logger;

    /// <summary>Creates the service; without this constructor both fields were never assigned.</summary>
    /// <param name="sp">Provider used to resolve the order, inventory and payment services.</param>
    /// <param name="logger">Logger handed to the orchestrator.</param>
    public OrderSagaService(
        IServiceProvider sp,
        ILogger<SagaOrchestrator<CreateOrderSagaData>> logger)
    {
        _sp = sp;
        _logger = logger;
    }

    /// <summary>Runs the create-order saga for a new order id.</summary>
    /// <param name="userId">User placing the order.</param>
    /// <param name="items">Items of the order; the total is the sum of price times quantity.</param>
    /// <param name="ct">Passed to the orchestrator.</param>
    /// <returns>The saga context returned by the orchestrator.</returns>
    public async Task<SagaContext<CreateOrderSagaData>> CreateOrderAsync(
        Guid userId, List<OrderItemData> items, CancellationToken ct)
    {
        var orderId = Guid.NewGuid();
        var totalAmount = items.Sum(i => i.Price * i.Quantity);
        var data = new CreateOrderSagaData(orderId, userId, items, totalAmount);

        var saga = new SagaOrchestrator<CreateOrderSagaData>(_logger)
            .AddStep(new CreateOrderStep(_sp.GetRequiredService<IOrderService>()))
            .AddStep(new ReserveInventoryStep(_sp.GetRequiredService<IInventoryService>()))
            .AddStep(new ProcessPaymentStep(_sp.GetRequiredService<IPaymentService>()));

        return await saga.ExecuteAsync(data, ct);
    }
}
TCC 模式
Try-Confirm-Cancel
// TCC(Try-Confirm-Cancel)模式
// Try — 预留资源(冻结库存、冻结金额)
// Confirm — 确认操作(扣减库存、扣减金额)
// Cancel — 取消操作(释放库存、释放金额)
// TCC 参与者接口
/// <summary>
/// A participant in a TCC (Try-Confirm-Cancel) transaction.
/// </summary>
/// <typeparam name="TData">Data object handed to all three operations.</typeparam>
public interface ITccParticipant<TData>
{
/// <summary>Name of the participant; the transaction manager in this file writes it to logs.</summary>
string Name { get; }
/// <summary>Try: reserves the resources the transaction needs.</summary>
Task TryAsync(TData data, CancellationToken ct);
/// <summary>Confirm: makes the reservation final.</summary>
Task ConfirmAsync(TData data, CancellationToken ct);
/// <summary>Cancel: releases the reservation.</summary>
Task CancelAsync(TData data, CancellationToken ct);
}
// 库存 TCC 实现
/// <summary>
/// Inventory participant of a TCC transaction: Try moves stock from Available to Frozen,
/// Confirm removes it from Frozen, Cancel moves it back to Available.
/// </summary>
public class InventoryTccParticipant : ITccParticipant<CreateOrderSagaData>
{
    private readonly InventoryDbContext _db;

    /// <inheritdoc />
    public string Name => "Inventory";

    public InventoryTccParticipant(InventoryDbContext db) => _db = db;

    /// <summary>Freezes the requested quantity of every item.</summary>
    /// <exception cref="BusinessException">An inventory row is missing or has too little available stock.</exception>
    public async Task TryAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        foreach (var item in data.Items)
        {
            var inventory = await _db.Inventory.FindAsync(new object[] { item.ProductId }, ct);
            if (inventory == null || inventory.Available < item.Quantity)
            {
                throw new BusinessException($"库存不足: {item.ProductId}");
            }

            // Move the quantity from Available to Frozen.
            inventory.Frozen += item.Quantity;
            inventory.Available -= item.Quantity;
        }

        await _db.SaveChangesAsync(ct);
    }

    /// <summary>Finalizes the deduction by removing the frozen quantity of every item.</summary>
    /// <exception cref="BusinessException">An inventory row is missing.</exception>
    public async Task ConfirmAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        foreach (var item in data.Items)
        {
            var inventory = await _db.Inventory.FindAsync(new object[] { item.ProductId }, ct);

            // Fail with a descriptive error instead of a NullReferenceException when the row is gone.
            if (inventory == null)
            {
                throw new BusinessException($"库存记录不存在: {item.ProductId}");
            }

            // Available was already reduced in Try, so only Frozen changes here.
            inventory.Frozen -= item.Quantity;
        }

        await _db.SaveChangesAsync(ct);
    }

    /// <summary>Returns the frozen quantity of every item to Available; missing rows are skipped.</summary>
    public async Task CancelAsync(CreateOrderSagaData data, CancellationToken ct)
    {
        foreach (var item in data.Items)
        {
            var inventory = await _db.Inventory.FindAsync(new object[] { item.ProductId }, ct);
            if (inventory != null)
            {
                inventory.Available += item.Quantity;
                inventory.Frozen -= item.Quantity;
            }
        }

        await _db.SaveChangesAsync(ct);
    }
}
// TCC 事务管理器
/// <summary>
/// TCC transaction manager: runs Try on every participant, then Confirm on every
/// participant; if a Try fails, the participants already tried are cancelled.
/// </summary>
/// <typeparam name="TData">Data handed to every participant.</typeparam>
public class TccTransactionManager<TData>
{
    private readonly List<ITccParticipant<TData>> _participants = new();
    private readonly ILogger _logger;

    public TccTransactionManager(ILogger logger) => _logger = logger;

    /// <summary>Registers a participant; participants run in registration order.</summary>
    public void AddParticipant(ITccParticipant<TData> participant) => _participants.Add(participant);

    /// <summary>Runs the Try phase and, if it fully succeeds, the Confirm phase.</summary>
    /// <param name="data">Data handed to every participant.</param>
    /// <param name="ct">Cancels the Try phase only; Cancel and Confirm always run to the end.</param>
    /// <exception cref="Exception">
    /// The exception of the failed Try, or of the first failed Confirm, is rethrown.
    /// </exception>
    public async Task ExecuteAsync(TData data, CancellationToken ct)
    {
        var triedParticipants = new List<ITccParticipant<TData>>();

        // Phase 1: Try.
        foreach (var participant in _participants)
        {
            try
            {
                await participant.TryAsync(data, ct);
                triedParticipants.Add(participant);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "TCC Try 失败: {Participant}", participant.Name);

                // Cancel the participants that were tried, last one first.
                foreach (var tried in triedParticipants.AsEnumerable().Reverse())
                {
                    try
                    {
                        // Not the caller's token: a cancelled request must still release
                        // what it reserved.
                        await tried.CancelAsync(data, CancellationToken.None);
                    }
                    catch (Exception cancelEx)
                    {
                        _logger.LogCritical(cancelEx, "TCC Cancel 失败: {Participant}", tried.Name);
                    }
                }

                throw;
            }
        }

        // Phase 2: Confirm. Every participant holds a reservation now, so each one gets
        // its Confirm even if an earlier Confirm failed; stopping at the first failure
        // would leave the remaining reservations frozen indefinitely.
        Exception? firstConfirmFailure = null;
        foreach (var participant in _participants)
        {
            try
            {
                await participant.ConfirmAsync(data, CancellationToken.None);
            }
            catch (Exception ex)
            {
                _logger.LogCritical(ex, "TCC Confirm 失败: {Participant},需要人工介入", participant.Name);
                firstConfirmFailure ??= ex;
            }
        }

        if (firstConfirmFailure is not null)
        {
            // Rethrow the first failure with its original stack trace.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(firstConfirmFailure).Throw();
        }
    }
}
优点
缺点
Outbox 模式与事件驱动
事务性 Outbox
/// <summary>
/// Outbox 模式 — 解决"数据库提交与消息发送"的原子性问题
/// 核心思想:将消息先写入数据库(与业务数据同一事务),
/// 再由后台线程将消息投递到消息队列。
/// </summary>
// Outbox 消息表
/// <summary>
/// Row of the outbox table: a message stored alongside the business data and
/// published to the message queue later by a background processor.
/// </summary>
public class OutboxMessage
{
    /// <summary>Message id.</summary>
    public Guid Id { get; set; }

    /// <summary>Message type.</summary>
    public string Type { get; set; } = "";

    /// <summary>Message content as JSON.</summary>
    public string Payload { get; set; } = "";

    /// <summary>Creation time (UTC), set when the object is constructed.</summary>
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;

    /// <summary>Time the message was sent; null until then.</summary>
    public DateTime? SentAt { get; set; }

    /// <summary>
    /// Number of failed send attempts. Must be public: the processor filters on it and
    /// increments it, which does not compile against a private property.
    /// </summary>
    public int RetryCount { get; set; }

    /// <summary>Delivery status: Pending, Sent or Failed.</summary>
    public string Status { get; set; } = "Pending";
}
// 使用 Outbox 的订单服务
/// <summary>
/// Order service that records an "OrderCreated" outbox message in the same database
/// transaction as the order, instead of publishing to the message queue directly.
/// </summary>
public class OrderServiceWithOutbox
{
    private readonly AppDbContext _db;

    public OrderServiceWithOutbox(AppDbContext db) => _db = db;

    /// <summary>Stores the order and its outbox message atomically.</summary>
    /// <param name="command">User and items of the order.</param>
    /// <param name="ct">Cancels the database calls.</param>
    /// <returns>The id of the new order.</returns>
    public async Task<Guid> CreateOrderAsync(CreateOrderCommand command, CancellationToken ct)
    {
        await using var tx = await _db.Database.BeginTransactionAsync(ct);
        try
        {
            // 1. Business operation: the order itself.
            var newOrder = new Order
            {
                Id = Guid.NewGuid(),
                UserId = command.UserId,
                TotalAmount = command.Items.Sum(i => i.Price * i.Quantity),
                Status = OrderStatus.Created,
            };
            _db.Orders.Add(newOrder);

            // 2. The event goes into the outbox table rather than to the queue.
            _db.OutboxMessages.Add(BuildOrderCreatedMessage(newOrder, command));

            // 3. One save and one commit: order and message succeed or fail together.
            await _db.SaveChangesAsync(ct);
            await tx.CommitAsync(ct);

            return newOrder.Id;
        }
        catch
        {
            await tx.RollbackAsync(ct);
            throw;
        }
    }

    /// <summary>Builds the "OrderCreated" outbox row with a JSON payload describing the order.</summary>
    private static OutboxMessage BuildOrderCreatedMessage(Order order, CreateOrderCommand command)
    {
        var payload = JsonSerializer.Serialize(new
        {
            order.Id,
            order.UserId,
            order.TotalAmount,
            Items = command.Items,
        });

        return new OutboxMessage
        {
            Id = Guid.NewGuid(),
            Type = "OrderCreated",
            Payload = payload,
        };
    }
}
// Outbox 后台处理器 — 定期扫描未发送的消息
/// <summary>
/// Background worker that polls the outbox table and publishes pending messages
/// to the message bus, oldest first.
/// </summary>
public class OutboxProcessor : BackgroundService
{
    /// <summary>Failed attempts after which a message is given up on.</summary>
    private const int MaxRetryCount = 3;

    /// <summary>Maximum number of messages handled per polling round.</summary>
    private const int BatchSize = 50;

    /// <summary>Pause between polling rounds.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(5);

    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxProcessor> _logger;
    private readonly IMessageBus _messageBus;

    public OutboxProcessor(
        IServiceProvider serviceProvider,
        ILogger<OutboxProcessor> logger,
        IMessageBus messageBus)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
        _messageBus = messageBus;
    }

    /// <summary>Polls until <paramref name="stoppingToken"/> is cancelled.</summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // A new scope per round, so each round gets its own DbContext.
                using var scope = _serviceProvider.CreateScope();
                var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

                var pending = await db.OutboxMessages
                    .Where(m => m.Status == "Pending" && m.RetryCount < MaxRetryCount)
                    .OrderBy(m => m.CreatedAt)
                    .Take(BatchSize)
                    .ToListAsync(stoppingToken);

                foreach (var message in pending)
                {
                    try
                    {
                        await _messageBus.PublishAsync(message.Type, message.Payload, stoppingToken);
                        message.Status = "Sent";
                        message.SentAt = DateTime.UtcNow;
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Shutdown, not a delivery failure: leave the retry count alone.
                        break;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Outbox 消息发送失败: {Id}", message.Id);
                        message.RetryCount++;

                        // Out of retries: mark it Failed so it stops looking like pending work.
                        if (message.RetryCount >= MaxRetryCount)
                        {
                            message.Status = "Failed";
                        }
                    }
                }

                // Persist the outcome even during shutdown; otherwise messages that were
                // just published stay Pending and are published again after restart.
                await db.SaveChangesAsync(CancellationToken.None);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown while querying; nothing to report.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox 处理异常");
            }

            await Task.Delay(PollInterval, stoppingToken);
        }
    }
}
幂等性保证
/// <summary>
/// Idempotent handler for payment-result events: processing the same message more
/// than once leaves the order in the same state. Message handlers and compensating
/// actions in a distributed transaction both need this property.
/// </summary>
public class IdempotentOrderHandler
{
    private readonly AppDbContext _db;

    public IdempotentOrderHandler(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>
    /// Approach 1: an idempotency-key row per message. Applies the payment result to the
    /// order and records the message id in the same transaction.
    /// </summary>
    /// <param name="evt">Payment result carrying the message id, order id and outcome.</param>
    /// <param name="ct">Cancels the database calls.</param>
    /// <exception cref="DbUpdateException">
    /// The save failed and the message id is still not recorded, i.e. the failure was
    /// not a duplicate-key conflict.
    /// </exception>
    public async Task HandlePaymentResultAsync(PaymentResultEvent evt, CancellationToken ct)
    {
        // Fast path: this message was already processed.
        var exists = await _db.IdempotencyKeys
            .AnyAsync(k => k.MessageId == evt.MessageId, ct);
        if (exists)
        {
            return;
        }

        await using var transaction = await _db.Database.BeginTransactionAsync(ct);
        try
        {
            // Business processing.
            var order = await _db.Orders.FindAsync(new object[] { evt.OrderId }, ct);
            if (order != null)
            {
                order.Status = evt.Success ? OrderStatus.Paid : OrderStatus.PaymentFailed;
            }

            // Record the idempotency key together with the business change.
            _db.IdempotencyKeys.Add(new IdempotencyKey
            {
                MessageId = evt.MessageId,
                ProcessedAt = DateTime.UtcNow
            });

            await _db.SaveChangesAsync(ct);
            await transaction.CommitAsync(ct);
        }
        catch (DbUpdateException)
        {
            await transaction.RollbackAsync(ct);

            // Drop the entities of the failed save so they are not re-sent by a later
            // SaveChanges on this context.
            _db.ChangeTracker.Clear();

            // Swallow the error only when another instance really recorded this message
            // in the meantime; any other save failure must reach the caller so the
            // message is retried instead of being lost.
            var processedElsewhere = await _db.IdempotencyKeys
                .AnyAsync(k => k.MessageId == evt.MessageId, ct);
            if (!processedElsewhere)
            {
                throw;
            }
        }
    }
}
/// <summary>Record of a message that has been processed.</summary>
public class IdempotencyKey
{
    /// <summary>Row id.</summary>
    public int Id { get; set; }

    /// <summary>Id of the processed message.</summary>
    public string MessageId { get; set; } = string.Empty;

    /// <summary>Time the message was processed.</summary>
    public DateTime ProcessedAt { get; set; }
}
// 方式 2:乐观锁(版本号)保证幂等
/// <summary>
/// Approach 2: optimistic locking with a version number. Updates the order's status
/// only if its version still equals <paramref name="expectedVersion"/>, and increments
/// the version in the same statement.
/// </summary>
/// <param name="orderId">Order to update.</param>
/// <param name="newStatus">Status to set.</param>
/// <param name="expectedVersion">Version the caller last saw.</param>
/// <param name="ct">Cancels the database call.</param>
/// <returns>True if a row was updated; false if no row matched the id and version.</returns>
public async Task<bool> UpdateOrderStatusAsync(
    Guid orderId, OrderStatus newStatus, int expectedVersion, CancellationToken ct)
{
    var nextVersion = expectedVersion + 1;

    // Only a row that still carries the expected version qualifies.
    var unchangedOrder = _db.Orders
        .Where(o => o.Id == orderId && o.Version == expectedVersion);

    var rowsUpdated = await unchangedOrder.ExecuteUpdateAsync(
        setters => setters
            .SetProperty(o => o.Status, newStatus)
            .SetProperty(o => o.Version, nextVersion),
        ct);

    // Zero rows means another writer changed the order first.
    return rowsUpdated > 0;
}
总结
在 CAP 定理的约束下,微服务的跨服务事务通常选择 AP + 最终一致性。Saga 编排模式通过集中式协调器控制事务流程:每一步执行正向操作,某一步失败时按相反顺序补偿已完成的步骤。TCC 模式分两个阶段保证一致性:先对所有参与者执行 Try(预留资源),全部成功后执行 Confirm(确认),任一 Try 失败则对已预留的参与者执行 Cancel(取消)。补偿操作和消息处理必须幂等,补偿失败时需要人工介入。建议优先使用事件驱动 + Outbox 模式实现最终一致性,Saga/TCC 适用于对一致性要求更高的场景。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《分布式事务与一致性》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《分布式事务与一致性》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《分布式事务与一致性》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《分布式事务与一致性》最大的收益和代价分别是什么?
