Polly 弹性与重试
大约 11 分钟约 3169 字
Polly 弹性与重试
简介
Polly 是 .NET 平台最流行的弹性和瞬态故障处理库。它提供了重试、熔断、超时、舱壁隔离、缓存、回退等策略,并以流畅的 API 组合使用。适用于调用外部 API、数据库连接、消息队列等可能出现临时故障的场景。本文前半部分使用 Polly v7 风格的 `Policy` API。Polly v8 改用 Resilience Pipeline(见后文),以并发/速率限流取代舱壁隔离,并且不再内置缓存策略。
特点
安装与基本概念
# Install Polly (the Policy-based API used in the examples below)
dotnet add package Polly
# ASP.NET Core IHttpClientFactory integration (AddTransientHttpErrorPolicy / AddPolicyHandler);
# on .NET 8+ prefer Microsoft.Extensions.Http.Resilience, shown in the Polly v8 section
dotnet add package Microsoft.Extensions.Http.Polly
# MemoryCacheProvider used by the cache-policy example
dotnet add package Polly.Caching.Memory
六种策略
1. 重试策略
/// <summary>
/// Retry policies — automatically re-run an operation after a handled failure.
/// </summary>
/// <remarks>
/// All examples share a single <see cref="HttpClient"/>. Creating a new client per call
/// (or, worse, per retry attempt inside the executed delegate) can exhaust sockets under load.
/// Retry policies keep no state between executions, so building them per call is harmless.
/// </remarks>
public class RetryExamples
{
    // One client for the lifetime of the process; HttpClient is designed to be reused.
    private static readonly HttpClient s_httpClient = new();

    /// <summary>Basic retry: up to 3 additional attempts, with no delay between them.</summary>
    public async Task<string> BasicRetryAsync()
    {
        var retryPolicy = Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>()
            .RetryAsync(3); // retry at most 3 times
        return await retryPolicy.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }

    /// <summary>
    /// Exponential backoff: waits 2, 4, 8, 16 and 32 seconds before retries 1 through 5.
    /// </summary>
    public async Task<string> ExponentialRetryAsync()
    {
        var retryPolicy = Policy
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (exception, timeSpan, retryCount, context) =>
                {
                    Console.WriteLine($"第 {retryCount} 次重试,等待 {timeSpan.TotalSeconds}s");
                });
        return await retryPolicy.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }

    /// <summary>
    /// Exponential backoff plus 0–999 ms of random jitter, so that many clients failing at the
    /// same moment do not all retry in lock-step (thundering herd).
    /// </summary>
    public async Task<string> JitterRetryAsync()
    {
        var retryPolicy = Policy
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(5, retryAttempt =>
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) +
                // Random.Shared is thread-safe and shared; no per-call Random instance needed.
                TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)));
        return await retryPolicy.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }
}
2. 熔断策略
/// <summary>
/// Circuit breakers — stop calling a failing dependency for a while to protect it.
/// </summary>
/// <remarks>
/// A circuit breaker is stateful: it only trips if the SAME policy instance observes the
/// failures. The breakers are therefore created once (static fields) and shared by every call.
/// A breaker built inside each method would start from zero failures on every invocation and
/// could never open.
/// </remarks>
public class CircuitBreakerExamples
{
    private static readonly HttpClient s_httpClient = new();

