分布式系统设计模式
大约 17 分钟,约 5037 字
分布式系统设计模式
简介
分布式系统设计模式是解决分布式环境下常见问题的可复用方案。当单体应用拆分为微服务后,网络延迟、部分失败、数据一致性、并发控制等问题变得极为复杂。本文将系统讲解幂等性设计、补偿模式、Saga 编排、熔断器、重试策略、隔板模式、健康检查、领导者选举、最终一致性、CQRS、事件溯源、发件箱模式等核心设计模式,并提供 .NET 实现代码。
特点
幂等性设计
幂等性核心概念
/// <summary>
/// Idempotency: performing the same operation once or many times has the same effect.
///
/// Idempotency of HTTP methods:
/// GET    — idempotent (reads do not change state)
/// PUT    — idempotent (full replacement; same result every time)
/// DELETE — idempotent (deleting again has no further effect)
/// POST   — not idempotent (each call creates a new resource)
///
/// Strategies for implementing idempotency:
/// 1. Unique request ID (Idempotency Key)
/// 2. Optimistic concurrency control (ETag / version number)
/// 3. Database unique constraints
/// 4. State-machine constraints
/// </summary>
public class IdempotencyService
{
    private readonly DbContext _dbContext;

    public IdempotencyService(DbContext dbContext)
    {
        _dbContext = dbContext;
    }

    /// <summary>
    /// Idempotency based on a unique request ID: creates the order at most once per
    /// <paramref name="idempotencyKey"/> and returns the stored result for repeats.
    /// </summary>
    /// <remarks>
    /// The order and the idempotency record are saved by one SaveChangesAsync call.
    /// Two concurrent requests with the same key can both miss the initial lookup. The
    /// loser is expected to fail on insert because of a UNIQUE index on
    /// <c>IdempotencyRecord.RequestId</c> (assumed — not shown here, confirm it exists).
    /// The winner's stored result is then returned instead of an error.
    /// </remarks>
    public async Task<OrderResult> CreateOrderIdempotentAsync(
        CreateOrderRequest request,
        string idempotencyKey)
    {
        // 1. Already processed? Return the previously stored result.
        var cached = await FindStoredResultAsync(idempotencyKey);
        if (cached != null)
        {
            return cached;
        }

        // 2. Business operation.
        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Amount = request.Amount,
            Status = OrderStatus.Created,
            CreatedAt = DateTime.UtcNow
        };
        _dbContext.Set<Order>().Add(order);

        // 3. Idempotency record, saved together with the order.
        //    The object returned below is the one that gets serialized, so the first
        //    call and every replay return the same data.
        var result = new OrderResult
        {
            OrderId = order.Id,
            Status = order.Status.ToString()
        };
        var idempotencyRecord = new IdempotencyRecord
        {
            RequestId = idempotencyKey,
            Response = JsonSerializer.Serialize(result),
            CreatedAt = DateTime.UtcNow
        };
        _dbContext.Set<IdempotencyRecord>().Add(idempotencyRecord);

        try
        {
            await _dbContext.SaveChangesAsync();
        }
        catch (DbUpdateException)
        {
            // Possibly lost the race to a concurrent request with the same key.
            // Stop tracking the failed inserts so a later SaveChanges does not retry them.
            _dbContext.Entry(order).State = EntityState.Detached;
            _dbContext.Entry(idempotencyRecord).State = EntityState.Detached;

            var winner = await FindStoredResultAsync(idempotencyKey);
            if (winner != null)
            {
                return winner;
            }

            // Not a duplicate-key conflict: surface the original failure.
            throw;
        }

        return result;
    }

    /// <summary>
    /// Returns the stored response for <paramref name="idempotencyKey"/>, or null when the key is unknown.
    /// </summary>
    private async Task<OrderResult?> FindStoredResultAsync(string idempotencyKey)
    {
        var record = await _dbContext.Set<IdempotencyRecord>()
            .FirstOrDefaultAsync(r => r.RequestId == idempotencyKey);
        return record == null
            ? null
            : JsonSerializer.Deserialize<OrderResult>(record.Response)!;
    }

    /// <summary>
    /// Idempotency based on a database unique constraint: moves <paramref name="amount"/>
    /// from <paramref name="fromAccount"/> to <paramref name="toAccount"/> at most once per
    /// <paramref name="transferId"/>.
    /// </summary>
    /// <returns>
    /// <c>true</c> if the transfer was applied. <c>false</c> if inserting the transfer record
    /// failed with a <see cref="DbUpdateException"/>, expected to be the unique-constraint
    /// conflict of an already-processed transfer.
    /// </returns>
    /// <remarks>
    /// <c>ExecuteSqlRawAsync</c> executes immediately; it is not part of <c>SaveChangesAsync</c>.
    /// The record is therefore inserted first, inside an explicit transaction, so a duplicate
    /// is rejected before any balance changes. Balance updates and the record commit together.
    /// NOTE(review): if the context uses a retrying execution strategy, EF Core requires
    /// user-initiated transactions to run inside <c>strategy.ExecuteAsync</c> — confirm.
    /// </remarks>
    public async Task<bool> TransferAsync(
        string fromAccount, string toAccount, decimal amount, string transferId)
    {
        await using var transaction = await _dbContext.Database.BeginTransactionAsync();

        // transferId has a unique constraint; a repeated transfer fails on this insert.
        var transfer = new TransferRecord
        {
            TransferId = transferId,
            FromAccount = fromAccount,
            ToAccount = toAccount,
            Amount = amount,
            Status = TransferStatus.Completed,
            CreatedAt = DateTime.UtcNow
        };
        _dbContext.Set<TransferRecord>().Add(transfer);

        try
        {
            await _dbContext.SaveChangesAsync();
        }
        catch (DbUpdateException)
        {
            // Unique-constraint conflict → already processed; no balance was touched.
            _dbContext.Entry(transfer).State = EntityState.Detached;
            await transaction.RollbackAsync();
            return false;
        }

        // Update account balances inside the same transaction
        // ({0}/{1} placeholders are sent as DB parameters).
        await _dbContext.Database.ExecuteSqlRawAsync(
            "UPDATE Accounts SET Balance = Balance - {0} WHERE AccountNo = {1}",
            amount, fromAccount);
        await _dbContext.Database.ExecuteSqlRawAsync(
            "UPDATE Accounts SET Balance = Balance + {0} WHERE AccountNo = {1}",
            amount, toAccount);

        await transaction.CommitAsync();
        return true;
    }
}
/// <summary>Stored outcome of an idempotent request, looked up by its request ID.</summary>
public class IdempotencyRecord
{
    /// <summary>The client-supplied idempotency key.</summary>
    public string RequestId { get; set; } = string.Empty;

    /// <summary>Serialized response handed back when the same request is repeated.</summary>
    public string Response { get; set; } = string.Empty;

    /// <summary>Timestamp of when the record was written.</summary>
    public DateTime CreatedAt { get; set; }
}

