分布式事务
大约 8 分钟约 2471 字
分布式事务
简介
分布式事务是跨多个服务/数据库的事务操作,需要保证数据最终一致性。.NET 生态中主要有 TCC、Saga、本地消息表、Outbox 等模式。在微服务架构下,理解分布式事务的处理方式是构建可靠系统的关键。
特点
分布式事务模式
模式对比
| 模式 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强一致 | 低 | 高 | 数据库层面 |
| TCC | 最终一致 | 高 | 高 | 资金交易 |
| Saga | 最终一致 | 高 | 中 | 业务流程 |
| 本地消息表 | 最终一致 | 高 | 低 | 异步通知 |
| Outbox | 最终一致 | 高 | 中 | 事件驱动 |
本地消息表模式
实现方式
/// <summary>
/// 本地消息表 — 最实用的分布式事务方案
/// 核心思想:业务操作和消息存储在同一个事务中
/// </summary>
public class OrderService
{
private readonly AppDbContext _dbContext;
private readonly ILogger<OrderService> _logger;
public OrderService(AppDbContext dbContext, ILogger<OrderService> logger)
{
_dbContext = dbContext;
_logger = logger;
}
// 创建订单 + 存储消息 — 同一事务
public async Task CreateOrderAsync(CreateOrderRequest request)
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
// 1. 业务操作 — 创建订单
var order = new Order
{
Id = Guid.NewGuid(),
UserId = request.UserId,
TotalAmount = request.Items.Sum(i => i.Price * i.Quantity),
Status = "Created",
CreatedAt = DateTime.UtcNow
};
_dbContext.Orders.Add(order);
// 2. 同一事务中存储消息
var message = new OutboxMessage
{
Id = Guid.NewGuid(),
MessageType = "OrderCreated",
AggregateId = order.Id,
Payload = JsonSerializer.Serialize(new
{
order.Id,
order.UserId,
order.TotalAmount,
request.Items
}),
Status = "Pending",
CreatedAt = DateTime.UtcNow
};
_dbContext.OutboxMessages.Add(message);
// 3. 一起提交
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
_logger.LogInformation("订单创建成功:{OrderId}", order.Id);
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
// Outbox 消息实体
public class OutboxMessage
{
public Guid Id { get; set; }
public string MessageType { get; set; } = "";
public Guid AggregateId { get; set; }
public string Payload { get; set; } = "";
public string Status { get; set; } = "Pending"; // Pending, Sent, Failed
public int RetryCount { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? SentAt { get; set; }
public string? Error { get; set; }
}消息发送后台服务
/// <summary>
/// Outbox 消息发送器 — 后台服务定时发送未处理的消息
/// </summary>
public class OutboxDispatcher : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxDispatcher> _logger;
public OutboxDispatcher(IServiceScopeFactory scopeFactory, ILogger<OutboxDispatcher> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var bus = scope.ServiceProvider.GetRequiredService<IBus>();
// 获取待发送消息
var messages = await dbContext.OutboxMessages
.Where(m => m.Status == "Pending" && m.RetryCount < 5)
.OrderBy(m => m.CreatedAt)
.Take(50)
.ToListAsync(stoppingToken);
foreach (var msg in messages)
{
try
{
// 发送到消息队列
await bus.Publish(msg.MessageType, msg.Payload, stoppingToken);
msg.Status = "Sent";
msg.SentAt = DateTime.UtcNow;
}
catch (Exception ex)
{
msg.RetryCount++;
msg.Error = ex.Message;
_logger.LogWarning(ex, "消息发送失败:{Id},重试 {Count} 次", msg.Id, msg.RetryCount);
}
}
await dbContext.SaveChangesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox 调度异常");
}
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}Saga 模式
编排式 Saga
/// <summary>
/// Saga 编排 — 中央协调器管理流程
/// 每步成功执行下一步,失败执行补偿
/// </summary>
public class OrderSagaOrchestrator
{
private readonly IOrderService _orderService;
private readonly IPaymentService _paymentService;
private readonly IInventoryService _inventoryService;
private readonly ISagaLogger _sagaLogger;
public OrderSagaOrchestrator(
IOrderService orderService,
IPaymentService paymentService,
IInventoryService inventoryService,
ISagaLogger sagaLogger)
{
_orderService = orderService;
_paymentService = paymentService;
_inventoryService = inventoryService;
_sagaLogger = sagaLogger;
}
public async Task<SagaResult> ExecuteAsync(CreateOrderSagaRequest request)
{
var sagaId = Guid.NewGuid();
var compensations = new Stack<Func<Task>>();
try
{
// Step 1: 创建订单
_sagaLogger.Log(sagaId, "CreateOrder", "Started");
var orderId = await _orderService.CreateAsync(request);
compensations.Push(async () => await _orderService.CancelAsync(orderId));
// Step 2: 扣减库存
_sagaLogger.Log(sagaId, "ReserveInventory", "Started");
await _inventoryService.ReserveAsync(orderId, request.Items);
compensations.Push(async () => await _inventoryService.ReleaseAsync(orderId));
// Step 3: 处理支付
_sagaLogger.Log(sagaId, "ProcessPayment", "Started");
await _paymentService.ChargeAsync(orderId, request.TotalAmount);
compensations.Push(async () => await _paymentService.RefundAsync(orderId));
// Step 4: 确认订单
await _orderService.ConfirmAsync(orderId);
_sagaLogger.Log(sagaId, "Saga", "Completed");
return SagaResult.Success(orderId);
}
catch (Exception ex)
{
_sagaLogger.Log(sagaId, "Saga", $"Failed: {ex.Message}");
// 执行补偿(逆序)
while (compensations.Count > 0)
{
var compensate = compensations.Pop();
try
{
await compensate();
}
catch (Exception compEx)
{
_sagaLogger.Log(sagaId, "Compensate", $"补偿失败:{compEx.Message}");
}
}
return SagaResult.Failure(ex.Message);
}
}
}
public record SagaResult(bool Success, Guid? OrderId, string? Error)
{
public static SagaResult Success(Guid orderId) => new(true, orderId, null);
public static SagaResult Failure(string error) => new(false, null, error);
}协同式 Saga — 事件驱动
/// <summary>
/// 协同式 Saga — 各服务自行订阅事件、执行逻辑、发布新事件
/// 无中央协调器,松耦合
/// </summary>
// 订单服务
public class OrderService
{
public async Task HandleInventoryReservedAsync(InventoryReservedEvent evt)
{
// 库存已预留,处理支付
await _bus.Publish(new ProcessPaymentCommand(evt.OrderId, evt.Amount));
}
public async Task HandlePaymentCompletedAsync(PaymentCompletedEvent evt)
{
// 支付完成,确认订单
await ConfirmOrderAsync(evt.OrderId);
await _bus.Publish(new OrderConfirmedEvent(evt.OrderId));
}
public async Task HandlePaymentFailedAsync(PaymentFailedEvent evt)
{
// 支付失败,取消订单,释放库存
await CancelOrderAsync(evt.OrderId);
await _bus.Publish(new ReleaseInventoryCommand(evt.OrderId));
}
}
// 库存服务
public class InventoryService
{
public async Task HandleOrderCreatedAsync(OrderCreatedEvent evt)
{
// 订单创建,预留库存
await ReserveAsync(evt.OrderId, evt.Items);
await _bus.Publish(new InventoryReservedEvent(evt.OrderId, evt.TotalAmount));
}
public async Task HandleReleaseInventoryAsync(ReleaseInventoryCommand cmd)
{
// 释放库存
await ReleaseAsync(cmd.OrderId);
}
}
// 支付服务
public class PaymentService
{
public async Task HandleProcessPaymentAsync(ProcessPaymentCommand cmd)
{
try
{
await ChargeAsync(cmd.OrderId, cmd.Amount);
await _bus.Publish(new PaymentCompletedEvent(cmd.OrderId));
}
catch
{
await _bus.Publish(new PaymentFailedEvent(cmd.OrderId, "支付失败"));
}
}
}TCC 模式
Try-Confirm-Cancel
/// <summary>
/// TCC — Try 预留资源、Confirm 确认、Cancel 取消
/// </summary>
public interface ITccParticipant
{
Task TryAsync(TccContext context);
Task ConfirmAsync(TccContext context);
Task CancelAsync(TccContext context);
}
// 账户服务 TCC 实现
public class AccountTccParticipant : ITccParticipant
{
private readonly AppDbContext _dbContext;
public AccountTccParticipant(AppDbContext dbContext) => _dbContext = dbContext;
// Try — 冻结金额
public async Task TryAsync(TccContext context)
{
var account = await _dbContext.Accounts.FindAsync(context.UserId);
if (account == null || account.AvailableBalance < context.Amount)
throw new InvalidOperationException("余额不足");
// 冻结金额
account.AvailableBalance -= context.Amount;
account.FrozenBalance += context.Amount;
await _dbContext.SaveChangesAsync();
}
// Confirm — 扣减冻结金额
public async Task ConfirmAsync(TccContext context)
{
var account = await _dbContext.Accounts.FindAsync(context.UserId);
account!.FrozenBalance -= context.Amount;
await _dbContext.SaveChangesAsync();
}
// Cancel — 解冻金额
public async Task CancelAsync(TccContext context)
{
var account = await _dbContext.Accounts.FindAsync(context.UserId);
account!.FrozenBalance -= context.Amount;
account.AvailableBalance += context.Amount;
await _dbContext.SaveChangesAsync();
}
}
// TCC 协调器
public class TccCoordinator
{
private readonly List<(ITccParticipant, TccContext)> _participants = new();
public void AddParticipant(ITccParticipant participant, TccContext context)
{
_participants.Add((participant, context));
}
public async Task ExecuteAsync()
{
var tried = new List<int>();
try
{
// Try 阶段
for (int i = 0; i < _participants.Count; i++)
{
await _participants[i].Item1.TryAsync(_participants[i].Item2);
tried.Add(i);
}
// Confirm 阶段
foreach (var (participant, context) in _participants)
{
await participant.ConfirmAsync(context);
}
}
catch
{
// Cancel 阶段 — 只取消已 Try 成功的
foreach (var i in tried.AsEnumerable().Reverse())
{
try { await _participants[i].Item1.CancelAsync(_participants[i].Item2); }
catch { /* 记录日志 */ }
}
throw;
}
}
}幂等性保证
幂等处理
/// <summary>
/// 幂等性 — 同一操作执行多次结果相同
/// </summary>
public class IdempotentHandler
{
private readonly AppDbContext _dbContext;
public IdempotentHandler(AppDbContext dbContext) => _dbContext = dbContext;
public async Task HandleAsync<TMessage>(TMessage message, Func<TMessage, Task> handler)
where TMessage : IIdempotentMessage
{
// 检查是否已处理
var exists = await _dbContext.ProcessedMessages
.AnyAsync(m => m.MessageId == message.MessageId);
if (exists) return; // 已处理,跳过
// 执行业务逻辑
await handler(message);
// 记录已处理
_dbContext.ProcessedMessages.Add(new ProcessedMessage
{
MessageId = message.MessageId,
MessageType = typeof(TMessage).Name,
ProcessedAt = DateTime.UtcNow
});
await _dbContext.SaveChangesAsync();
}
}
public interface IIdempotentMessage
{
Guid MessageId { get; }
}
public class ProcessedMessage
{
public Guid MessageId { get; set; }
public string MessageType { get; set; } = "";
public DateTime ProcessedAt { get; set; }
}优点
缺点
总结
分布式事务是微服务架构的核心难题。推荐优先使用本地消息表模式(最简单实用),业务流程使用 Saga 编排,资金交易使用 TCC。核心原则:拥抱最终一致性,保证幂等性,做好补偿和重试。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《分布式事务》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《分布式事务》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《分布式事务》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《分布式事务》最大的收益和代价分别是什么?