    // Opens after 3 consecutive handled failures and stays open for 30 seconds.
    private static readonly IAsyncPolicy s_basicBreaker = Policy
        .Handle<HttpRequestException>()
        .CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: 3,          // 3 consecutive failures
            durationOfBreak: TimeSpan.FromSeconds(30),   // stay open for 30 s
            onBreak: (ex, breakDelay) =>
            {
                Console.WriteLine($"熔断器打开,等待 {breakDelay.TotalSeconds}s");
            },
            onReset: () =>
            {
                Console.WriteLine("熔断器关闭,恢复正常");
            },
            onHalfOpen: () =>
            {
                Console.WriteLine("熔断器半开,允许试探请求");
            });

    // Opens for 30 s when at least 50% of calls fail within a 10 s sampling window,
    // provided at least 10 calls were made in that window.
    private static readonly IAsyncPolicy s_advancedBreaker = Policy
        .Handle<HttpRequestException>()
        .AdvancedCircuitBreakerAsync(
            failureThreshold: 0.5,                        // 50% failure ratio
            samplingDuration: TimeSpan.FromSeconds(10),   // 10 s sampling window
            minimumThroughput: 10,                        // at least 10 calls
            durationOfBreak: TimeSpan.FromSeconds(30));

    /// <summary>Basic breaker: counts consecutive failures across all calls.</summary>
    public async Task<string> BasicCircuitBreakerAsync()
    {
        return await s_basicBreaker.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }

    /// <summary>Advanced breaker: trips on failure ratio rather than a consecutive count.</summary>
    public async Task AdvancedCircuitBreakerAsync()
    {
        await s_advancedBreaker.ExecuteAsync(() =>
        {
            // business logic
            return Task.CompletedTask;
        });
    }
}
3. 超时策略
/// <summary>
/// Timeout policies — cap how long an operation may run.
/// </summary>
public class TimeoutExamples
{
    private static readonly HttpClient s_httpClient = new();

    /// <summary>
    /// Optimistic timeout: Polly signals the timeout through the CancellationToken it passes to
    /// the delegate, so the delegate must forward that token to the work it performs.
    /// </summary>
    /// <param name="cancellationToken">Optional caller token, combined with the timeout.</param>
    /// <returns>The response body.</returns>
    /// <exception cref="HttpRequestException">The request itself failed.</exception>
    /// <remarks>Throws Polly's <c>TimeoutRejectedException</c> after 5 seconds.</remarks>
    public async Task<string> OptimisticTimeoutAsync(CancellationToken cancellationToken = default)
    {
        var timeout = Policy.TimeoutAsync<string>(TimeSpan.FromSeconds(5));
        return await timeout.ExecuteAsync(
            // Forwarding `ct` is what makes an optimistic timeout work at all.
            ct => s_httpClient.GetStringAsync("https://api.example.com/data", ct),
            cancellationToken);
    }

    /// <summary>
    /// Pessimistic timeout: Polly stops waiting after 5 seconds even if the delegate ignores
    /// cancellation. The abandoned task keeps running in the background.
    /// </summary>
    public async Task<string> PessimisticTimeoutAsync()
    {
        var timeout = Policy.TimeoutAsync<string>(TimeSpan.FromSeconds(5),
            TimeoutStrategy.Pessimistic,
            onTimeoutAsync: (context, timeSpan, abandonedTask) =>
            {
                Console.WriteLine($"操作超时,已执行 {timeSpan.TotalSeconds}s");
                // Observe the walked-away task so a later fault does not go unobserved.
                abandonedTask.ContinueWith(
                    t => _ = t.Exception,
                    TaskContinuationOptions.OnlyOnFaulted);
                return Task.CompletedTask;
            });
        return await timeout.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }
}
4. 舱壁隔离
/// <summary>
/// Bulkhead isolation — caps concurrent executions so one dependency cannot exhaust resources.
/// </summary>
/// <remarks>
/// Slots and queue are counted per policy instance, so the bulkhead is one shared static
/// instance. A bulkhead created inside each call would only ever see that single call and
/// would limit nothing.
/// </remarks>
public class BulkheadExamples
{
    private static readonly IAsyncPolicy s_bulkhead = Policy.BulkheadAsync(
        maxParallelization: 10,   // at most 10 concurrent executions
        maxQueuingActions: 20,    // at most 20 callers waiting for a slot
        onBulkheadRejectedAsync: context =>
        {
            // Runs when both the slots and the queue are full; the call is then rejected.
            Console.WriteLine("请求被舱壁拒绝");
            return Task.CompletedTask;
        });