/// <summary>Result of creating an order.</summary>
public record OrderResult
{
    public Guid OrderId { get; set; }

    public string Status { get; set; } = string.Empty;
}
/// <summary>Input for creating an order.</summary>
public record CreateOrderRequest
{
    public string CustomerId { get; set; } = "";

    public decimal Amount { get; set; }

    /// <summary>
    /// Order lines to reserve. The order-creation saga reads <c>request.Items</c>; it did not
    /// compile without this property. Defaults to an empty list.
    /// </summary>
    public List<OrderItem> Items { get; set; } = new();
}
补偿模式与 Saga 编排
补偿事务
/// <summary>
/// Compensation Pattern: a saga step that can be undone.
/// Each step in a saga has a matching compensating operation. When a later step
/// fails, the steps that already completed are compensated in reverse order.
/// </summary>
public interface ICompensatableAction
{
    /// <summary>Human-readable step name, used in log output.</summary>
    string ActionName { get; }

    /// <summary>Performs the step's forward work.</summary>
    Task ExecuteAsync(SagaContext context);

    /// <summary>Undoes the work performed by <see cref="ExecuteAsync"/>.</summary>
    Task CompensateAsync(SagaContext context);
}

/// <summary>State shared by all steps of one saga run.</summary>
public class SagaContext
{
    /// <summary>Identifier of this run; defaults to a freshly generated GUID string.</summary>
    public string SagaId { get; set; } = Guid.NewGuid().ToString("D");

    /// <summary>Key/value bag the steps use to pass data to each other.</summary>
    public Dictionary<string, object> Data { get; set; } = new Dictionary<string, object>();
}
/// <summary>
/// Saga orchestrator: runs the steps in order and, when one throws, compensates the
/// steps that had already completed, newest first.
/// </summary>
public class SagaOrchestrator
{
    private readonly ILogger<SagaOrchestrator> _logger;

    public SagaOrchestrator(ILogger<SagaOrchestrator> logger) => _logger = logger;

    /// <summary>
    /// Executes the saga (sequentially; on failure compensates in reverse order).
    /// </summary>
    /// <returns>
    /// Success when every step completed. Otherwise Failed, carrying the message of the
    /// exception that stopped the saga.
    /// </returns>
    /// <remarks>
    /// Only steps whose ExecuteAsync returned normally are compensated; the step that
    /// threw is not. A compensation that throws is logged and skipped so the remaining
    /// compensations still run. The returned result does not report it.
    /// </remarks>
    public async Task<SagaResult> ExecuteAsync(
        IEnumerable<ICompensatableAction> actions,
        SagaContext context)
    {
        var done = new Stack<ICompensatableAction>();
        try
        {
            foreach (var step in actions)
            {
                _logger.LogInformation("执行步骤: {Action} (SagaId={SagaId})",
                    step.ActionName, context.SagaId);
                await step.ExecuteAsync(context);
                done.Push(step);
            }

            _logger.LogInformation("Saga 完成: {SagaId}", context.SagaId);
            return SagaResult.Success();
        }
        catch (Exception failure)
        {
            _logger.LogError(failure, "Saga 步骤失败,开始补偿 (SagaId={SagaId})",
                context.SagaId);
            await CompensateCompletedAsync(done, context);
            return SagaResult.Failed(failure.Message);
        }
    }

    // Pops and compensates every completed step; a failing compensation is logged
    // and left for manual follow-up so the rest still run.
    private async Task CompensateCompletedAsync(
        Stack<ICompensatableAction> completed, SagaContext context)
    {
        while (completed.Count > 0)
        {
            var step = completed.Pop();
            try
            {
                _logger.LogWarning("补偿步骤: {Action} (SagaId={SagaId})",
                    step.ActionName, context.SagaId);
                await step.CompensateAsync(context);
            }
            catch (Exception compensationError)
            {
                _logger.LogError(compensationError,
                    "补偿失败: {Action} (SagaId={SagaId})",
                    step.ActionName, context.SagaId);
            }
        }
    }
}
/// <summary>Outcome of a saga run.</summary>
public class SagaResult
{
    /// <summary>True when every step completed.</summary>
    public bool IsSuccess { get; set; }

    /// <summary>Failure reason; empty on success.</summary>
    public string ErrorMessage { get; set; } = string.Empty;

    /// <summary>Creates a successful result.</summary>
    public static SagaResult Success()
    {
        return new SagaResult { IsSuccess = true };
    }

    /// <summary>Creates a failed result carrying <paramref name="error"/>.</summary>
    public static SagaResult Failed(string error)
    {
        return new SagaResult { IsSuccess = false, ErrorMessage = error };
    }
}
订单创建 Saga 示例
/// <summary>
/// Order-creation saga step that reserves inventory. The saga described here covers
/// inventory reservation, payment and shipping; this is the reservation step.
/// </summary>
public class ReserveInventoryAction : ICompensatableAction
{
    private readonly IInventoryService _inventoryService;

    public ReserveInventoryAction(IInventoryService inventoryService) =>
        _inventoryService = inventoryService;

    public string ActionName => "ReserveInventory";

    /// <summary>
    /// Reserves context.Data["items"] (a List&lt;OrderItem&gt;) for context.Data["orderId"].
    /// </summary>
    /// <exception cref="InvalidOperationException">The inventory service declined the reservation.</exception>
    public async Task ExecuteAsync(SagaContext context)
    {
        var orderId = context.Data["orderId"].ToString()!;
        var lines = (List<OrderItem>)context.Data["items"];

        var ok = await _inventoryService.ReserveAsync(orderId, lines);
        if (!ok)
        {
            throw new InvalidOperationException("库存不足");
        }

        // Marker flag; not read by the other steps shown in this article.
        context.Data["inventoryReserved"] = true;
    }

    /// <summary>Releases the reservation held for context.Data["orderId"].</summary>
    public async Task CompensateAsync(SagaContext context)
    {
        var orderId = context.Data["orderId"].ToString()!;
        await _inventoryService.ReleaseReservationAsync(orderId);
    }
}
/// <summary>Saga step that charges the customer; compensated by refunding the charge.</summary>
public class ProcessPaymentAction : ICompensatableAction
{
    private readonly IPaymentService _paymentService;

    public ProcessPaymentAction(IPaymentService paymentService) =>
        _paymentService = paymentService;

    public string ActionName => "ProcessPayment";

    /// <summary>
    /// Charges context.Data["amount"] (must be a boxed decimal) to context.Data["customerId"].
    /// The transaction ID is stored under "paymentTransactionId" for compensation.
    /// </summary>
    /// <exception cref="InvalidOperationException">The payment service reported failure.</exception>
    public async Task ExecuteAsync(SagaContext context)
    {
        var customerId = context.Data["customerId"].ToString()!;
        var amount = (decimal)context.Data["amount"];

        var charge = await _paymentService.ChargeAsync(customerId, amount);
        if (charge.Success)
        {
            context.Data["paymentTransactionId"] = charge.TransactionId;
            return;
        }

        throw new InvalidOperationException($"支付失败: {charge.Message}");
    }

