数据库连接弹性
大约 9 分钟约 2835 字
数据库连接弹性
简介
数据库连接弹性关注的是"短暂失败时系统如何自动恢复",而不是简单地不断重试。真实项目里的连接失败通常来自网络抖动、瞬时锁等待、数据库主从切换、连接池耗尽等瞬态故障,因此需要把重试、超时、熔断、幂等与监控放在一起设计。
常见数据库瞬态故障
故障类型 典型场景 是否适合重试
----------------------------------------------------------------
网络闪断 交换机重启、DNS 解析失败 是
连接超时 网络拥塞、数据库负载高 是
死锁 事务并发冲突 是(有限次数)
主从切换 数据库故障转移(秒级) 是
连接池耗尽 突发流量超过连接池上限 否(需要降级)
表/字段不存在 部署不完整 否
权限不足 配置错误 否
约束违反 业务逻辑错误 否
数据格式错误 数据损坏 否
关键原则:只对瞬态故障(Transient Faults)重试弹性策略层次
请求到达
|
v
[超时控制] -- 防止单个请求无限等待
|
v
[重试策略] -- 瞬态故障自动重试(指数退避)
|
v
[熔断器] -- 连续失败时快速失败,保护下游
|
v
[降级策略] -- 熔断期间返回缓存/默认值
|
v
[健康检查] -- 探测数据库是否恢复
|
v
[恢复] -- 数据库恢复后自动关闭熔断特点
实现
EF Core 内置重试与事务执行策略
// ============================================
// Program.cs — EF Core 连接弹性配置
// ============================================
using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args);
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection")!;
builder.Services.AddDbContext<AppDbContext>(options =>
{
options.UseSqlServer(connectionString, sqlOptions =>
{
// 启用重试策略
sqlOptions.EnableRetryOnFailure(
maxRetryCount: 5, // 最大重试次数
maxRetryDelay: TimeSpan.FromSeconds(10), // 最大重试延迟
errorNumbersToAdd: new[]
{
1205, // 死锁
4060, // 无法打开数据库
18456 // 登录失败(密码错误除外)
});
// 命令超时
sqlOptions.CommandTimeout(30);
// 启用详细错误(仅开发环境)
// sqlOptions.EnableDetailedErrors();
// sqlOptions.EnableSensitiveDataLogging();
});
});// ============================================
// PostgreSQL 的重试配置
// ============================================
builder.Services.AddDbContext<AppDbContext>(options =>
{
options.UseNpgsql(connectionString, npgsqlOptions =>
{
npgsqlOptions.EnableRetryOnFailure(
maxRetryCount: 5,
maxRetryDelay: TimeSpan.FromSeconds(10),
errorCodesToAdd: new[]
{
"53000", // 系统错误(类)
"08001", // SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
"08004", // SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION
"08006", // CONNECTION_FAILURE
"08007", // TRANSACTION_RESOLUTION_UNKNOWN
"57P01", // ADMIN_SHUTDOWN
"57P02", // CRASH_SHUTDOWN
"57P03", // CANNOT_CONNECT_NOW
"53300", // TOO_MANY_CONNECTIONS
"54000", // PROGRAM_LIMIT_EXCEEDED
});
npgsqlOptions.CommandTimeout(30);
});
});// ============================================
// 事务中使用执行策略
// ============================================
public class OrderService
{
private readonly AppDbContext _dbContext;
public OrderService(AppDbContext dbContext)
{
_dbContext = dbContext;
}
/// <summary>
/// 创建订单 — 事务 + 执行策略
/// 关键:事务必须在执行策略的回调内部创建
/// </summary>
public async Task<long> CreateOrderAsync(CreateOrderRequest request)
{
var strategy = _dbContext.Database.CreateExecutionStrategy();
return await strategy.ExecuteAsync(async () =>
{
await using var transaction = await _dbContext.Database.BeginTransactionAsync();
try
{
// 1. 创建订单
var order = new Order
{
CustomerId = request.CustomerId,
Amount = request.Amount,
Currency = "CNY",
Status = "Created",
CreatedAt = DateTimeOffset.UtcNow
};
_dbContext.Orders.Add(order);
await _dbContext.SaveChangesAsync();
// 2. 写入 Outbox(同一事务,保证原子性)
_dbContext.OutboxMessages.Add(new OutboxMessage
{
Type = "OrderCreated",
AggregateId = order.Id.ToString(),
Payload = JsonSerializer.Serialize(new
{
order.Id,
order.Amount,
order.CustomerId
}),
TargetTopic = "order-events",
CreatedAt = DateTimeOffset.UtcNow
});
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
return order.Id;
}
catch (Exception ex)
{
await transaction.RollbackAsync();
throw;
}
});
}
}幂等写操作保护
// ============================================
// 幂等键 + 数据库约束 防止重复写入
// ============================================
public class IdempotentOrderService
{
private readonly AppDbContext _dbContext;
public async Task<long> CreateOrderAsync(
string idempotencyKey,
CreateOrderRequest request)
{
var strategy = _dbContext.Database.CreateExecutionStrategy();
return await strategy.ExecuteAsync(async () =>
{
await using var transaction = await _dbContext.Database.BeginTransactionAsync();
// 检查幂等键是否已存在
var existing = await _dbContext.IdempotentRecords
.FirstOrDefaultAsync(r => r.Key == idempotencyKey);
if (existing != null)
{
// 幂等键已存在,返回之前的订单 ID
return existing.ResultId;
}
var order = new Order
{
CustomerId = request.CustomerId,
Amount = request.Amount,
Status = "Created",
CreatedAt = DateTimeOffset.UtcNow
};
_dbContext.Orders.Add(order);
await _dbContext.SaveChangesAsync();
// 保存幂等记录
_dbContext.IdempotentRecords.Add(new IdempotentRecord
{
Key = idempotencyKey,
ResultId = order.Id,
RequestPayload = JsonSerializer.Serialize(request),
CreatedAt = DateTimeOffset.UtcNow
});
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
return order.Id;
});
}
}
// 数据库表
public class IdempotentRecord
{
public long Id { get; set; }
public string Key { get; set; } = string.Empty; // 唯一索引
public long ResultId { get; set; }
public string? RequestPayload { get; set; }
public DateTimeOffset CreatedAt { get; set; }
}Polly 弹性策略
// ============================================
// Polly v8 弹性管道 — 重试 + 超时 + 熔断
// ============================================
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
builder.Services.AddResiliencePipeline("db-query", pipelineBuilder =>
{
pipelineBuilder
// 1. 超时 — 防止单个查询无限等待
.AddTimeout(new TimeoutStrategyOptions
{
Timeout = TimeSpan.FromSeconds(3),
OnTimeout = args =>
{
Console.WriteLine($"数据库查询超时: {args.Duration.TotalSeconds}s");
return default;
}
})
// 2. 重试 — 瞬态故障自动重试
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromMilliseconds(300),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true, // 添加随机抖动,避免重试风暴
ShouldHandle = new PredicateBuilder()
.Handle<TimeoutRejectedException>()
.Handle<SqlException>(ex => IsTransientSqlError(ex))
.Handle<TimeoutException>()
,
OnRetry = args =>
{
Console.WriteLine(
$"数据库重试: 第 {args.AttemptNumber} 次, " +
$"延迟 {args.RetryDelay.TotalMilliseconds}ms, " +
$"异常: {args.Outcome.Exception?.Message}");
return default;
}
})
// 3. 熔断 — 连续失败时快速失败
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 0.5, // 50% 失败率触发熔断
SamplingDuration = TimeSpan.FromSeconds(20), // 统计窗口
MinimumThroughput = 10, // 最少 10 次请求才判断
BreakDuration = TimeSpan.FromSeconds(30), // 熔断持续时间
ShouldHandle = new PredicateBuilder()
.Handle<SqlException>()
.Handle<TimeoutException>(),
OnOpened = args =>
{
Console.WriteLine("数据库熔断器打开!系统进入降级模式");
return default;
},
OnClosed = args =>
{
Console.WriteLine("数据库熔断器关闭!恢复正常访问");
return default;
},
OnHalfOpened = args =>
{
Console.WriteLine("数据库熔断器半开!尝试探测恢复");
return default;
}
});
});
// ============================================
// 判断 SQL 错误是否为瞬态错误
// ============================================
private static bool IsTransientSqlError(SqlException ex)
{
// SQL Server 瞬态错误码
var transientErrorNumbers = new HashSet<int>
{
4060, // 无法打开数据库
40197, // 服务运行中遇到错误
40501, // 服务当前繁忙
40613, // 数据库不可达
49918, // 无法处理请求(暂时性)
49919, // 无法处理创建/更新请求
49920, // 无法处理请求(参数过多)
4221, // 登录超时
11001, // DNS 解析失败
1205, // 死锁
-2, // 超时
};
return ex.Number > 0 && transientErrorNumbers.Contains(ex.Number);
}健康检查、只读降级与监控
// ============================================
// 数据库健康检查
// ============================================
builder.Services.AddHealthChecks()
.AddSqlServer(
connectionString,
name: "sqlserver",
timeout: TimeSpan.FromSeconds(3),
tags: new[] { "db", "ready" });
var app = builder.Build();
app.MapHealthChecks("/health/db", new HealthCheckOptions
{
ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
});// ============================================
// 主从降级 — 读操作故障转移
// ============================================
public interface IOrderReadRepository
{
Task<OrderDetailDto?> GetAsync(long id, CancellationToken ct);
}
public class PrimaryOrderReadRepository : IOrderReadRepository
{
private readonly AppDbContext _context;
public PrimaryOrderReadRepository(AppDbContext context)
{
_context = context;
}
public async Task<OrderDetailDto?> GetAsync(long id, CancellationToken ct)
{
return await _context.Orders
.AsNoTracking()
.Where(o => o.Id == id)
.Select(o => new OrderDetailDto(o.Id, o.Amount, o.Status))
.FirstOrDefaultAsync(ct);
}
}
public class ReplicaOrderReadRepository : IOrderReadRepository
{
private readonly AppDbContext _replicaContext;
public ReplicaOrderReadRepository(AppDbContext replicaContext)
{
_replicaContext = replicaContext;
}
public async Task<OrderDetailDto?> GetAsync(long id, CancellationToken ct)
{
return await _replicaContext.Orders
.AsNoTracking()
.Where(o => o.Id == id)
.Select(o => new OrderDetailDto(o.Id, o.Amount, o.Status))
.FirstOrDefaultAsync(ct);
}
}
/// <summary>
/// 降级仓库 — 主库失败自动切换到从库
/// </summary>
public class FallbackOrderReadRepository : IOrderReadRepository
{
private readonly PrimaryOrderReadRepository _primary;
private readonly ReplicaOrderReadRepository _replica;
private readonly ILogger<FallbackOrderReadRepository> _logger;
private readonly ResiliencePipeline _pipeline;
public FallbackOrderReadRepository(
PrimaryOrderReadRepository primary,
ReplicaOrderReadRepository replica,
ILogger<FallbackOrderReadRepository> logger)
{
_primary = primary;
_replica = replica;
_logger = logger;
_pipeline = new ResiliencePipelineBuilder()
.AddTimeout(TimeSpan.FromSeconds(3))
.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 0.5,
SamplingDuration = TimeSpan.FromSeconds(30),
BreakDuration = TimeSpan.FromSeconds(15)
})
.Build();
}
public async Task<OrderDetailDto?> GetAsync(long id, CancellationToken ct)
{
try
{
return await _pipeline.ExecuteAsync(async _ =>
await _primary.GetAsync(id, ct), ct);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"主库查询失败,降级到只读副本. OrderId={OrderId}", id);
return await _replica.GetAsync(id, ct);
}
}
}连接池配置
// ============================================
// 连接池优化配置
// ============================================
builder.Services.AddDbContext<AppDbContext>(options =>
{
options.UseSqlServer(connectionString, sqlOptions =>
{
sqlOptions.EnableRetryOnFailure(5);
sqlOptions.CommandTimeout(30);
});
// 连接池配置(通过连接字符串)
// Server=myserver;Database=mydb;User Id=myuser;Password=mypass;
// Max Pool Size=100; -- 最大连接数(默认 100)
// Min Pool Size=5; -- 最小连接数(默认 0)
// Connect Timeout=15; -- 连接超时(默认 15 秒)
// Connection Idle Timeout=180; -- 空闲连接超时(默认 180 秒)
// Connection Reset=false; -- 连接重置(默认 true)
});重试指标监控
/// <summary>
/// 数据库弹性指标
/// </summary>
public class DbResiliencyMetrics
{
private readonly Counter<long> _retryCount;
private readonly Counter<long> _timeoutCount;
private readonly Counter<long> _circuitOpenCount;
private readonly Counter<long> _failoverCount;
private readonly Histogram<double> _retryDelay;
public DbResiliencyMetrics(IMeterFactory meterFactory)
{
var meter = meterFactory.Create("MyApp.Database.Resiliency", "1.0.0");
_retryCount = meter.CreateCounter<long>("db.retry.count", "次");
_timeoutCount = meter.CreateCounter<long>("db.timeout.count", "次");
_circuitOpenCount = meter.CreateCounter<long>("db.circuit.open.count", "次");
_failoverCount = meter.CreateCounter<long>("db.failover.count", "次");
_retryDelay = meter.CreateHistogram<double>("db.retry.delay", "ms");
}
public void RecordRetry(int attempt, string errorType, double delayMs)
{
_retryCount.Add(1,
new("attempt", attempt.ToString()),
new("error_type", errorType));
_retryDelay.Record(delayMs);
}
public void RecordTimeout(string operation) =>
_timeoutCount.Add(1, new("operation", operation));
public void RecordCircuitOpen() => _circuitOpenCount.Add(1);
public void RecordFailover(string from, string to) =>
_failoverCount.Add(1, new("from", from), new("to", to));
}优点
缺点
总结
数据库连接弹性的关键不是"失败就重试",而是"只对值得重试的故障重试"。项目里通常以 EF Core 执行策略处理数据库瞬态错误,再用 Polly 补齐超时、熔断和降级,同时通过幂等键、Outbox、监控指标保证写操作安全。读路径可以降级到只读副本,写路径必须保证幂等性。
关键知识点
- 只把网络抖动、主从切换、超时等瞬态故障纳入重试范围。
- 事务重试必须通过
CreateExecutionStrategy()包裹完整事务单元。 - 非幂等副作用不要直接放进可透明重试的代码路径。
- 连接弹性必须配合健康检查、日志和指标一起上线。
项目落地视角
- 订单查询读路径允许重试,并在主库异常时降级到只读副本。
- 支付下单写路径使用幂等键 + Outbox,避免重试重复扣款。
- 数据库维护窗口期间,接口可快速进入只读模式。
- 高并发接口通过短超时 + 熔断,防止线程池和连接池被拖死。
常见误区
- 对所有异常一律重试,连参数错误也重试。
- 把发短信、发 MQ、扣款等副作用放进重试事务中。
- 重试次数过多、超时时间过长,导致雪崩更严重。
- 没有记录重试次数和熔断状态,上线后无法观察效果。
进阶路线
- 学习 SQL Server / PostgreSQL 官方瞬态错误码分类。
- 引入 Outbox / Inbox 保证消息和数据库写入一致性。
- 为读写分离场景设计不同的弹性策略模板。
- 将数据库弹性与 HttpClient、Redis、MQ 弹性策略统一治理。
适用场景
- 高并发 API 的数据库读写保护。
- 云数据库主从故障切换场景。
- 微服务调用链较长、数据库波动明显的业务系统。
- 对稳定性要求高的订单、库存、账务类应用。
落地建议
- 先定义哪些异常允许重试,哪些异常必须立即失败。
- 把读操作和写操作的弹性策略分开设计。
- 为关键写请求设计幂等键与重复提交保护。
- 为重试、熔断、故障转移建立统一日志字段和监控面板。
排错清单
- 检查是否正确启用了
EnableRetryOnFailure()。 - 检查事务是否使用
CreateExecutionStrategy()包裹。 - 检查异常是否真的是瞬态错误而不是业务/配置错误。
- 检查连接池、命令超时、数据库慢查询和锁等待情况。
复盘问题
- 当前系统哪些数据库失败适合自动重试,哪些不适合?
- 写操作是否具备幂等能力,重试后会不会产生重复副作用?
- 如果数据库出现 30 秒不可用,系统的降级路径是什么?
- 你是否能从日志和指标中看出"恢复靠的是哪种策略"?