    /// <summary>Executes work through the shared bulkhead.</summary>
    public async Task BulkheadAsync()
    {
        await s_bulkhead.ExecuteAsync(() =>
        {
            // at most 10 of these bodies run concurrently across all callers
            return Task.CompletedTask;
        });
    }
}
5. 回退策略
/// <summary>
/// Fallback policies — return a default value or a degraded result when the call fails.
/// </summary>
public class FallbackExamples
{
    private static readonly HttpClient s_httpClient = new();

    /// <summary>Returns a fixed default value when the request throws HttpRequestException.</summary>
    public async Task<string> FallbackWithDefaultAsync()
    {
        var fallback = Policy<string>
            .Handle<HttpRequestException>()
            .FallbackAsync(
                fallbackValue: "默认数据",
                onFallbackAsync: outcome =>
                {
                    // Only exceptions are handled here, but stay null-safe in case a result
                    // predicate (OrResult) is added later and Exception becomes null.
                    Console.WriteLine($"请求失败,返回默认值:{outcome.Exception?.Message}");
                    return Task.CompletedTask;
                });
        return await fallback.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }

    /// <summary>Produces a degraded result (e.g. from a cache) when the request fails.</summary>
    public async Task<string> FallbackWithAlternativeAsync()
    {
        var fallback = Policy<string>
            .Handle<HttpRequestException>()
            .FallbackAsync(
                fallbackAction: ct =>
                {
                    // load from cache
                    return Task.FromResult("来自缓存的数据");
                });
        return await fallback.ExecuteAsync(
            () => s_httpClient.GetStringAsync("https://api.example.com/data"));
    }
}
6. 缓存策略
/// <summary>
/// Cache policy — for a key already cached, return the stored result instead of re-running
/// the delegate.
/// </summary>
/// <remarks>
/// The MemoryCache and the policy are created once and shared. A new MemoryCache per call
/// would miss on every lookup and leak the (disposable) cache instances.
/// </remarks>
public class CacheExamples
{
    // Static initializers run in textual order, so the cache exists before the policy uses it.
    private static readonly MemoryCache s_memoryCache = new(new MemoryCacheOptions());

    private static readonly ISyncPolicy<string> s_cachePolicy = Policy.Cache<string>(
        new MemoryCacheProvider(s_memoryCache),
        TimeSpan.FromMinutes(10),
        // onCacheGet is Polly's "cache hit" callback: (context, cacheKey).
        onCacheGet: (context, cacheKey) => Console.WriteLine($"缓存命中:{cacheKey}"),
        onCacheMiss: (context, cacheKey) => { },
        onCachePut: (context, cacheKey) => { },
        onCacheGetError: (context, cacheKey, exception) => { },
        onCachePutError: (context, cacheKey, exception) => { });

    /// <summary>Returns the cached value for <paramref name="key"/>, computing it on a miss.</summary>
    /// <param name="key">Cache key; passed as the Context's operation key.</param>
    public string GetWithCache(string key)
    {
        return s_cachePolicy.Execute(context =>
        {
            // runs only on a cache miss
            Console.WriteLine("从数据源获取");
            return "数据内容";
        }, new Context(key));
    }
}
策略组合
组合多个策略
/// <summary>
/// Policy composition with PolicyWrap.
/// Execution order, outermost to innermost: fallback → circuit breaker → timeout → retry.
/// </summary>
/// <remarks>
/// Because the timeout wraps the retry, its 10 s budget covers all attempts and the backoff
/// waits between them (2 s + 4 s + 8 s). The timeout is optimistic: it can only cancel an
/// in-flight request if the token Polly supplies reaches the HTTP call, which
/// <see cref="GetDataAsync"/> does. The circuit breaker is stateful, so reuse one
/// <see cref="CombinedPolicy"/> instance rather than creating one per request.
/// </remarks>
public class CombinedPolicy
{
    private static readonly HttpClient s_httpClient = new();
    private readonly IAsyncPolicy<string> _resiliencePolicy;