    /// <summary>Refunds the transaction recorded by <see cref="ExecuteAsync"/>.</summary>
    public async Task CompensateAsync(SagaContext context)
    {
        var transactionId = context.Data["paymentTransactionId"].ToString()!;
        await _paymentService.RefundAsync(transactionId);
    }
}
// Usage example
/// <summary>Builds and runs the order-creation saga (inventory reservation, then payment).</summary>
public class OrderService
{
    private readonly SagaOrchestrator _orchestrator;

    public OrderService(SagaOrchestrator orchestrator) => _orchestrator = orchestrator;

    /// <summary>
    /// Seeds a saga context from <paramref name="request"/> and runs the saga steps.
    /// </summary>
    public async Task<SagaResult> CreateOrderAsync(CreateOrderRequest request)
    {
        var context = new SagaContext
        {
            Data =
            {
                ["orderId"] = Guid.NewGuid().ToString(),
                ["customerId"] = request.CustomerId,
                ["items"] = request.Items,
                ["amount"] = request.Amount,
            }
        };

        // NOTE(review): InventoryService / PaymentService are concrete types not shown in
        // this article and are constructed inline rather than injected. The shipping step
        // mentioned for this saga is not included in this example.
        ICompensatableAction[] steps =
        {
            new ReserveInventoryAction(new InventoryService()),
            new ProcessPaymentAction(new PaymentService()),
        };

        return await _orchestrator.ExecuteAsync(steps, context);
    }
}
/// <summary>Inventory operations used by the order saga.</summary>
public interface IInventoryService
{
    /// <summary>Tries to reserve <paramref name="items"/> for an order; returns whether it succeeded.</summary>
    Task<bool> ReserveAsync(string orderId, List<OrderItem> items);

    /// <summary>Releases the reservation made for an order.</summary>
    Task ReleaseReservationAsync(string orderId);
}

/// <summary>Payment operations used by the order saga.</summary>
public interface IPaymentService
{
    /// <summary>Charges a customer.</summary>
    Task<PaymentResult> ChargeAsync(string customerId, decimal amount);

    /// <summary>Refunds a previous charge.</summary>
    Task RefundAsync(string transactionId);
}

/// <summary>Outcome of a charge attempt.</summary>
public record PaymentResult(bool Success, string TransactionId, string Message);

/// <summary>One order line: product and quantity.</summary>
public record OrderItem(string ProductId, int Quantity);
熔断器模式
Circuit Breaker 实现
/// <summary>
/// Circuit Breaker: protects the system from cascading failures.
///
/// Three states:
/// Closed   — calls pass through; consecutive failures are counted
/// Open     — calls are rejected immediately; the downstream is not called
/// HalfOpen — a limited number of trial calls test whether the downstream recovered
///
/// Recommended for production: the Polly library
/// NuGet: Install-Package Microsoft.Extensions.Http.Polly
/// </summary>
public class CircuitBreaker
{
    private readonly CircuitBreakerConfig _config;
    private readonly object _lock = new();
    private CircuitState _state = CircuitState.Closed;
    private int _failureCount;
    private DateTime _lastFailureTime;
    private int _halfOpenSuccessCount;
    // Trial calls currently running while HalfOpen; capped so only "a few" probe the downstream.
    private int _halfOpenInFlight;

    public CircuitBreaker(CircuitBreakerConfig config)
    {
        _config = config;
    }

    /// <summary>
    /// Current state. Reading it moves Open → HalfOpen once ResetTimeout has elapsed
    /// since the last recorded failure.
    /// </summary>
    public CircuitState State
    {
        get
        {
            lock (_lock)
            {
                return RefreshStateLocked();
            }
        }
    }

    /// <summary>
    /// Runs <paramref name="action"/> through the circuit breaker.
    /// </summary>
    /// <exception cref="CircuitBreakerOpenException">
    /// The circuit is Open, or it is HalfOpen and all trial slots
    /// (HalfOpenSuccessThreshold, at least 1) are already in use.
    /// </exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
    {
        ArgumentNullException.ThrowIfNull(action);

        CircuitState admittedAs;
        lock (_lock)
        {
            admittedAs = RefreshStateLocked();
            if (admittedAs == CircuitState.HalfOpen)
            {
                if (_halfOpenInFlight >= Math.Max(1, _config.HalfOpenSuccessThreshold))
                {
                    // Every trial slot is taken: treat this caller as if the circuit were Open.
                    admittedAs = CircuitState.Open;
                }
                else
                {
                    _halfOpenInFlight++;
                }
            }
        }

        switch (admittedAs)
        {
            case CircuitState.Open:
                throw new CircuitBreakerOpenException(
                    "熔断器已打开,请求被拒绝");
            case CircuitState.HalfOpen:
                return await ExecuteHalfOpenAsync(action);
            case CircuitState.Closed:
            default:
                return await ExecuteClosedAsync(action);
        }
    }

    // Caller must hold _lock. Applies the timed Open → HalfOpen transition and returns the state.
    private CircuitState RefreshStateLocked()
    {
        if (_state == CircuitState.Open &&
            DateTime.UtcNow - _lastFailureTime > _config.ResetTimeout)
        {
            _state = CircuitState.HalfOpen;
            _halfOpenSuccessCount = 0;
        }
        return _state;
    }

    private async Task<T> ExecuteClosedAsync<T>(Func<Task<T>> action)
    {
        try
        {
            var result = await action();
            // A success resets the count, so only consecutive failures open the circuit.
            lock (_lock) { _failureCount = 0; }
            return result;
        }
        catch
        {
            RecordFailure();
            throw;
        }
    }

    private async Task<T> ExecuteHalfOpenAsync<T>(Func<Task<T>> action)
    {
        try
        {
            var result = await action();
            lock (_lock)
            {
                // Count only while still HalfOpen: a late success must not close a circuit
                // that another failed trial has already re-opened.
                if (_state == CircuitState.HalfOpen)
                {
                    _halfOpenSuccessCount++;
                    if (_halfOpenSuccessCount >= _config.HalfOpenSuccessThreshold)
                    {
                        _state = CircuitState.Closed;
                        _failureCount = 0;
                    }
                }
            }
            return result;
        }
        catch
        {
            lock (_lock)
            {
                // Any failed trial re-opens the circuit and restarts the reset timer.
                _state = CircuitState.Open;
                _lastFailureTime = DateTime.UtcNow;
            }
            throw;
        }
        finally
        {
            lock (_lock) { _halfOpenInFlight--; }
        }
    }

    // Counts a failure and opens the circuit once FailureThreshold is reached.
    private void RecordFailure()
    {
        lock (_lock)
        {
            _failureCount++;
            _lastFailureTime = DateTime.UtcNow;
            if (_failureCount >= _config.FailureThreshold)
            {
                _state = CircuitState.Open;
            }
        }
    }
}
/// <summary>States of the circuit breaker.</summary>
public enum CircuitState
{
    Closed,
    Open,
    HalfOpen
}

