熔断器与弹性策略
阅读时间约 9 分钟,全文约 2754 字
熔断器与弹性策略
简介
分布式系统中,服务间调用不可避免会出现故障。熔断器(Circuit Breaker)模式防止故障级联扩散,弹性策略(Resilience)通过重试、超时、舱壁等机制提升系统容错能力。理解 Polly v8 的策略组合和自定义扩展,有助于构建健壮的微服务通信。
特点
Polly v8 基础
弹性管道构建
// Packages:
//   dotnet add package Microsoft.Extensions.Http.Resilience
//   dotnet add package Polly.Extensions
// On .NET 8+, Microsoft.Extensions.Http.Resilience (built on Polly v8) is the recommended way
// to add resilience to HttpClient.

// 1. Standard resilience pipeline (recommended).
builder.Services.AddHttpClient<OrderServiceClient>(client =>
{
    client.BaseAddress = new Uri("https://order-service:8080");
})
.AddStandardResilienceHandler();
// The standard pipeline contains, from outermost to innermost:
// Rate Limiter → Total Request Timeout → Retry → Circuit Breaker → Attempt Timeout
// 2. Custom resilience pipeline.
// Strategies run outermost-first in the order they are added:
// total timeout → retry → circuit breaker → per-attempt timeout.
// (The lambda parameter is named `pipeline` so it does not shadow the outer `builder`.)
builder.Services.AddHttpClient<UserServiceClient>(client =>
{
    client.BaseAddress = new Uri("https://user-service:8080");
})
.AddResilienceHandler("custom-pipeline", pipeline =>
{
    // Total timeout. Polly v8 has no dedicated "total timeout" strategy (no AddTotalTimeout);
    // the outermost timeout bounds the whole call, including every retry and back-off delay.
    pipeline.AddTimeout(new TimeoutStrategyOptions
    {
        Timeout = TimeSpan.FromSeconds(30),
        OnTimeout = args =>
        {
            Console.WriteLine($"总超时: {args.Timeout}");
            return default;
        }
    });

    // Retry with exponential back-off.
    pipeline.AddRetry(new HttpRetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(500),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true, // randomize delays so clients do not retry in lock-step
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            // The per-attempt timeout added below sits inside this retry, so its
            // TimeoutRejectedException must be handled here or timed-out attempts are never retried.
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => r.StatusCode == HttpStatusCode.ServiceUnavailable)
            .HandleResult(r => r.StatusCode == HttpStatusCode.TooManyRequests),
        OnRetry = args =>
        {
            // AttemptNumber is zero-based; add 1 for a human-readable retry count.
            Console.WriteLine($"重试第 {args.AttemptNumber + 1} 次");
            return default;
        }
    });

    // Circuit breaker: opens for 30 s when at least 50% of >= 10 calls in a 10 s window fail.
    pipeline.AddCircuitBreaker(new HttpCircuitBreakerStrategyOptions
    {
        SamplingDuration = TimeSpan.FromSeconds(10),
        FailureRatio = 0.5,
        MinimumThroughput = 10,
        BreakDuration = TimeSpan.FromSeconds(30),
        // Count only transient/server-side failures. Treating every non-2xx response as a
        // failure would let client errors such as 400/404 open the breaker for a healthy service.
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => r.StatusCode >= HttpStatusCode.InternalServerError
                || r.StatusCode == HttpStatusCode.RequestTimeout
                || r.StatusCode == HttpStatusCode.TooManyRequests),
        OnOpened = args =>
        {
            Console.WriteLine($"熔断器打开: {args.BreakDuration}");
            return default;
        },
        OnClosed = args =>
        {
            Console.WriteLine("熔断器关闭");
            return default;
        },
        OnHalfOpened = args =>
        {
            Console.WriteLine("熔断器半开");
            return default;
        }
    });

    // Per-attempt timeout (innermost): bounds each individual HTTP attempt.
    pipeline.AddTimeout(new TimeoutStrategyOptions
    {
        Timeout = TimeSpan.FromSeconds(5),
        OnTimeout = args =>
        {
            Console.WriteLine($"请求超时: {args.Timeout}");
            return default;
        }
    });
});
// 3. Hedging pipeline. Unlike a plain sequential retry, hedging may send additional requests
// in parallel when the current one is slow or fails, and uses the first acceptable response.
// Use it only for idempotent requests: the same request can reach the server more than once.
builder.Services.AddHttpClient<ProductServiceClient>(client =>
{
    client.BaseAddress = new Uri("https://product-service:8080");
})
.AddStandardHedgingHandler();
Polly v8 策略详解
// Retry strategy: exponential back-off plus jitter.
var retryPipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
    .AddRetry(new RetryStrategyOptions<HttpResponseMessage>
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(200),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true, // randomize delays to avoid a thundering herd of synchronized retries
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => r.StatusCode == HttpStatusCode.ServiceUnavailable),
        OnRetry = args =>
        {
            // args.Outcome       — result or exception of the attempt that just failed
            // args.AttemptNumber — zero-based (0 for the first retry); add 1 for a 1-based count
            // args.RetryDelay    — delay applied before the next attempt
            return default;
        }
    })
    .Build();
// Circuit breaker — three-state model:
//   Closed    → calls pass through while the failure ratio is sampled
//   Open      → calls are rejected immediately with BrokenCircuitException
//   Half-Open → after BreakDuration, a trial call is let through to probe recovery
var circuitBreakerPipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
    .AddCircuitBreaker(new CircuitBreakerStrategyOptions<HttpResponseMessage>
    {
        FailureRatio = 0.5,                           // open at a 50% failure ratio
        MinimumThroughput = 10,                       // only evaluate once 10 calls were sampled
        SamplingDuration = TimeSpan.FromSeconds(10),  // 10 s sampling window
        BreakDuration = TimeSpan.FromSeconds(30),     // stay open for 30 s
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            // Count every 5xx, not only 500: 502/503/504 are exactly the
            // "downstream is unhealthy" signals the breaker should react to.
            .HandleResult(r => r.StatusCode >= HttpStatusCode.InternalServerError),
        OnOpened = args =>
        {
            // Breaker opened → raise an alert here.
            return default;
        }
    })
    .Build();
// Timeout strategy: limits each execution through the pipeline to 5 seconds.
var timeoutOptions = new TimeoutStrategyOptions
{
    Timeout = TimeSpan.FromSeconds(5),
    // Hook for recording the timeout (e.g. writing a log entry); currently a no-op.
    OnTimeout = static _ => default
};

var timeoutPipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
    .AddTimeout(timeoutOptions)
    .Build();
// Bulkhead isolation: cap the number of concurrent executions.
// RateLimiterStrategyOptions has no PermitLimit/QueueLimit properties of its own; the limits
// belong to the ConcurrencyLimiterOptions supplied as DefaultRateLimiterOptions.
var bulkheadPipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
    .AddRateLimiter(new RateLimiterStrategyOptions
    {
        DefaultRateLimiterOptions = new System.Threading.RateLimiting.ConcurrencyLimiterOptions
        {
            PermitLimit = 20, // at most 20 concurrent executions
            QueueLimit = 5    // up to 5 more may wait in the queue; beyond that calls are rejected
        },
        OnRejected = args =>
        {
            // Callback invoked when an execution is rejected.
            return default;
        }
    })
    .Build();
// Combined pipeline (recommended order). Strategies run outermost-first in the order added.
var combinedPipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
    // 1. Total timeout. Polly v8 has no AddTotalTimeout; the outermost timeout bounds the
    //    whole execution, including all retries and back-off delays.
    .AddTimeout(TimeSpan.FromSeconds(30))
    // 2. Retry. The default predicate only reacts to exceptions, so failed HTTP responses (5xx)
    //    are added explicitly. BrokenCircuitException is deliberately not retried (fail fast).
    .AddRetry(new RetryStrategyOptions<HttpResponseMessage>
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(200),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => r.StatusCode >= HttpStatusCode.InternalServerError)
    })
    // 3. Circuit breaker, counting the same transient failures as the retry.
    .AddCircuitBreaker(new CircuitBreakerStrategyOptions<HttpResponseMessage>
    {
        FailureRatio = 0.5,
        MinimumThroughput = 8,
        SamplingDuration = TimeSpan.FromSeconds(10),
        BreakDuration = TimeSpan.FromSeconds(30),
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => r.StatusCode >= HttpStatusCode.InternalServerError)
    })
    // 4. Per-attempt timeout (innermost).
    .AddTimeout(TimeSpan.FromSeconds(5))
    .Build();
// Execute a call through the combined pipeline. HttpResponseMessage is IDisposable,
// so it is disposed at the end of the enclosing scope.
using var response = await combinedPipeline.ExecuteAsync(
    async ct => await httpClient.GetAsync("/api/users", ct),
    CancellationToken.None);
自定义降级策略
Fallback 模式
/// <summary>
/// Fallback (degradation) helper: runs a primary call and, when it fails, asks a fallback
/// factory for substitute data instead of surfacing the error.
/// </summary>
/// <typeparam name="T">Result type of the primary call.</typeparam>
public class FallbackHandler<T>
{
    private readonly ILogger _logger;
    private readonly Func<CancellationToken, Task<T?>> _fallbackFactory;

    /// <param name="logger">Logger used to record primary-call failures.</param>
    /// <param name="fallbackFactory">
    /// Produces substitute data. Returning <c>null</c> means "no fallback available", in which case
    /// the original exception is rethrown. (For a non-nullable value-type <typeparamref name="T"/>,
    /// <c>T?</c> is just <c>T</c>, so the factory's value is always returned.)
    /// </param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public FallbackHandler(
        ILogger logger,
        Func<CancellationToken, Task<T?>> fallbackFactory)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(fallbackFactory);
        _logger = logger;
        _fallbackFactory = fallbackFactory;
    }

    /// <summary>Runs <paramref name="primary"/>; on failure returns the fallback value if one exists.</summary>
    /// <param name="primary">The primary operation.</param>
    /// <param name="ct">Cancellation token passed to both the primary call and the fallback factory.</param>
    /// <returns>The primary result, or the fallback value.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested through <paramref name="ct"/>.</exception>
    public async Task<T> ExecuteWithFallbackAsync(
        Func<CancellationToken, Task<T>> primary,
        CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(primary);

        try
        {
            return await primary(ct);
        }
        // Only cancellation requested by the caller propagates untouched. Other
        // OperationCanceledExceptions (e.g. HttpClient's TaskCanceledException on timeout)
        // are ordinary failures and must trigger the fallback.
        catch (Exception ex) when (!(ex is OperationCanceledException && ct.IsCancellationRequested))
        {
            _logger.LogWarning(ex, "主服务调用失败,启用降级");
            var fallback = await _fallbackFactory(ct);
            if (fallback is not null)
            {
                return fallback;
            }

            throw;
        }
    }
}
/// <summary>
/// Cache-based fallback (recommended): every successful fetch refreshes a cached copy, and a
/// failed fetch is answered from that copy while it has not expired.
/// </summary>
public class CachedFallbackService
{
    private readonly IMemoryCache _cache;
    private readonly ILogger<CachedFallbackService> _logger;

    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public CachedFallbackService(IMemoryCache cache, ILogger<CachedFallbackService> logger)
    {
        ArgumentNullException.ThrowIfNull(cache);
        ArgumentNullException.ThrowIfNull(logger);
        _cache = cache;
        _logger = logger;
    }

    /// <summary>Fetches fresh data, falling back to the last cached copy when the fetch fails.</summary>
    /// <param name="cacheKey">Key under which the fallback copy is stored.</param>
    /// <param name="fetcher">Live data source.</param>
    /// <param name="cacheDuration">
    /// Lifetime of the fallback copy (default 5 minutes). Once it expires, a failing fetch is rethrown.
    /// </param>
    /// <param name="ct">Cancellation token passed to <paramref name="fetcher"/>.</param>
    /// <returns>Fresh data, or the cached copy if the fetch failed.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested through <paramref name="ct"/>.</exception>
    public async Task<T> GetWithFallbackAsync<T>(
        string cacheKey,
        Func<CancellationToken, Task<T>> fetcher,
        TimeSpan? cacheDuration = null,
        CancellationToken ct = default) where T : class
    {
        ArgumentNullException.ThrowIfNull(cacheKey);
        ArgumentNullException.ThrowIfNull(fetcher);

        try
        {
            var result = await fetcher(ct);
            // Refresh the fallback copy on success. Null results are not cached, so a later
            // failure can never "fall back" to null.
            if (result is not null)
            {
                _cache.Set(cacheKey, result, cacheDuration ?? TimeSpan.FromMinutes(5));
            }

            return result;
        }
        // A cancellation requested by the caller is not a service failure: let it propagate
        // instead of silently serving cached data.
        catch (Exception ex) when (!(ex is OperationCanceledException && ct.IsCancellationRequested))
        {
            _logger.LogWarning(ex, "获取 {Key} 失败,尝试缓存降级", cacheKey);
            if (_cache.TryGetValue(cacheKey, out T? cached) && cached is not null)
            {
                _logger.LogInformation("使用缓存数据: {Key}", cacheKey);
                return cached;
            }

            throw;
        }
    }
}
// Usage: the typed ProductServiceClient (registered via AddHttpClient) is injected into the handler
// together with the fallback service. NOTE(review): CachedFallbackService and IMemoryCache must be
// registered (e.g. AddMemoryCache + AddSingleton) before builder.Build().
app.MapGet("/api/products", async (
    CachedFallbackService fallback,
    ProductServiceClient productServiceClient,
    CancellationToken ct) =>
{
    return await fallback.GetWithFallbackAsync(
        "products:all",
        fetcher: token => productServiceClient.GetAllProductsAsync(token),
        cacheDuration: TimeSpan.FromMinutes(10),
        ct: ct);
});
断路器状态监控
健康检查集成
/// <summary>
/// Health check that maps the state exposed by a <see cref="CircuitBreakerStateProvider"/>
/// to a health status.
/// </summary>
/// <remarks>
/// The provider only reflects a real breaker when the same instance is assigned to that breaker's
/// <c>CircuitBreakerStrategyOptions.StateProvider</c>; it must also be registered in DI.
/// </remarks>
public class CircuitBreakerHealthCheck : IHealthCheck
{
    private readonly CircuitBreakerStateProvider _stateProvider;

    /// <exception cref="ArgumentNullException"><paramref name="stateProvider"/> is null.</exception>
    public CircuitBreakerHealthCheck(CircuitBreakerStateProvider stateProvider)
    {
        ArgumentNullException.ThrowIfNull(stateProvider);
        _stateProvider = stateProvider;
    }

    /// <inheritdoc />
    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var result = _stateProvider.CircuitState switch
        {
            CircuitState.Closed => HealthCheckResult.Healthy("熔断器关闭"),
            CircuitState.HalfOpen => HealthCheckResult.Degraded("熔断器半开"),
            CircuitState.Open => HealthCheckResult.Unhealthy("熔断器打开"),
            // Isolated: the breaker was opened manually and stays open until closed manually.
            // It is a known state and should not be reported as "unknown".
            CircuitState.Isolated => HealthCheckResult.Unhealthy("熔断器被手动隔离"),
            _ => HealthCheckResult.Unhealthy("熔断器状态未知")
        };

        return Task.FromResult(result);
    }
}
// Registration. CircuitBreakerHealthCheck needs a CircuitBreakerStateProvider, which is not added
// to DI automatically; without this registration the health check cannot be constructed.
// Assign this same instance to the order-service breaker's CircuitBreakerStrategyOptions.StateProvider
// so the health check reports that breaker's real state.
var orderServiceCircuitState = new CircuitBreakerStateProvider();
builder.Services.AddSingleton(orderServiceCircuitState);
builder.Services.AddHealthChecks()
    .AddCheck<CircuitBreakerHealthCheck>("circuit-breaker-order-service");
弹性策略指标收集
/// <summary>
/// Counts resilience events (retries, circuit opens/closes, timeouts, fallbacks) per service.
/// Counters are keyed as "event:serviceName".
/// </summary>
public class ResilienceMetrics
{
    private readonly ConcurrentDictionary<string, long> _counters = new();

    /// <summary>Counts one retry for <paramref name="serviceName"/>.</summary>
    public void RecordRetry(string serviceName) => Increment("retry", serviceName);

    /// <summary>Counts one circuit-open transition for <paramref name="serviceName"/>.</summary>
    public void RecordCircuitOpen(string serviceName) => Increment("circuit_open", serviceName);

    /// <summary>Counts one circuit-close transition for <paramref name="serviceName"/>.</summary>
    public void RecordCircuitClose(string serviceName) => Increment("circuit_close", serviceName);

    /// <summary>Counts one timeout for <paramref name="serviceName"/>.</summary>
    public void RecordTimeout(string serviceName) => Increment("timeout", serviceName);

    /// <summary>Counts one fallback for <paramref name="serviceName"/>.</summary>
    public void RecordFallback(string serviceName) => Increment("fallback", serviceName);

    /// <summary>Returns a copy of all counters; mutating it does not affect the collector.</summary>
    public Dictionary<string, long> GetSnapshot()
    {
        var copy = new Dictionary<string, long>(_counters);
        return copy;
    }

    // Adds 1 to the "{eventName}:{serviceName}" counter, starting it at 1 on first use.
    private void Increment(string eventName, string serviceName)
    {
        var key = eventName + ":" + serviceName;
        _counters.AddOrUpdate(key, 1L, static (_, current) => current + 1);
    }
}
// Wire ResilienceMetrics into a Polly pipeline.
// ResilienceMetrics must be registered, otherwise GetRequiredService below throws when the
// pipeline is built.
builder.Services.AddSingleton<ResilienceMetrics>();

// NOTE(review): OrderServiceClient is also registered at the top of this article with
// AddStandardResilienceHandler(). Both AddHttpClient<OrderServiceClient> calls configure the same
// named client, so both handlers would be stacked; keep only one registration in a real app.
builder.Services.AddHttpClient<OrderServiceClient>(client =>
{
    client.BaseAddress = new Uri("https://order-service:8080");
})
.AddResilienceHandler("monitored-pipeline", (pipeline, context) =>
{
    // The pipeline builder has no ServiceProvider property; services are resolved through
    // the ResilienceHandlerContext passed to this overload.
    var metrics = context.ServiceProvider.GetRequiredService<ResilienceMetrics>();

    pipeline.AddRetry(new HttpRetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(500),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        OnRetry = args =>
        {
            metrics.RecordRetry("order-service");
            return default;
        }
    });

    pipeline.AddCircuitBreaker(new HttpCircuitBreakerStrategyOptions
    {
        FailureRatio = 0.5,
        MinimumThroughput = 10,
        SamplingDuration = TimeSpan.FromSeconds(10),
        BreakDuration = TimeSpan.FromSeconds(30),
        OnOpened = args =>
        {
            metrics.RecordCircuitOpen("order-service");
            return default;
        },
        OnClosed = args =>
        {
            metrics.RecordCircuitClose("order-service");
            return default;
        }
    });

    pipeline.AddTimeout(new TimeoutStrategyOptions
    {
        Timeout = TimeSpan.FromSeconds(5),
        OnTimeout = args =>
        {
            metrics.RecordTimeout("order-service");
            return default;
        }
    });
});
策略配置最佳实践
/// <summary>
/// Reusable resilience-pipeline templates for different kinds of downstream services.
/// Each template adds strategies outermost-first:
/// total timeout → retry → (circuit breaker) → per-attempt timeout.
/// Polly v8 has no AddTotalTimeout; the outermost AddTimeout bounds the whole call,
/// including all retries and back-off delays.
/// </summary>
public static class ResilienceTemplates
{
    /// <summary>Critical service: more retries and a more sensitive, shorter circuit breaker.</summary>
    /// <param name="builder">HTTP client builder the pipeline is attached to.</param>
    /// <param name="serviceName">Name of the resilience pipeline.</param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
    public static void AddCriticalServiceResilience(
        this IHttpClientBuilder builder, string serviceName)
    {
        ArgumentNullException.ThrowIfNull(builder);

        builder.AddResilienceHandler(serviceName, pipeline =>
        {
            // The 10 s total budget caps the 5 retries below; later retries are cut off when it runs out.
            pipeline.AddTimeout(TimeSpan.FromSeconds(10));
            pipeline.AddRetry(new HttpRetryStrategyOptions
            {
                MaxRetryAttempts = 5,
                Delay = TimeSpan.FromMilliseconds(200),
                BackoffType = DelayBackoffType.Exponential,
                UseJitter = true
            });
            pipeline.AddCircuitBreaker(new HttpCircuitBreakerStrategyOptions
            {
                FailureRatio = 0.3,                         // more sensitive threshold
                MinimumThroughput = 5,
                SamplingDuration = TimeSpan.FromSeconds(5),
                BreakDuration = TimeSpan.FromSeconds(15)    // shorter break
            });
            pipeline.AddTimeout(TimeSpan.FromSeconds(3));
        });
    }

    /// <summary>Non-critical service: fewer retries and a more tolerant circuit breaker.</summary>
    /// <param name="builder">HTTP client builder the pipeline is attached to.</param>
    /// <param name="serviceName">Name of the resilience pipeline.</param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
    public static void AddNonCriticalServiceResilience(
        this IHttpClientBuilder builder, string serviceName)
    {
        ArgumentNullException.ThrowIfNull(builder);

        builder.AddResilienceHandler(serviceName, pipeline =>
        {
            pipeline.AddTimeout(TimeSpan.FromSeconds(30));
            pipeline.AddRetry(new HttpRetryStrategyOptions
            {
                MaxRetryAttempts = 2,
                Delay = TimeSpan.FromSeconds(1)
            });
            pipeline.AddCircuitBreaker(new HttpCircuitBreakerStrategyOptions
            {
                FailureRatio = 0.8,                          // more tolerant threshold
                MinimumThroughput = 20,
                SamplingDuration = TimeSpan.FromSeconds(30),
                BreakDuration = TimeSpan.FromMinutes(1)
            });
            pipeline.AddTimeout(TimeSpan.FromSeconds(10));
        });
    }

    /// <summary>Read-only operations: retries without a circuit breaker.</summary>
    /// <param name="builder">HTTP client builder the pipeline is attached to.</param>
    /// <param name="serviceName">Name of the resilience pipeline.</param>
    /// <exception cref="ArgumentNullException"><paramref name="builder"/> is null.</exception>
    public static void AddReadOnlyResilience(
        this IHttpClientBuilder builder, string serviceName)
    {
        ArgumentNullException.ThrowIfNull(builder);

        builder.AddResilienceHandler(serviceName, pipeline =>
        {
            pipeline.AddTimeout(TimeSpan.FromSeconds(15));
            pipeline.AddRetry(new HttpRetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromMilliseconds(300),
                BackoffType = DelayBackoffType.Exponential,
                UseJitter = true
            });
            pipeline.AddTimeout(TimeSpan.FromSeconds(5));
        });
    }
}
// Apply the templates.
// NOTE(review): these typed clients configure no BaseAddress, so they must request absolute URIs
// (or set BaseAddress in the AddHttpClient configure callback as the earlier registrations do).
builder.Services.AddHttpClient<IPaymentService, PaymentServiceClient>()
    .AddCriticalServiceResilience("payment-service");
builder.Services.AddHttpClient<INotificationService, NotificationServiceClient>()
    .AddNonCriticalServiceResilience("notification-service");
builder.Services.AddHttpClient<IProductQueryService, ProductQueryServiceClient>()
    .AddReadOnlyResilience("product-query-service");
熔断器单元测试
[TestFixture]
public class CircuitBreakerTests
{
    /// <summary>
    /// Once MinimumThroughput calls have been sampled and the failure ratio reaches FailureRatio,
    /// the breaker opens and the next call is rejected with BrokenCircuitException.
    /// </summary>
    [Test]
    public async Task CircuitBreaker_Opens_AfterThresholdFailures()
    {
        // Arrange
        const int minimumThroughput = 3;
        var pipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
            .AddCircuitBreaker(new CircuitBreakerStrategyOptions<HttpResponseMessage>
            {
                FailureRatio = 0.5,
                MinimumThroughput = minimumThroughput,
                SamplingDuration = TimeSpan.FromSeconds(10),
                BreakDuration = TimeSpan.FromSeconds(5),
                ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
                    .HandleResult(r => !r.IsSuccessStatusCode)
            })
            .Build();

        // Act: send exactly MinimumThroughput failing requests. The breaker opens on the last
        // of them, so any further call inside this loop would itself throw BrokenCircuitException
        // and fail the test before the assertion is reached.
        // Generic pipelines take ValueTask-returning callbacks, hence ValueTask.FromResult.
        for (int i = 0; i < minimumThroughput; i++)
        {
            using var failed = await pipeline.ExecuteAsync(_ =>
                ValueTask.FromResult(new HttpResponseMessage(HttpStatusCode.InternalServerError)));
        }

        // Assert: the open breaker rejects the next call.
        Assert.ThrowsAsync<BrokenCircuitException>(async () =>
            await pipeline.ExecuteAsync(_ =>
                ValueTask.FromResult(new HttpResponseMessage(HttpStatusCode.OK))));
    }

    /// <summary>
    /// With jitter disabled, exponential back-off doubles the delay before every retry.
    /// </summary>
    [Test]
    public async Task Retry_ExponentialBackoff_RetriesCorrectTimes()
    {
        // Arrange: a small base delay keeps the real waits short (10 + 20 + 40 ms).
        int attempts = 0;
        var delays = new List<TimeSpan>();
        var pipeline = new ResiliencePipelineBuilder<HttpResponseMessage>()
            .AddRetry(new RetryStrategyOptions<HttpResponseMessage>
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromMilliseconds(10),
                BackoffType = DelayBackoffType.Exponential,
                UseJitter = false,
                ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
                    .HandleResult(r => !r.IsSuccessStatusCode),
                OnRetry = args =>
                {
                    delays.Add(args.RetryDelay);
                    return default;
                }
            })
            .Build();

        // Act: every attempt fails.
        using var response = await pipeline.ExecuteAsync(_ =>
        {
            attempts++;
            return ValueTask.FromResult(new HttpResponseMessage(HttpStatusCode.ServiceUnavailable));
        });

        // Assert: 1 initial attempt + 3 retries, with delays Delay * 2^n.
        Assert.That(attempts, Is.EqualTo(4));
        Assert.That(delays, Is.EqualTo(new[]
        {
            TimeSpan.FromMilliseconds(10),
            TimeSpan.FromMilliseconds(20),
            TimeSpan.FromMilliseconds(40),
        }));
    }
}
优点
缺点
总结
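Polly v8 是 .NET 8+ 官方推荐的弹性库 Microsoft.Extensions.Http.Resilience 的底层实现。通过 AddStandardResilienceHandler() 即可获得开箱即用的标准管道,由外到内依次为:限流 → 总超时 → 重试 → 熔断器 → 单次尝试超时。自定义管道推荐的组合顺序(由外到内)为:总超时 → 重试 → 熔断器 → 单次超时。重试策略使用指数退避加抖动,避免大量客户端同时重试造成惊群效应。熔断器在 Closed/Open/Half-Open 三种状态间自动切换:熔断期结束后进入半开状态并放行试探请求,试探成功则自动恢复为关闭状态。降级策略通过缓存兜底数据提升可用性。建议为每个外部服务调用配置独立的弹性管道,并根据服务特性调整参数。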
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《熔断器与弹性策略》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《熔断器与弹性策略》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《熔断器与弹性策略》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《熔断器与弹性策略》最大的收益和代价分别是什么?