    public CombinedPolicy()
    {
        // 1. Retry (innermost): 3 retries after 2 s, 4 s and 8 s.
        var retry = Policy<string>
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(3, attempt =>
                TimeSpan.FromSeconds(Math.Pow(2, attempt)));
        // 2. Timeout: optimistic, 10 s for the whole retry sequence.
        var timeout = Policy.TimeoutAsync<string>(TimeSpan.FromSeconds(10));
        // 3. Circuit breaker: opens for 30 s after 5 consecutive handled failures.
        var breaker = Policy<string>
            .Handle<HttpRequestException>()
            .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));
        // 4. Fallback (outermost): degraded data on failure, timeout or an open circuit.
        var fallback = Policy<string>
            .Handle<HttpRequestException>()
            .Or<TimeoutRejectedException>()
            .Or<BrokenCircuitException>()
            .FallbackAsync("降级数据");
        _resiliencePolicy = Policy.WrapAsync(fallback, breaker, timeout, retry);
    }

    /// <summary>GETs <paramref name="url"/> through the combined resilience policy.</summary>
    /// <param name="url">Absolute URL to request.</param>
    /// <param name="cancellationToken">Caller cancellation, combined with the policy timeout.</param>
    /// <returns>The response body, or "降级数据" when the fallback kicks in.</returns>
    public async Task<string> GetDataAsync(string url, CancellationToken cancellationToken = default)
    {
        return await _resiliencePolicy.ExecuteAsync(
            // Forward `ct` so the optimistic timeout can actually cancel the request.
            ct => s_httpClient.GetStringAsync(url, ct),
            cancellationToken);
    }
}
HttpClient 集成
ASP.NET Core 中使用
// Program.cs — integrate Polly with IHttpClientFactory.
// Handlers run in registration order, outermost first: the retry wraps the circuit breaker,
// so the breaker sees every individual attempt. HttpClient.Timeout applies to the whole
// pipeline, i.e. to all attempts together.
var externalApiClient = builder.Services.AddHttpClient("ExternalApi", client =>
{
    client.BaseAddress = new Uri("https://api.example.com/");
    client.Timeout = TimeSpan.FromSeconds(30);
});

// Retry transient HTTP errors (HttpRequestException, 5xx, 408) after 2 s, 4 s and 8 s.
externalApiClient.AddTransientHttpErrorPolicy(policyBuilder =>
    policyBuilder.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))));

// Open the circuit for 30 s after 5 consecutive transient failures.
externalApiClient.AddTransientHttpErrorPolicy(policyBuilder =>
    policyBuilder.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30)));
// Usage: a consumer of the named "ExternalApi" client.
public class ExternalApiService
{
    private readonly HttpClient _client;

    public ExternalApiService(IHttpClientFactory factory)
    {
        _client = factory.CreateClient("ExternalApi");
    }

    /// <summary>Fetches /api/data relative to the client's BaseAddress.</summary>
    /// <param name="cancellationToken">Caller cancellation, passed through to the request.</param>
    /// <returns>The response body.</returns>
    public async Task<string> GetDataAsync(CancellationToken cancellationToken = default)
    {
        // Policies registered on the named client run inside its handler pipeline.
        return await _client.GetStringAsync("/api/data", cancellationToken);
    }
}
Polly v8 与 Resilience Framework
.NET 8 新一代弹性框架
// Microsoft.Extensions.Http.Resilience — the Polly v8 based resilience API for HttpClient.
//   dotnet add package Microsoft.Extensions.Http.Resilience
// AddResiliencePipeline is defined in Polly.Extensions and AddRateLimiter in Polly.RateLimiting;
// reference those packages explicitly if your setup does not already pull them in.

// Program.cs — standard pipeline (rate limiter, total request timeout, retry,
// circuit breaker, per-attempt timeout) with default settings.
builder.Services.AddHttpClient("ExternalApi", client =>
{
    client.BaseAddress = new Uri("https://api.example.com/");
})
.AddStandardResilienceHandler();