/// <summary>Tuning settings for the circuit breaker.</summary>
public class CircuitBreakerConfig
{
    /// <summary>Failures in the Closed state that open the circuit. Default 5.</summary>
    public int FailureThreshold { get; set; } = 5;

    /// <summary>Time the circuit stays Open before trial calls are allowed. Default 30 s.</summary>
    public TimeSpan ResetTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>Successful trial calls needed in HalfOpen to close the circuit. Default 3.</summary>
    public int HalfOpenSuccessThreshold { get; set; } = 3;
}

/// <summary>Thrown when the circuit breaker rejects a call.</summary>
public class CircuitBreakerOpenException : Exception
{
    public CircuitBreakerOpenException(string message)
        : base(message)
    {
    }
}
使用 Polly 实现熔断(推荐)
/// <summary>
/// 使用 Polly 库实现熔断+重试组合策略
/// </summary>
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
/// <summary>
/// HttpClient wrapper that applies a combined retry + circuit-breaker policy built with Polly.
/// </summary>
public class ResilientHttpClient
{
    private readonly HttpClient _httpClient;
    private readonly AsyncCircuitBreakerPolicy<HttpResponseMessage> _circuitBreaker;
    private readonly AsyncRetryPolicy<HttpResponseMessage> _retryPolicy;
    // Retry (outer) wrapping the circuit breaker (inner). Built once in the constructor
    // and reused by every call instead of being re-created per request.
    private readonly IAsyncPolicy<HttpResponseMessage> _resiliencePolicy;

    public ResilientHttpClient(HttpClient httpClient)
    {
        _httpClient = httpClient;

        // Retry: exponential backoff, 3 retries (waits of 2 s, 4 s, 8 s).
        _retryPolicy = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt =>
                    TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (outcome, timespan, retryCount, context) =>
                {
                    // A 5xx response that is about to be retried is never returned to the
                    // caller; dispose it so its connection and content are released.
                    // (Result is null when the attempt failed with an exception.)
                    outcome.Result?.Dispose();
                    Console.WriteLine(
                        $"重试第 {retryCount} 次,等待 {timespan.TotalSeconds}s");
                });

        // Circuit breaker: opens after 5 handled failures, half-opens after 30 seconds.
        _circuitBreaker = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30),
                onBreak: (ex, breakDelay) =>
                {
                    Console.WriteLine(
                        $"熔断器打开,持续时间 {breakDelay.TotalSeconds}s");
                },
                onReset: () =>
                {
                    Console.WriteLine("熔断器关闭,恢复正常");
                });

        // Combined policy: retry first, with every attempt passing through the breaker.
        _resiliencePolicy = Policy.WrapAsync(_retryPolicy, _circuitBreaker);
    }

    /// <summary>
    /// Sends a GET request through the retry + circuit-breaker policy.
    /// </summary>
    /// <param name="url">Request URL.</param>
    /// <param name="cancellationToken">Cancels the request and any pending retry wait.</param>
    public async Task<HttpResponseMessage> GetAsync(
        string url, CancellationToken cancellationToken = default)
    {
        return await _resiliencePolicy.ExecuteAsync(
            ct => _httpClient.GetAsync(url, ct),
            cancellationToken);
    }
}
重试与退避策略
指数退避重试
/// <summary>
/// Retry with Backoff.
///
/// Backoff strategies:
/// Fixed       — constant interval between retries
/// Linear      — linearly increasing interval
/// Exponential — exponentially increasing interval (recommended)
/// Jitter      — random jitter added to avoid thundering herds
///
/// This implementation uses exponential backoff plus up to 20% random jitter.
/// </summary>
public class RetryPolicy
{
    private readonly int _maxRetries;
    private readonly TimeSpan _baseDelay;
    private readonly Func<Exception, bool> _shouldRetry;

    /// <param name="maxRetries">Retries after the first attempt (0 = no retry). Must be &gt;= 0.</param>
    /// <param name="baseDelay">Wait before the first retry, doubled on each further retry. Default 1 s. Must not be negative.</param>
    /// <param name="shouldRetry">Selects retryable exceptions. Default: retry every exception.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxRetries"/> or <paramref name="baseDelay"/> is negative.</exception>
    public RetryPolicy(
        int maxRetries = 3,
        TimeSpan? baseDelay = null,
        Func<Exception, bool>? shouldRetry = null)
    {
        if (maxRetries < 0)
        {
            // A negative count would skip every attempt, leaving nothing to return or rethrow.
            throw new ArgumentOutOfRangeException(
                nameof(maxRetries), maxRetries, "maxRetries must be >= 0.");
        }

        var delay = baseDelay ?? TimeSpan.FromSeconds(1);
        if (delay < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(
                nameof(baseDelay), delay, "baseDelay must not be negative.");
        }

        _maxRetries = maxRetries;
        _baseDelay = delay;
        _shouldRetry = shouldRetry ?? (_ => true);
    }

    /// <summary>
    /// Runs <paramref name="action"/>, retrying with exponential backoff + random jitter.
    /// </summary>
    /// <param name="action">The operation to run.</param>
    /// <param name="cancellationToken">Cancels a pending backoff wait.</param>
    /// <remarks>
    /// An exception from the final attempt, or one the predicate rejects, propagates
    /// unchanged with its original stack trace.
    /// </remarks>
    public async Task<T> ExecuteAsync<T>(
        Func<Task<T>> action, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(action);

        // Every iteration either returns, rethrows, or swallows a retryable failure
        // and loops; the final attempt's failure escapes through the filter.
        for (int attempt = 0; ; attempt++)
        {
            if (attempt > 0)
            {
                var delay = ComputeDelay(attempt);
                Console.WriteLine(
                    $"第 {attempt} 次重试,等待 {delay.TotalMilliseconds:F0}ms");
                await Task.Delay(delay, cancellationToken);
            }

            try
            {
                return await action();
            }
            catch (Exception ex) when (_shouldRetry(ex) && attempt < _maxRetries)
            {
                Console.WriteLine($"操作失败: {ex.Message},准备重试");
            }
        }
    }

    // baseDelay * 2^(attempt-1) plus jitter drawn from [0, 20%) of that value.
    private TimeSpan ComputeDelay(int attempt)
    {
        double delayMs = _baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
        double jitterMs = Random.Shared.NextDouble() * delayMs * 0.2;
        return TimeSpan.FromMilliseconds(delayMs + jitterMs);
    }
}
// Usage example
/// <summary>Shows RetryPolicy wrapped around an HTTP GET.</summary>
public class RetryUsageExample
{
    // One HttpClient shared by every call and retry. Creating and disposing a client
    // per attempt can exhaust sockets under load.
    private static readonly HttpClient SharedClient = new();