// Custom pipeline. The strategies below are typed to HttpResponseMessage, so the generic
// AddResiliencePipeline<TKey, TResult> overload is required; the non-generic builder does
// not accept RetryStrategyOptions<HttpResponseMessage>.
// Strategies added first are outermost: retry → circuit breaker → attempt timeout → rate limiter.
builder.Services.AddResiliencePipeline<string, HttpResponseMessage>("my-pipeline", pipeline =>
{
    // Retry: up to 3 retries, exponential backoff starting at 500 ms, with jitter.
    pipeline.AddRetry(new RetryStrategyOptions<HttpResponseMessage>
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromMilliseconds(500),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        // Retry only transient failures: network errors, per-attempt timeouts, 5xx, 408, 429.
        // Other 4xx responses (400, 401, 404, ...) would just fail the same way again.
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => (int)r.StatusCode >= 500
                || r.StatusCode == System.Net.HttpStatusCode.RequestTimeout
                || r.StatusCode == System.Net.HttpStatusCode.TooManyRequests),
        OnRetry = args =>
        {
            // AttemptNumber is zero-based; +1 gives the human-readable retry count.
            Console.WriteLine($"重试第 {args.AttemptNumber + 1} 次");
            return ValueTask.CompletedTask;
        }
    });

    // Circuit breaker: opens for 30 s when >= 50% of at least 10 calls in a 10 s window fail.
    pipeline.AddCircuitBreaker(new CircuitBreakerStrategyOptions<HttpResponseMessage>
    {
        FailureRatio = 0.5,
        SamplingDuration = TimeSpan.FromSeconds(10),
        MinimumThroughput = 10,
        BreakDuration = TimeSpan.FromSeconds(30),
        // By default only exceptions count as failures; count 5xx responses as well.
        ShouldHandle = new PredicateBuilder<HttpResponseMessage>()
            .Handle<HttpRequestException>()
            .Handle<TimeoutRejectedException>()
            .HandleResult(r => (int)r.StatusCode >= 500),
        OnClosed = args =>
        {
            Console.WriteLine("熔断器关闭");
            return ValueTask.CompletedTask;
        },
        OnOpened = args =>
        {
            Console.WriteLine("熔断器打开");
            return ValueTask.CompletedTask;
        }
    });

    // Per-attempt timeout: sits inside the retry, so each attempt gets its own 10 s.
    pipeline.AddTimeout(new TimeoutStrategyOptions
    {
        Timeout = TimeSpan.FromSeconds(10),
        OnTimeout = args =>
        {
            Console.WriteLine($"请求超时: {args.Timeout}");
            return ValueTask.CompletedTask;
        }
    });

    // Rate limiter (innermost): every attempt, including retries, consumes a permit.
    // Its rejection exception is not in the retry predicate above, so it reaches the caller.
    pipeline.AddRateLimiter(new SlidingWindowRateLimiter(new SlidingWindowRateLimiterOptions
    {
        PermitLimit = 100,
        Window = TimeSpan.FromMinutes(1),
        SegmentsPerWindow = 10,
        QueueLimit = 50
    }));
});
Polly 策略的单元测试
测试重试和熔断行为
/// <summary>
/// Unit tests for Polly policies (xUnit).
/// </summary>
public class PollyPolicyTests
{
    [Fact]
    public async Task RetryPolicy_Retries_OnTransientFailure()
    {
        // Arrange
        var retryCount = 0;
        var attemptCount = 0;
        var policy = Policy
            .Handle<HttpRequestException>()
            .RetryAsync(3, onRetry: (ex, attempt) => retryCount++);
        var expectedException = new HttpRequestException("连接失败");

        // Act
        var actualException = await Assert.ThrowsAsync<HttpRequestException>(() =>
            policy.ExecuteAsync(() =>
            {
                attemptCount++;
                return Task.FromException(expectedException);
            }));

        // Assert: 1 initial attempt + 3 retries, then the original exception is rethrown.
        Assert.Equal(3, retryCount);
        Assert.Equal(4, attemptCount);
        Assert.Same(expectedException, actualException);
    }

    [Fact]
    public async Task CircuitBreaker_Opens_AfterThresholdFailures()
    {
        // Arrange
        var policy = Policy
            .Handle<HttpRequestException>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 3,
                durationOfBreak: TimeSpan.FromSeconds(30));