    /// <summary>Fetches the sample resource, retrying transient HTTP failures up to 3 times.</summary>
    public async Task<string> FetchDataWithRetryAsync()
    {
        var retryPolicy = new RetryPolicy(
            maxRetries: 3,
            baseDelay: TimeSpan.FromSeconds(1),
            shouldRetry: IsTransient);

        return await retryPolicy.ExecuteAsync(
            () => SharedClient.GetStringAsync("https://api.example.com/data"));
    }

    // HttpClient reports its own timeout as a TaskCanceledException (with an inner
    // TimeoutException on .NET 5+), not as TimeoutException, so accept that form too.
    private static bool IsTransient(Exception ex) =>
        ex is HttpRequestException or TimeoutException
        || ex is TaskCanceledException { InnerException: TimeoutException };
}
隔板模式
资源隔离
/// <summary>
/// Bulkhead Pattern: partitions resources into independent pools so one failure
/// cannot exhaust all of them.
///
/// Variants:
/// Thread Pool Bulkhead — isolation by dedicated thread pool
/// Semaphore Bulkhead   — isolation by semaphore (this class)
/// </summary>
public class BulkheadExecutor
{
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxQueueSize;
    // Callers currently waiting for a slot; updated only through Interlocked.
    private int _queuedCount;

    /// <param name="maxConcurrency">Actions allowed to run at once (must be &gt; 0).</param>
    /// <param name="maxQueueSize">Callers allowed to wait for a slot (must be &gt;= 0); further callers are rejected.</param>
    /// <exception cref="ArgumentOutOfRangeException">A limit is out of range.</exception>
    public BulkheadExecutor(int maxConcurrency = 10, int maxQueueSize = 20)
    {
        if (maxQueueSize < 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxQueueSize), maxQueueSize, "maxQueueSize must be >= 0.");
        }

        _maxQueueSize = maxQueueSize;
        // SemaphoreSlim itself rejects maxConcurrency <= 0.
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    /// <summary>
    /// Runs <paramref name="action"/> inside the bulkhead, waiting for a slot if necessary.
    /// </summary>
    /// <exception cref="BulkheadRejectedException">
    /// Every slot is busy and <c>maxQueueSize</c> callers are already waiting.
    /// </exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
    {
        ArgumentNullException.ThrowIfNull(action);

        // Fast path: take a free slot without joining the wait queue.
        if (!_semaphore.Wait(0))
        {
            await WaitInQueueAsync();
        }

        try
        {
            return await action();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    // Joins the wait queue and waits for a slot, or rejects the caller when the queue is full.
    private async Task WaitInQueueAsync()
    {
        int queued = Interlocked.Increment(ref _queuedCount);
        try
        {
            if (queued > _maxQueueSize)
            {
                throw new BulkheadRejectedException(
                    $"隔板已满,排队 {queued - 1}/{_maxQueueSize}");
            }

            await _semaphore.WaitAsync();
        }
        finally
        {
            Interlocked.Decrement(ref _queuedCount);
        }
    }

    /// <summary>
    /// Currently available execution slots.
    /// </summary>
    public int AvailableConcurrency => _semaphore.CurrentCount;

    /// <summary>Callers currently waiting for a slot.</summary>
    public int QueuedCount => Volatile.Read(ref _queuedCount);
}

/// <summary>Thrown when the bulkhead has no free slot and its wait queue is full.</summary>
public class BulkheadRejectedException : Exception
{
    public BulkheadRejectedException(string message) : base(message) { }
}
/// <summary>
/// Keeps a separate bulkhead (resource pool) per downstream service.
/// </summary>
public class ServiceBulkheadManager
{
    private readonly ConcurrentDictionary<string, BulkheadExecutor> _executors = new();

    /// <summary>
    /// Returns the bulkhead for <paramref name="serviceName"/>, creating it on first use
    /// with a wait queue twice the concurrency limit.
    /// </summary>
    /// <remarks>
    /// <paramref name="maxConcurrency"/> only matters when the bulkhead is first created;
    /// later calls for the same name receive the existing instance.
    /// </remarks>
    public BulkheadExecutor GetOrCreate(string serviceName, int maxConcurrency = 5)
    {
        int queueLimit = maxConcurrency * 2;
        return _executors.GetOrAdd(
            serviceName,
            _ => new BulkheadExecutor(maxConcurrency, queueLimit));
    }
}
健康检查 API
.NET 健康检查实现
/// <summary>
/// Health Check API pattern: each microservice exposes its health so load balancers
/// and monitoring systems can poll it.
///
/// Built on ASP.NET Core health checks. The SQL Server and Redis checks come from the
/// community AspNetCore.Diagnostics.HealthChecks packages:
/// NuGet: AspNetCore.HealthChecks.SqlServer
/// NuGet: AspNetCore.HealthChecks.Redis
/// </summary>
// Health check registration
public static class HealthCheckSetup
{
    /// <summary>
    /// Registers the database, Redis and external-API checks (tag "ready") and the
    /// disk check (tag "live").
    /// </summary>
    /// <remarks>
    /// The "DefaultConnection" and "Redis" connection strings are assumed to be present
    /// (null-forgiving). The argument-less AddHealthChecks() call below binds to the
    /// framework overload, not to this two-argument method.
    /// </remarks>
    public static IServiceCollection AddHealthChecks(
        this IServiceCollection services, IConfiguration configuration)
    {
        string sqlConnection = configuration.GetConnectionString("DefaultConnection")!;
        string redisConnection = configuration.GetConnectionString("Redis")!;
        string[] readiness = { "ready" };
        string[] liveness = { "live" };

        services.AddHealthChecks()
            .AddSqlServer(sqlConnection, name: "database", tags: readiness)
            .AddRedis(redisConnection, name: "redis", tags: readiness)
            .AddCheck<DiskSpaceHealthCheck>("disk", tags: liveness)
            .AddCheck<ExternalApiHealthCheck>("external-api", tags: readiness);

        return services;
    }
}
// Custom health check
/// <summary>
/// Probes an external API by requesting its <c>/health</c> endpoint.
/// </summary>
/// <remarks>
/// "/health" is a relative URI, so the injected HttpClient must have a BaseAddress
/// (presumably set through a typed-client registration not shown here — confirm).
/// </remarks>
public class ExternalApiHealthCheck : IHealthCheck
{
    private readonly HttpClient _httpClient;

    public ExternalApiHealthCheck(HttpClient httpClient)
    {
        _httpClient = httpClient;
    }

    /// <returns>
    /// Healthy for a success status code, Degraded for any other status code, and
    /// Unhealthy when the request throws.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled; propagated rather than reported as Unhealthy.
    /// </exception>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Dispose the response so its connection goes back to the pool promptly.
            using var response = await _httpClient.GetAsync(
                "/health", cancellationToken);
            if (response.IsSuccessStatusCode)
            {
                return HealthCheckResult.Healthy("外部 API 正常");
            }
            return HealthCheckResult.Degraded(
                $"外部 API 返回 {response.StatusCode}");
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            // Cancellation requested by the caller is not an API failure; let it propagate.
            return HealthCheckResult.Unhealthy(
                $"外部 API 不可用: {ex.Message}", ex);
        }
    }
}
/// <summary>
/// Reports free space on the drive hosting the application's base directory:
/// Unhealthy below 5% free, Degraded below 15%, Healthy otherwise.
/// </summary>
public class DiskSpaceHealthCheck : IHealthCheck
{
    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        string root = Path.GetPathRoot(AppContext.BaseDirectory)!;
        var drive = new DriveInfo(root);
        double freePercent = (double)drive.AvailableFreeSpace / drive.TotalSize * 100;

        HealthCheckResult verdict = freePercent switch
        {
            < 5 => HealthCheckResult.Unhealthy($"磁盘剩余空间不足: {freePercent:F1}%"),
            < 15 => HealthCheckResult.Degraded($"磁盘空间偏低: {freePercent:F1}%"),
            _ => HealthCheckResult.Healthy($"磁盘空间正常: {freePercent:F1}%"),
        };

        return Task.FromResult(verdict);
    }
}
CQRS 模式
命令查询职责分离
/// <summary>
/// CQRS (Command Query Responsibility Segregation): reads and writes use separate models.
///
/// Command — a write: changes state and, by convention, returns no data
///           (at most an identifier; the create handler in this article returns the new ID)
/// Query   — a read: does not change state and returns data
///
/// Benefits:
/// - reads and writes can be optimized independently
/// - the read model can be tuned for queries (e.g. with Dapper)
/// - the write model can use an ORM to enforce consistency
/// </summary>
// Commands
public record CreateProductCommand(string Name, decimal Price, string Category, int Stock);

public record UpdateProductCommand(Guid Id, string Name, decimal Price, string Category);

// Queries
public record GetProductByIdQuery(Guid Id);

/// <summary>Product search; Page is 1-based and both filters are optional.</summary>
public record SearchProductsQuery(string? Keyword, string? Category, int Page, int PageSize);
// Command handler
/// <summary>
/// Handles <c>CreateProductCommand</c>: saves a new product, then publishes ProductCreatedEvent.
/// </summary>
/// <remarks>
/// Saving and publishing are two separate operations. If publishing fails after
/// SaveChangesAsync succeeded, the product exists but the event is lost; the Outbox
/// pattern later in this article closes that gap.
/// </remarks>
public class CreateProductCommandHandler
{
    private readonly DbContext _dbContext;
    private readonly IEventPublisher _eventPublisher;

    public CreateProductCommandHandler(DbContext dbContext, IEventPublisher eventPublisher)
    {
        _dbContext = dbContext;
        _eventPublisher = eventPublisher;
    }

    /// <returns>The ID of the newly created product.</returns>
    public async Task<Guid> HandleAsync(CreateProductCommand command)
    {
        var newId = Guid.NewGuid();
        var product = new Product
        {
            Id = newId,
            Name = command.Name,
            Price = command.Price,
            Category = command.Category,
            Stock = command.Stock,
            CreatedAt = DateTime.UtcNow
        };

        _dbContext.Set<Product>().Add(product);
        await _dbContext.SaveChangesAsync();

        var createdEvent = new ProductCreatedEvent(product.Id);
        await _eventPublisher.PublishAsync(createdEvent);

        return product.Id;
    }
}
// Query handler (reads optimized with Dapper)
/// <summary>Read side of the product CQRS example: raw SQL executed through Dapper.</summary>
public class ProductQueryHandler
{
    private readonly IDbConnection _dbConnection;

    public ProductQueryHandler(IDbConnection dbConnection)
    {
        _dbConnection = dbConnection;
    }

    /// <summary>Loads one product by ID; returns null when it does not exist.</summary>
    public async Task<ProductDto?> HandleAsync(GetProductByIdQuery query)
    {
        const string sql = @"
SELECT Id, Name, Price, Category, Stock, CreatedAt
FROM Products WHERE Id = @Id";
        return await _dbConnection
            .QueryFirstOrDefaultAsync<ProductDto>(sql, new { query.Id });
    }

    /// <summary>
    /// Searches products by optional keyword (substring of Name) and exact category,
    /// newest first, returning one page plus the total match count.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">Page or PageSize is less than 1.</exception>
    /// <remarks>The OFFSET/FETCH paging and bracket LIKE escaping are SQL Server syntax.</remarks>
    public async Task<PagedResult<ProductDto>> HandleAsync(SearchProductsQuery query)
    {
        ArgumentNullException.ThrowIfNull(query);
        // OFFSET must be >= 0 and FETCH NEXT must be >= 1; reject bad paging up front
        // with a clear error instead of a database error.
        if (query.Page < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(query), query.Page, "Page must be >= 1.");
        }
        if (query.PageSize < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(query), query.PageSize, "PageSize must be >= 1.");
        }

        var where = new List<string>();
        var parameters = new DynamicParameters();
        if (!string.IsNullOrEmpty(query.Keyword))
        {
            where.Add("Name LIKE @Keyword");
            // Escape LIKE wildcards so input such as "50%" or "a_b" matches literally.
            parameters.Add("Keyword", $"%{EscapeLikePattern(query.Keyword)}%");
        }
        if (!string.IsNullOrEmpty(query.Category))
        {
            where.Add("Category = @Category");
            parameters.Add("Category", query.Category);
        }
        string whereClause = where.Count > 0
            ? "WHERE " + string.Join(" AND ", where)
            : "";

        // whereClause contains only fixed fragments; user values travel as parameters.
        string countSql = $"SELECT COUNT(*) FROM Products {whereClause}";
        int total = await _dbConnection.ExecuteScalarAsync<int>(countSql, parameters);

        string dataSql = $@"
SELECT Id, Name, Price, Category, Stock, CreatedAt
FROM Products {whereClause}
ORDER BY CreatedAt DESC
OFFSET @Offset ROWS FETCH NEXT @PageSize ROWS ONLY";
        parameters.Add("Offset", (query.Page - 1) * query.PageSize);
        parameters.Add("PageSize", query.PageSize);
        var items = (await _dbConnection
            .QueryAsync<ProductDto>(dataSql, parameters)).ToList();
        return new PagedResult<ProductDto>(items, total, query.Page, query.PageSize);
    }

    // Wraps SQL Server LIKE metacharacters in brackets so they match literally.
    // '[' is escaped first so the brackets added for '%' and '_' are not escaped again.
    private static string EscapeLikePattern(string value) =>
        value.Replace("[", "[[]").Replace("%", "[%]").Replace("_", "[_]");
}
/// <summary>One page of results together with the total number of matches.</summary>
public record PagedResult<T>(List<T> Items, int Total, int Page, int PageSize);

/// <summary>Read-model projection of a product row.</summary>
public record ProductDto(
    Guid Id,
    string Name,
    decimal Price,
    string Category,
    int Stock,
    DateTime CreatedAt);

/// <summary>Event carrying the ID of a product that was created.</summary>
public record ProductCreatedEvent(Guid ProductId);