        // Act: 3 consecutive handled failures trip the breaker.
        for (var i = 0; i < 3; i++)
        {
            await Assert.ThrowsAsync<HttpRequestException>(() =>
                policy.ExecuteAsync(() => Task.FromException(new HttpRequestException())));
        }

        // Assert: an open circuit rejects calls with BrokenCircuitException without running
        // the delegate. (IsolatedCircuitException is only thrown after a manual Isolate().)
        var delegateRan = false;
        await Assert.ThrowsAsync<BrokenCircuitException>(() =>
            policy.ExecuteAsync(() =>
            {
                delegateRan = true;
                return Task.CompletedTask;
            }));
        Assert.False(delegateRan);
        Assert.Equal(CircuitState.Open, policy.CircuitState);
    }

    [Fact]
    public async Task Fallback_ReturnsDefault_OnFailure()
    {
        // Arrange
        var fallbackValue = "降级数据";
        var policy = Policy<string>
            .Handle<HttpRequestException>()
            .FallbackAsync(fallbackValue);

        // Act
        var result = await policy.ExecuteAsync(
            () => Task.FromException<string>(new HttpRequestException("服务不可用")));

        // Assert
        Assert.Equal(fallbackValue, result);
    }

    [Fact]
    public async Task Timeout_Throws_OnSlowExecution()
    {
        // Arrange
        var policy = Policy.TimeoutAsync(TimeSpan.FromMilliseconds(100));

        // Act & Assert: the optimistic timeout cancels via `ct`. The overload taking a
        // token-aware delegate also requires an outer CancellationToken.
        await Assert.ThrowsAsync<TimeoutRejectedException>(() =>
            policy.ExecuteAsync(
                ct => Task.Delay(TimeSpan.FromSeconds(10), ct),
                CancellationToken.None));
    }

    [Fact]
    public async Task CombinedPolicy_RetriesThenFallback()
    {
        // Arrange
        var attempts = 0;
        var retry = Policy<string>
            .Handle<HttpRequestException>()
            .RetryAsync(2);
        var fallback = Policy<string>
            .Handle<HttpRequestException>()
            .FallbackAsync("默认值");
        var wrapped = Policy.WrapAsync(fallback, retry);

        // Act
        var result = await wrapped.ExecuteAsync(() =>
        {
            attempts++;
            return Task.FromException<string>(new HttpRequestException("持续失败"));
        });

        // Assert: 1 attempt + 2 retries, then the fallback value.
        Assert.Equal(3, attempts);
        Assert.Equal("默认值", result);
    }
}
Polly 可观测性
日志与指标集成
/// <summary>
/// Builds HTTP resilience policies that emit structured logs and metrics.
/// </summary>
public class ObservablePolicyFactory
{
    private readonly ILogger<ObservablePolicyFactory> _logger;
    private readonly Meter _meter;
    private readonly Counter<int> _retryCounter;
    private readonly Counter<int> _circuitBreakerCounter;
    // NOTE(review): created but never recorded in this class; record it around the call
    // site (e.g. with a Stopwatch) or remove it.
    private readonly Histogram<double> _executionDuration;

    public ObservablePolicyFactory(ILogger<ObservablePolicyFactory> logger, IMeterFactory meterFactory)
    {
        _logger = logger;
        _meter = meterFactory.Create("MyApp.Resilience");
        _retryCounter = _meter.CreateCounter<int>("resilience.retry.count", "次");
        _circuitBreakerCounter = _meter.CreateCounter<int>("resilience.circuit_breaker.state_change", "次");
        _executionDuration = _meter.CreateHistogram<double>("resilience.execution.duration", "ms");
    }

    /// <summary>
    /// Creates fallback → circuit breaker → timeout → retry (outermost first) for calls to
    /// <paramref name="serviceName"/>.
    /// </summary>
    /// <param name="serviceName">Logical downstream name, used in log messages and metric tags.</param>
    /// <returns>
    /// A policy that retries HttpRequestException and 5xx responses, breaks the circuit after 5
    /// handled failures, and answers 503 when it gives up.
    /// </returns>
    /// <remarks>
    /// The returned policy contains a stateful circuit breaker: create it once per downstream
    /// service and reuse it.
    /// </remarks>
    public IAsyncPolicy<HttpResponseMessage> CreateApiPolicy(string serviceName)
    {
        var retry = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .WaitAndRetryAsync(
                retryCount: 3,
                sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (outcome, timespan, retryCount, context) =>
                {
                    _logger.LogWarning(
                        outcome.Exception,
                        "服务 {Service} 第 {Retry} 次重试,等待 {Delay}ms,原因: {Reason}",
                        serviceName, retryCount, timespan.TotalMilliseconds,
                        outcome.Exception?.Message ?? outcome.Result.StatusCode.ToString());
                    _retryCounter.Add(1,
                        new KeyValuePair<string, object?>("service", serviceName),
                        new KeyValuePair<string, object?>("attempt", retryCount));
                    // The failed response is discarded in favour of the next attempt; dispose it
                    // so its connection goes back to the pool.
                    outcome.Result?.Dispose();
                });

        var breaker = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .CircuitBreakerAsync(
                // Result-aware (generic) breakers name this handledEventsAllowedBeforeBreaking:
                // it counts handled exceptions AND handled results.
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30),
                onBreak: (outcome, duration) =>
                {
                    _logger.LogError(
                        outcome.Exception,
                        "服务 {Service} 熔断器打开,持续 {Duration}s",
                        serviceName, duration.TotalSeconds);
                    _circuitBreakerCounter.Add(1,
                        new KeyValuePair<string, object?>("service", serviceName),
                        new KeyValuePair<string, object?>("state", "open"));
                },
                onReset: () =>
                {
                    _logger.LogInformation("服务 {Service} 熔断器关闭", serviceName);
                    _circuitBreakerCounter.Add(1,
                        new KeyValuePair<string, object?>("service", serviceName),
                        new KeyValuePair<string, object?>("state", "closed"));
                });