/// <summary>Publishes a domain event.</summary>
public interface IEventPublisher
{
    Task PublishAsync<T>(T domainEvent);
}
发件箱模式(Outbox Pattern)
保证消息可靠投递
/// <summary>
/// Outbox Pattern: makes "save business data" and "record the message to publish" atomic.
///
/// Problem: after the database transaction commits, publishing the message can still fail.
/// Solution: write the message to an Outbox table in the same transaction and let a
/// background worker send it asynchronously.
/// </summary>
public class OutboxService
{
    private readonly DbContext _dbContext;
    // Injected but not used by SaveWithOutboxAsync; publishing is done by the background processor.
    private readonly IMessageBus _messageBus;
    private readonly ILogger<OutboxService> _logger;

    public OutboxService(
        DbContext dbContext,
        IMessageBus messageBus,
        ILogger<OutboxService> logger)
    {
        _dbContext = dbContext;
        _messageBus = messageBus;
        _logger = logger;
    }

    /// <summary>
    /// Adds <paramref name="entities"/> plus one Pending outbox message for
    /// <paramref name="domainEvent"/>, then saves both with a single SaveChangesAsync call
    /// (one transaction).
    /// </summary>
    /// <param name="entities">Business entities to insert.</param>
    /// <param name="domainEvent">Event to publish later; serialized to JSON as <typeparamref name="TEvent"/>.</param>
    /// <param name="eventType">Event type name stored with the message.</param>
    public async Task SaveWithOutboxAsync<TEvent>(
        IEnumerable<object> entities,
        TEvent domainEvent,
        string eventType) where TEvent : class
    {
        // Business entities.
        foreach (var item in entities)
        {
            _dbContext.Add(item);
        }

        // Outbox row, committed in the same transaction as the entities.
        var pending = new OutboxMessage
        {
            Id = Guid.NewGuid(),
            EventType = eventType,
            Payload = JsonSerializer.Serialize(domainEvent),
            CreatedAt = DateTime.UtcNow,
            Status = OutboxStatus.Pending
        };
        _dbContext.Set<OutboxMessage>().Add(pending);

        await _dbContext.SaveChangesAsync();
    }
}
/// <summary>
/// Outbox background processor: polls every 2 seconds and publishes pending outbox messages.
/// </summary>
/// <remarks>
/// Delivery is at-least-once. Status changes are saved only after the whole batch has
/// been published, so a crash or failed save causes already-published messages to be
/// sent again; consumers should be idempotent. The query takes no row locks, so several
/// running instances would pick up the same rows.
/// </remarks>
public class OutboxProcessor : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(2);

    public OutboxProcessor(IServiceProvider serviceProvider) =>
        _serviceProvider = serviceProvider;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessPendingMessagesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                // Keep polling; the next cycle tries again.
                Console.WriteLine($"Outbox 处理异常: {ex.Message}");
            }

            await Task.Delay(_pollingInterval, stoppingToken);
        }
    }

    // Publishes up to 50 of the oldest Pending messages and saves their new status.
    private async Task ProcessPendingMessagesAsync(CancellationToken ct)
    {
        using var scope = _serviceProvider.CreateScope();
        var services = scope.ServiceProvider;
        var dbContext = services.GetRequiredService<DbContext>();
        var messageBus = services.GetRequiredService<IMessageBus>();

        var batch = await dbContext.Set<OutboxMessage>()
            .Where(m => m.Status == OutboxStatus.Pending)
            .OrderBy(m => m.CreatedAt)
            .Take(50)
            .ToListAsync(ct);

        foreach (var message in batch)
        {
            await TryPublishAsync(messageBus, message);
        }

        await dbContext.SaveChangesAsync(ct);
    }

    // Publishes one message. On failure it increments RetryCount, records the error,
    // and marks the message Failed once 5 attempts have failed.
    private static async Task TryPublishAsync(IMessageBus messageBus, OutboxMessage message)
    {
        try
        {
            await messageBus.PublishAsync(message.EventType, message.Payload);
            message.Status = OutboxStatus.Sent;
            message.SentAt = DateTime.UtcNow;
        }
        catch (Exception ex)
        {
            message.RetryCount++;
            message.LastError = ex.Message;
            if (message.RetryCount >= 5)
            {
                message.Status = OutboxStatus.Failed;
            }
        }
    }
}
/// <summary>Row of the Outbox table: one event waiting to be published, or already published.</summary>
public class OutboxMessage
{
    public Guid Id { get; set; }

    /// <summary>Event type name handed to the message bus.</summary>
    public string EventType { get; set; } = string.Empty;

    /// <summary>Serialized event body.</summary>
    public string Payload { get; set; } = string.Empty;

    public DateTime CreatedAt { get; set; }

    /// <summary>When the message was published; null until then.</summary>
    public DateTime? SentAt { get; set; }

    /// <summary>Delivery state; a new instance starts as Pending (the enum's first member).</summary>
    public OutboxStatus Status { get; set; }

    /// <summary>Number of failed publish attempts so far.</summary>
    public int RetryCount { get; set; }

    /// <summary>Message of the most recent publish failure, if any.</summary>
    public string? LastError { get; set; }
}

/// <summary>Delivery state of an outbox message.</summary>
public enum OutboxStatus
{
    Pending,
    Sent,
    Failed
}

/// <summary>Transport that outbox messages are published to.</summary>
public interface IMessageBus
{
    Task PublishAsync(string eventType, string payload);
}
领导者选举
基于 Redis 的领导者选举
/// <summary>
/// Leader Election: ensures that only one instance in a distributed system runs a
/// particular task.
///
/// Implementations:
/// 1. Database row lock (simple but not recommended)
/// 2. Redis SET NX with expiry (recommended; used here)
/// 3. Raft consensus (recommended but complex)
/// </summary>
/// <remarks>
/// The lease is the Redis key "leader:instance" holding this instance's ID with a 15 s
/// TTL. It is renewed every 5 s (a third of the lease). It is not released on shutdown;
/// another instance takes over once it expires. A lease alone does not prevent a paused
/// leader from acting after its lease expired; strict guarantees need fencing tokens.
/// </remarks>
public class LeaderElectionService : BackgroundService
{
    private const string LeaderKey = "leader:instance";
    private readonly IServiceProvider _serviceProvider;
    private readonly string _instanceId;
    private readonly TimeSpan _leaseDuration = TimeSpan.FromSeconds(15);
    // Written by the background loop and read through IsLeader from other threads.
    private volatile bool _isLeader;

    public LeaderElectionService(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
        _instanceId = Environment.MachineName + "_" + Guid.NewGuid().ToString("N")[..8];
    }