        // Wraps the whole retry sequence, so 10 s is the budget for all attempts together.
        var timeout = Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(10));

        var fallback = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .Or<TimeoutRejectedException>()
            .Or<BrokenCircuitException>()
            .FallbackAsync(
                fallbackAction: ct =>
                {
                    _logger.LogWarning("服务 {Service} 触发降级", serviceName);
                    // A fresh response per call: HttpResponseMessage is disposable and mutable.
                    return Task.FromResult(new HttpResponseMessage(System.Net.HttpStatusCode.ServiceUnavailable));
                });

        return Policy.WrapAsync(fallback, breaker, timeout, retry);
    }
}
常见策略配置模板
不同场景的推荐配置
/// <summary>
/// Polly policy templates for common scenarios.
/// </summary>
public static class PolicyTemplates
{
    /// <summary>
    /// External API calls: fallback → circuit breaker → timeout → retry (outermost first).
    /// </summary>
    /// <param name="logger">Receives one warning per retry.</param>
    /// <remarks>Contains a stateful circuit breaker: build once per downstream and reuse.</remarks>
    public static IAsyncPolicy<HttpResponseMessage> ExternalApiPolicy(ILogger logger)
    {
        // HttpClient does not throw for 5xx, so failed responses must be handled as results.
        var retry = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .WaitAndRetryAsync(3, attempt =>
                TimeSpan.FromSeconds(Math.Pow(2, attempt)) +
                TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)),
                onRetry: (outcome, delay, attempt, _) =>
                {
                    logger.LogWarning(outcome.Exception, "外部 API 重试 {Attempt},等待 {Delay}ms", attempt, delay.TotalMilliseconds);
                    // Release the discarded response's connection before the next attempt.
                    outcome.Result?.Dispose();
                });
        // Opens for 30 s when >= 50% of at least 10 calls within 10 s fail.
        var breaker = Policy<HttpResponseMessage>
            .Handle<HttpRequestException>()
            .OrResult(r => (int)r.StatusCode >= 500)
            .AdvancedCircuitBreakerAsync(0.5, TimeSpan.FromSeconds(10), 10, TimeSpan.FromSeconds(30));
        var timeout = Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(15));
        // Build a new 503 per fallback: a single shared HttpResponseMessage would be handed to
        // every caller, and the first one to dispose it would break it for all the others.
        var fallback = Policy<HttpResponseMessage>
            .Handle<Exception>()
            .FallbackAsync(
                fallbackAction: ct => Task.FromResult(
                    new HttpResponseMessage(System.Net.HttpStatusCode.ServiceUnavailable)));
        return Policy.WrapAsync(fallback, breaker, timeout, retry);
    }

    /// <summary>Database operations: retry only, for brief connection interruptions.</summary>
    /// <param name="logger">Receives one warning per retry, including the exception.</param>
    public static IAsyncPolicy DbPolicy(ILogger logger)
    {
        // NOTE(review): InvalidOperationException is broad and may also be raised by
        // non-transient programming errors; narrow it to provider-specific transient errors.
        return Policy
            .Handle<TimeoutException>()
            .Or<InvalidOperationException>()
            .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(attempt),
                onRetry: (ex, delay, attempt, _) =>
                    logger.LogWarning(ex, "数据库重试 {Attempt}: {Message}", attempt, ex.Message));
    }

    /// <summary>
    /// Message-queue sends: 5 retries with exponential backoff, then a dead-letter fallback.
    /// </summary>
    /// <param name="logger">Receives retry warnings and the final failure.</param>
    public static IAsyncPolicy MessageQueuePolicy(ILogger logger)
    {
        var retry = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(5, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
                onRetry: (ex, delay, attempt, _) =>
                    logger.LogWarning(ex, "消息队列重试 {Attempt}", attempt));
        var fallback = Policy
            .Handle<Exception>()
            .FallbackAsync(
                fallbackAction: ct =>
                {
                    // write to the dead-letter queue or durable storage
                    return Task.CompletedTask;
                },
                onFallbackAsync: ex =>
                {
                    // Log with the exception so the root cause of the failed send is kept.
                    logger.LogError(ex, "消息队列发送失败,写入死信队列");
                    return Task.CompletedTask;
                });
        return Policy.WrapAsync(fallback, retry);
    }

    /// <summary>Cache policy: return cached results, executing the delegate only on a miss.</summary>
    /// <param name="cache">Caller-owned cache; reuse one instance so entries survive between calls.</param>
    /// <param name="ttl">Time-to-live for cached entries.</param>
    public static ISyncPolicy<T> CachePolicy<T>(MemoryCache cache, TimeSpan ttl)
    {
        var cacheProvider = new MemoryCacheProvider(cache);
        return Policy.Cache<T>(cacheProvider, ttl);
    }
}
策略选择指南
| 故障类型 | 策略 | 说明 |
|---|---|---|
| 网络抖动 | 重试 | 瞬时故障自动恢复 |
| 下游过载 | 熔断 | 保护下游不被淹没 |
| 响应慢 | 超时 | 避免无限等待 |
| 并发过高 | 舱壁 | 限制资源使用 |
| 不可恢复 | 回退 | 优雅降级 |
| 重复查询 | 缓存 | 减少重复请求 |
优点
缺点
总结
Polly 是 .NET 微服务容错的必备工具。重试处理瞬时故障,熔断保护下游服务,超时避免无限等待,回退实现优雅降级。核心原则:外层回退、中层熔断、内层重试。超时放在重试外层时,限制整个调用(含所有重试)的总耗时;放在重试内层时,只限制单次尝试。熔断、舱壁、缓存等有状态策略必须复用同一个实例,每次调用都新建实例会让它们失效。所有外部依赖调用都应配置弹性策略。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《Polly 弹性与重试》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《Polly 弹性与重试》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Polly 弹性与重试》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Polly 弹性与重试》最大的收益和代价分别是什么?