    /// <summary>True while this instance held the lease at its last renewal check.</summary>
    public bool IsLeader => _isLeader;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await TryAcquireLeadershipAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"领导者选举异常: {ex.Message}");
                _isLeader = false;
            }

            // Renewal interval = lease duration / 3
            await Task.Delay(_leaseDuration / 3, stoppingToken);
        }
    }

    // Renews the lease when this instance holds it, otherwise tries to take it, then
    // runs the leader-only work if this instance is leader.
    private async Task TryAcquireLeadershipAsync(CancellationToken ct)
    {
        using var scope = _serviceProvider.CreateScope();
        var redis = scope.ServiceProvider.GetRequiredService<IConnectionMultiplexer>();
        var db = redis.GetDatabase();

        // Renewal: atomically extends the TTL only if the key still holds our ID.
        // A plain SET NX never extends the TTL, so without this the lease lapsed every
        // 15 s. Another instance could then take over while this one still reported
        // itself as leader until its next check.
        bool renewed = await db.LockExtendAsync(LeaderKey, _instanceId, _leaseDuration);
        if (renewed)
        {
            _isLeader = true;
        }
        else
        {
            // Acquisition: SET NX with expiry; succeeds only when nobody holds the lease.
            bool acquired = await db.LockTakeAsync(LeaderKey, _instanceId, _leaseDuration);
            _isLeader = acquired;
            if (acquired)
            {
                Console.WriteLine($"本实例成为领导者: {_instanceId}");
            }
        }

        // Leader-only work.
        if (_isLeader)
        {
            await ExecuteLeaderTasksAsync(ct);
        }
    }

    private async Task ExecuteLeaderTasksAsync(CancellationToken ct)
    {
        // Tasks that only the leader runs.
        Console.WriteLine("执行领导者专属任务: 缓存预热、数据清理、报表生成...");
        await Task.CompletedTask;
    }
}
超时模式
请求超时控制
/// <summary>
/// Timeout Pattern: bounds every external call so nothing waits indefinitely.
/// </summary>
public class TimeoutExecutor
{
    private readonly TimeSpan _defaultTimeout;

    /// <param name="defaultTimeout">Timeout used when a call passes none; 30 s if omitted.</param>
    public TimeoutExecutor(TimeSpan? defaultTimeout = null)
    {
        _defaultTimeout = defaultTimeout ?? TimeSpan.FromSeconds(30);
    }

    /// <summary>
    /// Awaits <paramref name="action"/> for at most the timeout.
    /// </summary>
    /// <remarks>
    /// The action cannot observe the timeout, so its work keeps running after a
    /// TimeoutException; prefer the overload that receives a CancellationToken.
    /// </remarks>
    /// <exception cref="TimeoutException">The timeout elapsed before the action completed.</exception>
    public async Task<T> ExecuteAsync<T>(
        Func<Task<T>> action,
        TimeSpan? timeout = null)
    {
        ArgumentNullException.ThrowIfNull(action);
        return await ExecuteAsync(_ => action(), timeout);
    }

    /// <summary>
    /// Runs <paramref name="action"/> with a token that is cancelled when the timeout
    /// elapses, and stops waiting at that moment.
    /// </summary>
    /// <exception cref="TimeoutException">The timeout elapsed before the action completed.</exception>
    /// <remarks>
    /// An OperationCanceledException that was not caused by this timeout (for example the
    /// action's own cancellation) propagates unchanged instead of being reported as a timeout.
    /// </remarks>
    public async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> action,
        TimeSpan? timeout = null)
    {
        ArgumentNullException.ThrowIfNull(action);

        var limit = timeout ?? _defaultTimeout;
        using var cts = new CancellationTokenSource(limit);
        try
        {
            return await action(cts.Token).WaitAsync(cts.Token);
        }
        catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
        {
            throw new TimeoutException($"操作在 {limit} 后超时", ex);
        }
    }

    /// <summary>
    /// HTTP GET with a timeout (10 s when none is given). The request itself is
    /// cancelled when the timeout elapses.
    /// </summary>
    public async Task<string> GetStringWithTimeoutAsync(
        HttpClient client, string url, TimeSpan? timeout = null)
    {
        return await ExecuteAsync(
            ct => client.GetStringAsync(url, ct),
            timeout ?? TimeSpan.FromSeconds(10));
    }
}
优点
缺点
性能注意事项
总结
分布式系统设计模式是构建可靠微服务架构的基础。幂等性保证操作安全;Saga 和补偿处理分布式事务;熔断器防止级联故障;重试和退避策略处理瞬时故障;隔板模式隔离资源;发件箱保证消息不丢失;CQRS 分离读写关注点。选择合适的模式组合是构建高可用分布式系统的关键。
关键知识点
- 幂等性通过唯一请求 ID 或唯一约束实现
- Saga 协调式(Orchestration,由中心协调器驱动各步骤)比协同式(Choreography,各服务通过事件自行协作)更可控,但协调器可能成为单点
- 熔断器三态转换:Closed → Open → HalfOpen → Closed(半开状态下试探请求失败则回到 Open)
- 重试必须使用指数退避 + 随机抖动避免惊群
- 隔板模式将资源分池,防止一个故障耗尽所有资源
- Outbox 模式保证业务数据与待发送消息在同一事务中原子写入,消息由后台处理器至少投递一次(消费者需幂等)
- CQRS 让读写模型独立优化,但引入数据同步复杂性
- 领导者选举确保同一时刻只有一个实例执行特定任务(租约过期、进程暂停或网络分区时仍可能短暂出现双主)
常见误区
- 误区:每个微服务调用都需要熔断器
  纠正:对关键的、频繁失败的下游服务使用熔断,内部调用不一定需要
- 误区:重试次数越多越好
  纠正:过多的重试会加剧下游服务压力,3 次通常是合理的上限
- 误区:Saga 可以保证强一致性
  纠正:Saga 只保证最终一致性,中间状态对外可见
- 误区:CQRS 必须用不同的数据库
  纠正:读写可以使用同一个数据库,只是模型不同
进阶路线
- 初级:理解重试、超时、熔断器的基本原理
- 中级:使用 Polly 实现组合策略,理解幂等性
- 高级:实现 Saga 协调器,Outbox 模式,CQRS
- 专家级:Raft 共识实现,分布式事务框架设计
适用场景
- 微服务架构下的服务间调用
- 高并发交易系统
- 需要高可用性的支付/订单系统
- 大规模数据处理管道
- 多活/多区域部署
落地建议
- 项目初期即引入 Polly 作为弹性策略的基础库
- 所有外部调用必须设置超时
- 写操作必须实现幂等性
- 使用健康检查端点配合负载均衡器
- Outbox 表配合后台处理器保证消息不丢失
- 监控熔断器状态和重试次数作为系统健康指标
排错清单
复盘问题
- 熔断器打开后,用户体验如何保障?
- Saga 补偿失败怎么办?
- 如何测试重试逻辑是否正确?
- 多个实例的 Outbox 处理器如何避免重复发送?
- 领导者选举的网络分区问题如何处理?
- 如何监控分布式系统的端到端延迟?
- 超时时间应该设置为多少才合理?
- 如何在不停服的情况下调整熔断器参数?
