错误处理与弹性模式
大约 16 分钟约 4912 字
错误处理与弹性模式
简介
在分布式系统和复杂应用中,错误是不可避免的。网络抖动、服务降级、资源耗尽、并发冲突等各种因素都可能导致操作失败。错误处理与弹性模式(Resilience Patterns)提供了一套系统性的策略来应对这些失败场景,确保系统在面对异常时仍能保持可接受的响应能力和数据一致性。
传统的 try-catch 异常处理在面对瞬态故障(Transient Faults)时力不从心——一次偶然的网络超时不应该导致整个操作链失败。弹性模式引入了重试、熔断、舱壁隔离、超时、降级等策略,通过组合使用这些策略构建出健壮的弹性管线(Resilience Pipeline)。
特点
- 声明式弹性策略:以组合子(Combinator)方式组合多种弹性策略
- 瞬态故障处理:自动识别可重试的临时性故障
- 系统性保护:熔断器防止故障扩散,舱壁隔离保护关键资源
- 可观测性:弹性事件可被记录、监控和告警
- 超时控制:确保操作不会无限等待
- 优雅降级:在故障时提供替代响应而非直接失败
实现
Result 类型模式(函数式错误处理)
Result 类型(又称 Either Monad)是函数式编程中的错误处理范式,避免了异常的性能开销和控制流混乱。
基础 Result 类型
// Result 类型定义
/// <summary>
/// Outcome of an operation: either a success carrying a value of type
/// <typeparamref name="T"/>, or a failure carrying an error message.
/// </summary>
/// <typeparam name="T">Type of the value produced on success.</typeparam>
public readonly struct Result<T>
{
    // Single funnel constructor; the public factories decide which side is populated.
    private Result(bool isSuccess, T value, string error)
    {
        IsSuccess = isSuccess;
        Value = value;
        Error = error;
    }

    /// <summary>True when the instance was produced by <see cref="Success"/>.</summary>
    public bool IsSuccess { get; }

    /// <summary>Logical negation of <see cref="IsSuccess"/>.</summary>
    public bool IsFailure => !IsSuccess;

    /// <summary>The success value; holds default(T) for a failure.</summary>
    public T Value { get; }

    /// <summary>The failure message; null for a success.</summary>
    public string Error { get; }

    /// <summary>Creates a successful result wrapping <paramref name="value"/>.</summary>
    public static Result<T> Success(T value) => new Result<T>(true, value, default);

    /// <summary>Creates a failed result carrying <paramref name="error"/>.</summary>
    public static Result<T> Failure(string error) => new Result<T>(false, default, error);

    /// <summary>Lifts a plain value into a successful result.</summary>
    public static implicit operator Result<T>(T value) => Success(value);

    /// <summary>Lifts an error message into a failed result.</summary>
    public static implicit operator Result<T>(string error) => Failure(error);
}
// 非泛型 Result
public readonly struct Result
{
public bool IsSuccess { get; }
public string Error { get; }
private Result(bool isSuccess, string error)
{
IsSuccess = isSuccess;
Error = error;
}
public static Result Success() => new(true, null);
public static Result Failure(string error) => new(false, error);
}
Result 类型的链式操作
// Map 和 Bind 操作(函数式组合子)
public static class ResultExtensions
{
// Map:转换成功值
/// <summary>
/// Transforms the success value with <paramref name="mapper"/>; a failure is passed
/// through with its error unchanged and <paramref name="mapper"/> is not invoked.
/// </summary>
public static Result<TOut> Map<TIn, TOut>(this Result<TIn> result, Func<TIn, TOut> mapper)
{
    if (result.IsFailure)
    {
        return Result<TOut>.Failure(result.Error);
    }

    TOut mapped = mapper(result.Value);
    return Result<TOut>.Success(mapped);
}
// Bind:链式操作(flatMap)
/// <summary>
/// Chains a further result-returning step: runs <paramref name="binder"/> on the success
/// value, or short-circuits with the existing error when <paramref name="result"/> failed.
/// </summary>
public static Result<TOut> Bind<TIn, TOut>(this Result<TIn> result,
    Func<TIn, Result<TOut>> binder)
{
    if (!result.IsSuccess)
    {
        return Result<TOut>.Failure(result.Error);
    }

    return binder(result.Value);
}
// Match:模式匹配
/// <summary>
/// Folds a result into a single value by invoking exactly one of the two handlers.
/// </summary>
/// <param name="result">The result to inspect.</param>
/// <param name="onSuccess">Invoked with the value when the result succeeded.</param>
/// <param name="onFailure">Invoked with the error message when the result failed.</param>
/// <returns>Whatever the invoked handler returns.</returns>
public static TOut Match<TIn, TOut>(this Result<TIn> result,
    Func<TIn, TOut> onSuccess,
    Func<string, TOut> onFailure)
{
    return result.IsSuccess ? onSuccess(result.Value) : onFailure(result.Error);
}

/// <summary>
/// Side-effect-only variant of Match for handlers that return nothing
/// (for example lambdas that only call Console.WriteLine). Without this overload
/// such call sites do not compile, because a void lambda cannot bind to a Func.
/// </summary>
/// <param name="result">The result to inspect.</param>
/// <param name="onSuccess">Invoked with the value when the result succeeded.</param>
/// <param name="onFailure">Invoked with the error message when the result failed.</param>
public static void Match<TIn>(this Result<TIn> result,
    Action<TIn> onSuccess,
    Action<string> onFailure)
{
    if (result.IsSuccess)
    {
        onSuccess(result.Value);
    }
    else
    {
        onFailure(result.Error);
    }
}
// Tap:成功时执行副作用
/// <summary>
/// Runs <paramref name="action"/> on the success value for its side effect and
/// returns the same result; failures skip the action.
/// </summary>
public static Result<T> Tap<T>(this Result<T> result, Action<T> action)
{
    if (result.IsFailure)
    {
        return result;
    }

    action(result.Value);
    return result;
}
// OnFailure:失败时执行副作用
/// <summary>
/// Runs <paramref name="action"/> on the error message for its side effect and
/// returns the same result; successes skip the action.
/// </summary>
public static Result<T> OnFailure<T>(this Result<T> result, Action<string> action)
{
    if (result.IsSuccess)
    {
        return result;
    }

    action(result.Error);
    return result;
}
// Recover:失败时提供替代值
/// <summary>
/// Converts a failure into a success by computing a replacement value from the
/// error message; a success is returned untouched.
/// </summary>
public static Result<T> Recover<T>(this Result<T> result, Func<string, T> recovery)
{
    if (result.IsSuccess)
    {
        return result;
    }

    T replacement = recovery(result.Error);
    return Result<T>.Success(replacement);
}
}
实际应用
// 使用 Result 类型构建业务流程
/// <summary>
/// Builds an order as a chain of Result-returning steps: validate the request, look up
/// the customer and products, compute the total, then charge the payment. The first
/// failing step short-circuits the chain and its error message is returned.
/// </summary>
public class OrderService
{
    private readonly IProductRepository _productRepo;
    private readonly ICustomerRepository _customerRepo;
    private readonly IPaymentService _paymentService;

    /// <summary>
    /// Receives the collaborators. Without this constructor the readonly fields are
    /// never assigned and every call fails with a NullReferenceException.
    /// </summary>
    public OrderService(
        IProductRepository productRepo,
        ICustomerRepository customerRepo,
        IPaymentService paymentService)
    {
        _productRepo = productRepo ?? throw new ArgumentNullException(nameof(productRepo));
        _customerRepo = customerRepo ?? throw new ArgumentNullException(nameof(customerRepo));
        _paymentService = paymentService ?? throw new ArgumentNullException(nameof(paymentService));
    }

    /// <summary>Runs the full order-creation flow for <paramref name="request"/>.</summary>
    /// <returns>The created order, or a failure carrying the first error encountered.</returns>
    public Result<Order> CreateOrder(CreateOrderRequest request)
    {
        // `request` is only dereferenced inside Bind lambdas, which run after
        // ValidateRequest has succeeded, so it is non-null there.
        return ValidateRequest(request)
            .Bind(req => FindCustomer(req.CustomerId))
            .Bind(customer => FindProducts(request.ProductIds)
                .Map(products => (customer, products)))
            .Bind(pair => CalculateTotal(pair.products)
                .Map(total => (pair.customer, pair.products, total)))
            .Bind(tuple => ProcessPayment(tuple.customer, tuple.total)
                .Map(paymentId => new Order(
                    tuple.customer.Id,
                    tuple.products,
                    tuple.total,
                    paymentId)));
    }

    // Rejects a null request, a non-positive customer ID, or an empty product list.
    private Result<CreateOrderRequest> ValidateRequest(CreateOrderRequest request)
    {
        if (request == null)
            return Result<CreateOrderRequest>.Failure("请求不能为空");
        if (request.CustomerId <= 0)
            return Result<CreateOrderRequest>.Failure("客户 ID 无效");
        if (request.ProductIds == null || !request.ProductIds.Any())
            return Result<CreateOrderRequest>.Failure("必须选择至少一个产品");
        return Result<CreateOrderRequest>.Success(request);
    }

    // Fails when the repository has no customer for the given ID.
    private Result<Customer> FindCustomer(int customerId)
    {
        var customer = _customerRepo.FindById(customerId);
        return customer != null
            ? Result<Customer>.Success(customer)
            : Result<Customer>.Failure($"客户 {customerId} 不存在");
    }

    // Fails only when a requested ID is genuinely absent from the repository result.
    // Comparing raw counts (as before) rejected requests containing a duplicate ID
    // and reported an empty list of missing products.
    // NOTE(review): whether a duplicate ID means "quantity 2" is not visible here;
    // the returned list is whatever FindByIds yields — confirm the pricing rule.
    private Result<List<Product>> FindProducts(List<int> productIds)
    {
        var products = _productRepo.FindByIds(productIds);
        var missing = productIds.Except(products.Select(p => p.Id)).ToList();
        if (missing.Count > 0)
        {
            return Result<List<Product>>.Failure($"产品不存在: {string.Join(",", missing)}");
        }
        return Result<List<Product>>.Success(products);
    }

    // Sums the product prices; a non-positive total is treated as a failure.
    private Result<decimal> CalculateTotal(List<Product> products)
    {
        decimal total = products.Sum(p => p.Price);
        if (total <= 0)
            return Result<decimal>.Failure("订单金额必须大于零");
        return Result<decimal>.Success(total);
    }

    // Translates a PaymentException into a failed result; other exceptions propagate.
    private Result<string> ProcessPayment(Customer customer, decimal amount)
    {
        try
        {
            var paymentId = _paymentService.Charge(customer.PaymentMethod, amount);
            return Result<string>.Success(paymentId);
        }
        catch (PaymentException ex)
        {
            return Result<string>.Failure($"支付失败: {ex.Message}");
        }
    }
}
// 调用端
var result = orderService.CreateOrder(request);
result.Match(
onSuccess: order => Console.WriteLine($"订单创建成功: {order.Id}"),
onFailure: error => Console.WriteLine($"订单创建失败: {error}")
);
错误码 vs 异常
// 错误码模式(适用于跨服务边界)
/// <summary>
/// Numeric error codes carried by ServiceError / ServiceResponse.
/// NOTE(review): the 1xxx / 2xxx / 9999 grouping looks like request errors vs.
/// availability errors vs. catch-all — confirm the intended convention.
/// </summary>
public enum ErrorCode
{
// No error.
None = 0,
// The requested resource does not exist.
NotFound = 1001,
// Input failed validation.
ValidationFailed = 1002,
// Caller is not authorized.
Unauthorized = 1003,
// The request conflicts with existing state.
Conflict = 1004,
// The operation timed out.
Timeout = 2001,
// The target service is unavailable.
ServiceUnavailable = 2002,
// The caller was rate limited.
RateLimited = 2003,
// Unclassified internal error.
InternalError = 9999
}
// 结构化错误响应
/// <summary>
/// Structured error payload: a code, a message, and optional detail, per-field
/// reasons, trace ID and creation timestamp (UTC, set when the object is created).
/// </summary>
public class ServiceError
{
    public ErrorCode Code { get; init; }
    public string Message { get; init; }
    public string Detail { get; init; }
    public Dictionary<string, string> Fields { get; init; }
    public string TraceId { get; init; }
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;

    /// <summary>Builds a NotFound error naming the resource and its identifier.</summary>
    public static ServiceError NotFound(string resource, object id)
    {
        string text = $"{resource} ({id}) 不存在";
        return new ServiceError { Code = ErrorCode.NotFound, Message = text };
    }

    /// <summary>Builds a ValidationFailed error with a single field/reason entry.</summary>
    public static ServiceError Validation(string field, string reason)
    {
        var fieldErrors = new Dictionary<string, string>();
        fieldErrors.Add(field, reason);
        return new ServiceError
        {
            Code = ErrorCode.ValidationFailed,
            Message = "数据验证失败",
            Fields = fieldErrors
        };
    }
}
// 混合策略:内部使用异常,跨边界使用错误码
/// <summary>
/// Runs an action and converts any exception it throws into a ServiceResponse
/// carrying an error code, so exceptions do not cross the service boundary.
/// </summary>
public class ErrorTranslationMiddleware
{
    /// <summary>
    /// Executes <paramref name="action"/>; returns a success response with its value,
    /// or a failure response whose code is chosen from the exception type. Unknown
    /// exception types map to InternalError with a generic message.
    /// </summary>
    public ServiceResponse<T> Execute<T>(Func<T> action)
    {
        try
        {
            T payload = action();
            return ServiceResponse<T>.Success(payload);
        }
        catch (Exception failure)
        {
            // Arms are tested top to bottom, mirroring the order of separate catch clauses.
            return failure switch
            {
                NotFoundException => ServiceResponse<T>.Fail(ErrorCode.NotFound, failure.Message),
                ValidationException => ServiceResponse<T>.Fail(ErrorCode.ValidationFailed, failure.Message),
                TimeoutException => ServiceResponse<T>.Fail(ErrorCode.Timeout, failure.Message),
                _ => ServiceResponse<T>.Fail(ErrorCode.InternalError, "内部错误")
            };
        }
    }
}
public class ServiceResponse<T>
{
// Named IsSuccess because a property called Success collides with the
// Success(T) factory below: a type cannot declare two members with the same
// name (compiler error CS0102). The factory name is kept because callers
// invoke ServiceResponse<T>.Success(...).
public bool IsSuccess { get; init; }

// Payload of a successful response; left at default(T) on failure.
public T Data { get; init; }

// Error details of a failed response; null on success.
public ServiceError Error { get; init; }

// Creates a successful response wrapping `data`.
public static ServiceResponse<T> Success(T data) => new()
{
    IsSuccess = true, Data = data
};

// Creates a failed response carrying the given code and message.
public static ServiceResponse<T> Fail(ErrorCode code, string message) => new()
{
    IsSuccess = false,
    Error = new ServiceError { Code = code, Message = message }
};
}
全局异常处理
// WPF 全局异常处理
public class GlobalExceptionHandler
{
private readonly ILogger _logger;
private readonly IDialogService _dialogService;
public GlobalExceptionHandler(ILogger logger, IDialogService dialogService)
{
_logger = logger;
_dialogService = dialogService;
}
// Subscribes this handler's callbacks to the three unhandled-exception events below.
// NOTE(review): nothing here unsubscribes, and Application.Current is dereferenced
// without a null check — confirm Register is called once, after the WPF app exists.
public void Register()
{
// Unhandled exceptions on the UI (dispatcher) thread
Application.Current.DispatcherUnhandledException += OnDispatcherUnhandledException;
// Unhandled exceptions on non-UI threads
AppDomain.CurrentDomain.UnhandledException += OnDomainUnhandledException;
// Task exceptions that were never observed
TaskScheduler.UnobservedTaskException += OnUnobservedTaskException;
}
private void OnDispatcherUnhandledException(object sender, DispatcherUnhandledExceptionEventArgs e)
{
_logger.LogError(e.Exception, "UI 线程未处理异常");
// 标记为已处理,防止应用崩溃
e.Handled = true;
// 用户友好的错误提示
_dialogService.ShowNotification("操作异常", GetUserFriendlyMessage(e.Exception));
}
private void OnDomainUnhandledException(object sender, UnhandledExceptionEventArgs e)
{
var exception = e.ExceptionObject as Exception;
_logger.LogCritical(exception, "非 UI 线程未处理异常,应用即将终止");
// 尝试保存用户数据
EmergencySave();
}
private void OnUnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
_logger.LogError(e.Exception, "Task 未观察到的异常");
e.SetObserved(); // 防止进程终止
}
// Maps an exception to the text shown to the user; the checks run in order and
// anything unrecognized (including null) gets the generic message.
private string GetUserFriendlyMessage(Exception ex)
{
    if (ex is TimeoutException)
    {
        return "操作超时,请检查网络连接后重试";
    }

    if (ex is UnauthorizedAccessException)
    {
        return "您没有执行此操作的权限";
    }

    if (ex is InvalidOperationException)
    {
        return "操作无效,请检查输入数据";
    }

    return "发生了意外错误,请稍后重试";
}
private void EmergencySave()
{
try
{
// 紧急保存逻辑
}
catch { /* 最后的保存也失败了,无能为力 */ }
}
}
重试策略
固定间隔重试
/// <summary>
/// Retries an asynchronous action with a constant pause between attempts.
/// </summary>
/// <remarks>
/// Despite its name, <c>maxRetries</c> bounds the TOTAL number of attempts: the
/// attempt counter starts at 1 and a failure is retried only while
/// <c>attempt &lt; maxRetries</c>. With a value of 3 the action runs at most 3 times.
/// </remarks>
public class FixedIntervalRetryPolicy
{
    private readonly int _maxRetries;
    private readonly TimeSpan _interval;
    private readonly ILogger _logger;

    public FixedIntervalRetryPolicy(int maxRetries, TimeSpan interval, ILogger logger)
    {
        _maxRetries = maxRetries;
        _interval = interval;
        _logger = logger;
    }

    /// <summary>Runs <paramref name="action"/>, retrying failures that qualify.</summary>
    /// <param name="action">The operation to execute.</param>
    /// <param name="shouldRetry">Predicate selecting retryable exceptions; defaults to "retry everything".</param>
    /// <param name="cancellationToken">Aborts the wait between attempts and suppresses further retries.</param>
    /// <returns>The value produced by the first successful attempt.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled before an attempt or during a wait.</exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
        Func<Exception, bool> shouldRetry = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(action);
        shouldRetry ??= _ => true;

        for (int attempt = 1; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                return await action();
            }
            // When the filter is false (attempts exhausted, caller cancelled, or the
            // predicate declines) the exception propagates with its original stack.
            catch (Exception ex) when (attempt < _maxRetries
                                       && !cancellationToken.IsCancellationRequested
                                       && shouldRetry(ex))
            {
                // Message template instead of an interpolated string: keeps the values
                // as structured log properties and avoids re-parsing braces in the text.
                _logger.LogWarning(ex,
                    "第 {Attempt} 次重试,{DelaySeconds}秒后重试",
                    attempt, _interval.TotalSeconds);
                await Task.Delay(_interval, cancellationToken);
            }
        }
    }
}
// 使用示例
var retryPolicy = new FixedIntervalRetryPolicy(3, TimeSpan.FromSeconds(2), logger);
var result = await retryPolicy.ExecuteAsync(
() => httpClient.GetFromJsonAsync<Data>("/api/data"),
ex => ex is HttpRequestException || ex is TimeoutException
);
指数退避重试
/// <summary>
/// Retries an asynchronous action, doubling the pause after every failure up to a cap:
/// delay = min(maxDelay, baseDelay * 2^(attempt-1)), where attempt is 1-based.
/// </summary>
/// <remarks>
/// <c>maxRetries</c> bounds the TOTAL number of attempts (a failure is retried only
/// while <c>attempt &lt; maxRetries</c>).
/// </remarks>
public class ExponentialBackoffRetryPolicy
{
    private readonly int _maxRetries;
    private readonly TimeSpan _baseDelay;
    private readonly TimeSpan _maxDelay;
    private readonly ILogger _logger;

    public ExponentialBackoffRetryPolicy(
        int maxRetries,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        ILogger logger)
    {
        _maxRetries = maxRetries;
        _baseDelay = baseDelay;
        _maxDelay = maxDelay;
        _logger = logger;
    }

    /// <summary>Runs <paramref name="action"/>, retrying failures that qualify.</summary>
    /// <param name="action">The operation to execute.</param>
    /// <param name="shouldRetry">Predicate selecting retryable exceptions; defaults to "retry everything".</param>
    /// <param name="cancellationToken">Aborts the wait between attempts and suppresses further retries.</param>
    /// <returns>The value produced by the first successful attempt.</returns>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
        Func<Exception, bool> shouldRetry = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(action);
        shouldRetry ??= _ => true;

        for (int attempt = 1; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                return await action();
            }
            catch (Exception ex) when (attempt < _maxRetries
                                       && !cancellationToken.IsCancellationRequested
                                       && shouldRetry(ex))
            {
                var delay = ComputeDelay(attempt);
                _logger.LogWarning(ex,
                    "第 {Attempt} 次重试,{DelaySeconds:F1}秒后重试",
                    attempt, delay.TotalSeconds);
                await Task.Delay(delay, cancellationToken);
            }
        }
    }

    // Computes min(maxDelay, baseDelay * 2^(attempt-1)) without overflowing.
    // The naive `Ticks * (1L << n)` wraps to a negative number once the product
    // exceeds long.MaxValue (and the shift count itself wraps at 64); Math.Min would
    // then pick the negative value and Task.Delay would throw ArgumentOutOfRangeException.
    private TimeSpan ComputeDelay(int attempt)
    {
        long baseTicks = _baseDelay.Ticks;
        long capTicks = _maxDelay.Ticks;
        int exponent = attempt - 1;

        if (baseTicks <= 0)
        {
            // Nothing to scale; keep the original "min with the cap" semantics.
            return TimeSpan.FromTicks(Math.Min(baseTicks, capTicks));
        }

        // baseTicks * 2^exponent > capTicks  <=>  baseTicks > capTicks >> exponent
        if (exponent >= 63 || baseTicks > (capTicks >> exponent))
        {
            return _maxDelay;
        }

        return TimeSpan.FromTicks(baseTicks << exponent);
    }
}
// 使用
var policy = new ExponentialBackoffRetryPolicy(
maxRetries: 5,
baseDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromSeconds(30),
logger);
带抖动的指数退避
public class JitteredBackoffRetryPolicy
{
private readonly Random _random = new();
private readonly int _maxRetries;
private readonly TimeSpan _baseDelay;
private readonly TimeSpan _maxDelay;
public JitteredBackoffRetryPolicy(int maxRetries, TimeSpan baseDelay, TimeSpan maxDelay)
{
_maxRetries = maxRetries;
_baseDelay = baseDelay;
_maxDelay = maxDelay;
}
/// <summary>
/// Runs <paramref name="action"/>, retrying qualifying failures with exponential
/// backoff plus random jitter:
/// delay = min(maxDelay, baseDelay * 2^attempt + random(0, baseDelay)), attempt 1-based.
/// </summary>
/// <param name="action">The operation to execute.</param>
/// <param name="shouldRetry">Predicate selecting retryable exceptions; defaults to "retry everything".</param>
/// <param name="cancellationToken">Aborts the wait between attempts and suppresses further retries.</param>
/// <returns>The value produced by the first successful attempt.</returns>
public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
    Func<Exception, bool> shouldRetry = null,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(action);
    shouldRetry ??= _ => true;

    for (int attempt = 1; ; attempt++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        try
        {
            return await action();
        }
        catch (Exception) when (attempt < _maxRetries
                                && !cancellationToken.IsCancellationRequested
                                && shouldRetry(null) is var _
                                && false)
        {
            // Never entered; see the real handler below.
            throw;
        }
        catch (Exception ex) when (attempt < _maxRetries
                                   && !cancellationToken.IsCancellationRequested
                                   && shouldRetry(ex))
        {
            long baseTicks = _baseDelay.Ticks;
            long capTicks = _maxDelay.Ticks;

            // Saturating baseDelay * 2^attempt: the plain multiplication overflows to
            // a negative tick count for large attempt numbers and Task.Delay then throws.
            long exponential = ScaleSaturating(baseTicks, attempt, capTicks);

            // Random.Shared is thread-safe; a single Random instance field is not, and
            // a policy object is typically shared by concurrent callers.
            long jitter = baseTicks > 0 ? Random.Shared.NextInt64(0, baseTicks) : 0;

            // Add the jitter without overflowing past the cap.
            long room = capTicks - exponential;
            long totalTicks = jitter > room ? capTicks : exponential + jitter;

            await Task.Delay(TimeSpan.FromTicks(totalTicks), cancellationToken);
        }
    }

    // Returns min(cap, ticks * 2^exponent) computed without overflow.
    static long ScaleSaturating(long ticks, int exponent, long cap)
    {
        if (ticks <= 0)
        {
            return Math.Min(ticks, cap);
        }

        if (exponent >= 63 || ticks > (cap >> exponent))
        {
            return cap;
        }

        return ticks << exponent;
    }
}
}
熔断器模式(Circuit Breaker)
// 熔断器状态
/// <summary>The three states a circuit breaker moves between.</summary>
public enum CircuitState
{
Closed, // Normal: requests are allowed through
Open, // Tripped: requests are rejected
HalfOpen // Probing: trial requests are allowed through to test recovery
}
// 熔断器实现
/// <summary>
/// Circuit breaker: after <c>failureThreshold</c> consecutive failures it opens and
/// rejects calls; once <c>resetTimeout</c> has elapsed since the last failure it lets
/// calls through again (half-open) and closes on the first success or re-opens on
/// the first failure.
/// </summary>
/// <remarks>
/// NOTE(review): while half-open every caller is let through, not a limited number
/// of probes — add a probe limit if the protected service needs a gentler recovery.
/// </remarks>
public class CircuitBreaker
{
    private readonly object _lock = new();
    private readonly int _failureThreshold;
    private readonly TimeSpan _resetTimeout;
    private readonly ILogger _logger;
    private CircuitState _state = CircuitState.Closed;
    private int _failureCount;
    private DateTime _lastFailureTime;

    /// <summary>Current state, read under the same lock that guards transitions.</summary>
    public CircuitState State
    {
        get
        {
            lock (_lock)
            {
                return _state;
            }
        }
    }

    /// <summary>Raised (while the internal lock is held) after every state change.</summary>
    public event Action<CircuitState> StateChanged;

    public CircuitBreaker(
        int failureThreshold,
        TimeSpan resetTimeout,
        ILogger logger)
    {
        _failureThreshold = failureThreshold;
        _resetTimeout = resetTimeout;
        _logger = logger;
    }

    /// <summary>Runs <paramref name="action"/> if the circuit permits it.</summary>
    /// <exception cref="CircuitBreakerOpenException">The circuit is open and the cool-down has not elapsed.</exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
    {
        if (!CanExecute(out var retryAfter))
        {
            throw new CircuitBreakerOpenException(
                $"熔断器已打开,{retryAfter} 后重试");
        }

        try
        {
            var result = await action();
            OnSuccess();
            return result;
        }
        catch
        {
            OnFailure();
            throw;
        }
    }

    // Decides whether a call may proceed. When it may not, retryAfter holds the
    // remaining cool-down, computed under the lock so it is consistent with the
    // decision and never negative.
    private bool CanExecute(out TimeSpan retryAfter)
    {
        lock (_lock)
        {
            retryAfter = TimeSpan.Zero;
            if (_state != CircuitState.Open)
            {
                return true;
            }

            var elapsed = DateTime.UtcNow - _lastFailureTime;
            if (elapsed >= _resetTimeout)
            {
                TransitionTo(CircuitState.HalfOpen);
                return true;
            }

            retryAfter = _resetTimeout - elapsed;
            return false;
        }
    }

    private void OnSuccess()
    {
        lock (_lock)
        {
            if (_state == CircuitState.HalfOpen)
            {
                // Probe succeeded: close the circuit.
                _failureCount = 0;
                TransitionTo(CircuitState.Closed);
                _logger.LogInformation("熔断器已关闭,服务恢复正常");
            }
            else if (_state == CircuitState.Closed)
            {
                // The threshold is about CONSECUTIVE failures, so any success while
                // closed restarts the count. Without this, failures accumulated over
                // the whole lifetime of the breaker would eventually trip it.
                _failureCount = 0;
            }
        }
    }

    private void OnFailure()
    {
        lock (_lock)
        {
            _failureCount++;
            _lastFailureTime = DateTime.UtcNow;
            if (_state == CircuitState.HalfOpen)
            {
                // Probe failed: open again.
                TransitionTo(CircuitState.Open);
                _logger.LogWarning("熔断器试探失败,重新打开");
            }
            else if (_state == CircuitState.Closed && _failureCount >= _failureThreshold)
            {
                // Threshold reached: open. The Closed check keeps late failures from
                // calls that started before the trip from re-firing Open -> Open.
                TransitionTo(CircuitState.Open);
                _logger.LogWarning(
                    $"熔断器已打开,连续失败 {_failureCount} 次");
            }
        }
    }

    // Must be called with _lock held.
    private void TransitionTo(CircuitState newState)
    {
        var oldState = _state;
        _state = newState;
        _logger.LogInformation($"熔断器状态变更: {oldState} -> {newState}");
        StateChanged?.Invoke(newState);
    }
}
public class CircuitBreakerOpenException : Exception
{
public CircuitBreakerOpenException(string message) : base(message) { }
}
舱壁隔离模式(Bulkhead)
// 舱壁隔离:限制并发调用数,防止资源耗尽
/// <summary>
/// Bulkhead: at most <c>maxConcurrency</c> actions run at once and at most
/// <c>maxQueue</c> further callers wait for a slot; everyone else is rejected.
/// </summary>
public class BulkheadPolicy
{
    // Longest time a queued caller waits for a slot before being rejected.
    private static readonly TimeSpan QueueTimeout = TimeSpan.FromSeconds(30);

    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConcurrency;
    private readonly int _maxQueue;
    private int _currentActive;
    private int _currentQueued;

    public int AvailableConcurrency => _maxConcurrency - _currentActive;
    public int CurrentActive => _currentActive;
    public int CurrentQueued => _currentQueued;

    public BulkheadPolicy(int maxConcurrency, int maxQueue = 0)
    {
        _maxConcurrency = maxConcurrency;
        _maxQueue = maxQueue;
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    /// <summary>Runs <paramref name="action"/> inside the bulkhead.</summary>
    /// <param name="action">The operation to execute once a slot is held.</param>
    /// <param name="cancellationToken">Cancels waiting for a slot.</param>
    /// <exception cref="BulkheadRejectedException">
    /// All slots are busy and the queue is full, or no slot became free within the queue timeout.
    /// </exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(action);

        // Fast path: take a free slot without queueing (also observes cancellation).
        bool acquired = await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken);
        if (!acquired)
        {
            // Claim a queue position first; this is what enforces maxQueue.
            // (The original referenced the constructor parameter `maxQueue`, which is
            // not in scope here, and never counted or limited queued callers.)
            if (Interlocked.Increment(ref _currentQueued) > _maxQueue)
            {
                Interlocked.Decrement(ref _currentQueued);
                throw CreateRejection();
            }

            try
            {
                acquired = await _semaphore.WaitAsync(QueueTimeout, cancellationToken);
            }
            finally
            {
                Interlocked.Decrement(ref _currentQueued);
            }

            if (!acquired)
            {
                throw CreateRejection();
            }
        }

        Interlocked.Increment(ref _currentActive);
        try
        {
            return await action();
        }
        finally
        {
            Interlocked.Decrement(ref _currentActive);
            _semaphore.Release();
        }
    }

    private BulkheadRejectedException CreateRejection() =>
        new BulkheadRejectedException(
            $"舱壁已满,最大并发: {_maxConcurrency},排队: {_maxQueue}");
}

/// <summary>Thrown when a bulkhead has neither a free slot nor queue capacity.</summary>
public class BulkheadRejectedException : Exception
{
    public BulkheadRejectedException(string message) : base(message) { }
}
// 使用示例:为不同服务设置独立的舱壁
public class ServiceBulkheadManager
{
private readonly Dictionary<string, BulkheadPolicy> _bulkheads = new();
public void Register(string serviceName, int maxConcurrency, int maxQueue = 10)
{
_bulkheads[serviceName] = new BulkheadPolicy(maxConcurrency, maxQueue);
}
// Runs the action through the bulkhead registered for serviceName; a service with
// no registered bulkhead is executed directly, without any concurrency limit.
public async Task<T> ExecuteAsync<T>(string serviceName, Func<Task<T>> action)
{
    bool isRegistered = _bulkheads.TryGetValue(serviceName, out var policy);
    if (!isRegistered)
    {
        return await action();
    }

    return await policy.ExecuteAsync(action);
}
}
超时模式
// 超时策略
public class TimeoutPolicy
{
private readonly TimeSpan _timeout;
public TimeoutPolicy(TimeSpan timeout)
{
_timeout = timeout;
}
/// <summary>
/// Runs <paramref name="action"/> with a token that is cancelled when the configured
/// timeout elapses or when <paramref name="externalToken"/> is cancelled.
/// </summary>
/// <param name="action">
/// The operation. It receives the linked token and must honor it for the timeout to
/// take effect. (Previously declared as Func&lt;Task&lt;T&gt;&gt; yet invoked with a
/// token, so neither this method nor the fallback overload compiled.)
/// </param>
/// <param name="externalToken">Caller-supplied cancellation.</param>
/// <exception cref="TimeoutException">The timeout elapsed before the action finished.</exception>
/// <exception cref="OperationCanceledException">The caller cancelled, or the action cancelled itself.</exception>
public async Task<T> ExecuteAsync<T>(Func<CancellationToken, Task<T>> action,
    CancellationToken externalToken = default)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
    cts.CancelAfter(_timeout);
    try
    {
        return await action(cts.Token);
    }
    // Translate only cancellations caused by our timer: the linked source must have
    // fired while the caller's token has not. A cancellation the action raised for
    // its own reasons is left untouched instead of being reported as a timeout.
    catch (OperationCanceledException ex)
        when (cts.IsCancellationRequested && !externalToken.IsCancellationRequested)
    {
        throw new TimeoutException(
            $"操作在 {_timeout.TotalSeconds} 秒后超时", ex);
    }
}
// 带超时的降级版本
public async Task<T> ExecuteAsync<T>(Func<CancellationToken, Task<T>> action,
Func<Task<T>> fallback,
CancellationToken externalToken = default)
{
try
{
return await ExecuteAsync(action, externalToken);
}
catch (TimeoutException)
{
return await fallback();
}
}
}
降级策略(Fallback)
// 降级策略
/// <summary>
/// Runs a primary action and, if it fails, tries the registered fallbacks in the
/// order they were added until one succeeds.
/// </summary>
/// <typeparam name="T">Result type of the primary action and of every fallback.</typeparam>
public class FallbackPolicy<T>
{
    private readonly Func<CancellationToken, Task<T>> _primaryAction;
    private readonly List<Func<Exception, CancellationToken, Task<T>>> _fallbacks;
    private readonly ILogger _logger;

    public FallbackPolicy(
        Func<CancellationToken, Task<T>> primaryAction,
        ILogger logger)
    {
        _primaryAction = primaryAction;
        _fallbacks = new List<Func<Exception, CancellationToken, Task<T>>>();
        _logger = logger;
    }

    /// <summary>Appends a fallback; it receives the exception of the step that failed before it.</summary>
    /// <returns>This policy, to allow chaining.</returns>
    public FallbackPolicy<T> WithFallback(Func<Exception, CancellationToken, Task<T>> fallback)
    {
        ArgumentNullException.ThrowIfNull(fallback);
        _fallbacks.Add(fallback);
        return this;
    }

    /// <summary>Executes the primary action, then the fallbacks as needed.</summary>
    /// <returns>The value of the first step that succeeds.</returns>
    /// <exception cref="Exception">
    /// When every step fails, the exception of the last step is rethrown with its
    /// original stack trace.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// The caller cancelled; cancellation is propagated instead of triggering fallbacks.
    /// </exception>
    public async Task<T> ExecuteAsync(CancellationToken cancellationToken = default)
    {
        Exception lastException;
        try
        {
            return await _primaryAction(cancellationToken);
        }
        catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
        {
            lastException = ex;
            _logger.LogWarning(ex, "主操作失败,尝试降级");
        }

        foreach (var fallback in _fallbacks)
        {
            try
            {
                return await fallback(lastException, cancellationToken);
            }
            catch (Exception ex) when (!IsCallerCancellation(ex, cancellationToken))
            {
                lastException = ex;
                _logger.LogWarning(ex, "降级方案失败,尝试下一个");
            }
        }

        // `throw lastException;` would overwrite the stack trace recorded where the
        // failure actually happened; ExceptionDispatchInfo rethrows it intact.
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(lastException).Throw();
        throw lastException; // Unreachable; satisfies the compiler's return-path analysis.
    }

    // True when the failure is the caller's own cancellation request, which a
    // fallback must not mask by returning a substitute value.
    private static bool IsCallerCancellation(Exception ex, CancellationToken token) =>
        ex is OperationCanceledException && token.IsCancellationRequested;
}
// 使用示例
var result = await new FallbackPolicy<Product>(async ct =>
{
// 主方案:从远程服务获取
return await productService.GetRemoteAsync(productId, ct);
}, logger)
.WithFallback(async (ex, ct) =>
{
// 降级方案1:从本地缓存获取
return await cacheService.GetAsync<Product>($"product:{productId}");
})
.WithFallback(async (ex, ct) =>
{
// 降级方案2:返回默认值
return Product.GetDefault(productId);
})
.ExecuteAsync();
弹性管线组合(Resilience Pipeline)
// 弹性管线:组合多种弹性策略
/// <summary>
/// Composes resilience strategies around an action. Strategies wrap in the order
/// they were added: the first one added is the outermost layer.
/// </summary>
public class ResiliencePipeline
{
    private readonly List<IResilienceStrategy> _strategies = new();

    /// <summary>Appends a strategy and returns this pipeline for chaining.</summary>
    public ResiliencePipeline AddStrategy(IResilienceStrategy strategy)
    {
        _strategies.Add(strategy);
        return this;
    }

    /// <summary>Executes <paramref name="action"/> through every registered strategy.</summary>
    public async Task<T> ExecuteAsync<T>(Func<CancellationToken, Task<T>> action,
        CancellationToken cancellationToken = default)
    {
        // Start from the raw action and wrap it layer by layer, walking the list
        // backwards so the last strategy ends up innermost.
        Func<CancellationToken, Task<T>> composed = action;
        int index = _strategies.Count;
        while (index > 0)
        {
            index--;
            composed = Wrap(_strategies[index], composed);
        }

        return await composed(cancellationToken);

        // Separate function so each layer captures its own strategy and inner delegate.
        static Func<CancellationToken, Task<T>> Wrap(
            IResilienceStrategy layer,
            Func<CancellationToken, Task<T>> inner)
        {
            return token => layer.ExecuteAsync(() => inner(token), token);
        }
    }
}
// 策略接口
/// <summary>A single layer of a resilience pipeline.</summary>
public interface IResilienceStrategy
{
    /// <summary>Executes <paramref name="action"/> under this strategy's policy.</summary>
    /// <param name="action">The inner operation (possibly already wrapped by other strategies).</param>
    /// <param name="cancellationToken">Caller-supplied cancellation.</param>
    Task<T> ExecuteAsync<T>(Func<Task<T>> action, CancellationToken cancellationToken);
}

/// <summary>
/// Fails the call with <see cref="TimeoutException"/> when the inner action does not
/// complete within the configured time.
/// </summary>
/// <remarks>
/// The previous implementation armed a linked CancellationTokenSource but never
/// handed its token to anything, so the timeout had no effect. Because the strategy
/// interface gives the action no token, the wait itself is bounded here; an action
/// that overruns is abandoned and keeps running in the background.
/// </remarks>
public class TimeoutStrategy : IResilienceStrategy
{
    private readonly TimeSpan _timeout;

    public TimeoutStrategy(TimeSpan timeout) => _timeout = timeout;

    /// <inheritdoc />
    /// <exception cref="TimeoutException">The action did not finish within the timeout.</exception>
    /// <exception cref="OperationCanceledException">The caller cancelled while waiting.</exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(action);
        return await action().WaitAsync(_timeout, cancellationToken);
    }
}
// 重试策略实现
/// <summary>
/// Retries the inner action up to <c>maxRetries</c> additional times (so at most
/// maxRetries + 1 attempts), waiting baseDelay * 2^attempt between attempts,
/// where attempt is 0-based.
/// </summary>
public class RetryStrategy : IResilienceStrategy
{
    private readonly int _maxRetries;
    private readonly TimeSpan _baseDelay;

    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxRetries"/> is negative.</exception>
    public RetryStrategy(int maxRetries, TimeSpan baseDelay)
    {
        if (maxRetries < 0)
        {
            // A negative value previously skipped the loop entirely and surfaced as a
            // misleading "should not be reached" InvalidOperationException at call time.
            throw new ArgumentOutOfRangeException(nameof(maxRetries));
        }

        _maxRetries = maxRetries;
        _baseDelay = baseDelay;
    }

    /// <inheritdoc />
    /// <exception cref="OperationCanceledException">The caller cancelled before an attempt or during a wait.</exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> action,
        CancellationToken cancellationToken)
    {
        for (int attempt = 0; ; attempt++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            try
            {
                return await action();
            }
            // Once the caller has cancelled, retrying is pointless: let the failure
            // (typically an OperationCanceledException) propagate immediately. The
            // final attempt's exception also propagates because the filter is false.
            catch (Exception) when (attempt < _maxRetries
                                    && !cancellationToken.IsCancellationRequested)
            {
                var delay = _baseDelay * Math.Pow(2, attempt);
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
// 构建和使用弹性管线
var pipeline = new ResiliencePipeline()
.AddStrategy(new TimeoutStrategy(TimeSpan.FromSeconds(30)))
.AddStrategy(new RetryStrategy(3, TimeSpan.FromSeconds(1)));
var result = await pipeline.ExecuteAsync(async ct =>
await httpClient.GetFromJsonAsync<Data>("/api/data", ct));
.NET Polly 集成
Polly 是 .NET 生态中最成熟的弹性库。
基础重试
// 安装:Install-Package Polly
using Polly;
using Polly.Retry;
// 基本重试策略
var retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<TimeoutException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: (exception, delay, retryCount, context) =>
{
Console.WriteLine($"第 {retryCount} 次重试,{delay.TotalSeconds}s 后重试");
});
// 执行
var result = await retryPolicy.ExecuteAsync(
async () => await httpClient.GetStringAsync("https://api.example.com/data"));
熔断器
// Polly 熔断器
var circuitBreakerPolicy = Policy
.Handle<HttpRequestException>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: 5,
durationOfBreak: TimeSpan.FromSeconds(30),
onBreak: (exception, duration) =>
{
Console.WriteLine($"熔断器打开: {exception.Message}");
},
onReset: () =>
{
Console.WriteLine("熔断器关闭");
},
onHalfOpen: () =>
{
Console.WriteLine("熔断器半开");
});
舱壁隔离
// Polly 舱壁隔离
var bulkheadPolicy = Policy.BulkheadAsync(
maxParallelization: 10,
maxQueuedActions: 20,
onBulkheadRejectedAsync: context =>
{
Console.WriteLine("请求被舱壁隔离拒绝");
return Task.CompletedTask;
});
超时
// Polly 超时
var timeoutPolicy = Policy.TimeoutAsync(
TimeSpan.FromSeconds(10),
onTimeoutAsync: (context, timespan, task) =>
{
Console.WriteLine($"操作超时 ({timespan.TotalSeconds}s)");
return Task.CompletedTask;
});
降级
// Polly 降级
var fallbackPolicy = Policy<string>
.Handle<HttpRequestException>()
.FallbackAsync(
fallbackAction: async ct =>
{
// 从缓存读取
return await cache.GetStringAsync("fallback_data") ?? "默认数据";
},
onFallbackAsync: async ex =>
{
Console.WriteLine($"降级: {ex.Exception.Message}");
await Task.CompletedTask;
});
组合策略(PolicyWrap)
// 组合所有策略:降级 -> 舱壁 -> 熔断 -> 重试 -> 超时
// fallbackPolicy is a generic Policy<string>, while the other four are non-generic
// policies. Policy.WrapAsync(params IAsyncPolicy[]) accepts only non-generic
// policies, so passing all five to it does not compile. Wrap the non-generic ones
// first, then place the typed fallback around them as the outermost layer.
var resiliencePipeline = fallbackPolicy.WrapAsync(
    Policy.WrapAsync(
        bulkheadPolicy,
        circuitBreakerPolicy,
        retryPolicy,
        timeoutPolicy));
// 统一执行
try
{
// Pass the policy-supplied token to the HTTP call: Polly's default (optimistic)
// timeout only works by cancelling this token, so a delegate that ignores it can
// never be timed out.
var result = await resiliencePipeline.ExecuteAsync(
    async ct => await httpClient.GetStringAsync("https://api.example.com/data", ct),
    CancellationToken.None);
}
catch (Exception ex)
{
Console.WriteLine($"所有策略耗尽: {ex.Message}");
}
Polly + HttpClient 集成
// 使用 Polly 与 HttpClientFactory 集成
public static class HttpClientExtensions
{
/// <summary>
/// Registers the named HttpClient "ResilientApi" with retry (outermost), circuit
/// breaker, and a 10-second per-attempt timeout (innermost).
/// </summary>
/// <param name="services">The service collection to add the client to.</param>
/// <returns>The same service collection, for chaining.</returns>
public static IServiceCollection AddResilientHttpClient(
    this IServiceCollection services)
{
    // HandleTransientHttpError covers HttpRequestException, 5xx and 408 only. The
    // inner timeout policy signals expiry with TimeoutRejectedException, so it must
    // be handled explicitly or a timed-out attempt is neither retried nor counted
    // by the circuit breaker.
    var retryPolicy = HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<Polly.Timeout.TimeoutRejectedException>()
        .WaitAndRetryAsync(3,
            retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    var circuitBreakerPolicy = HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<Polly.Timeout.TimeoutRejectedException>()
        .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));

    var timeoutPolicy = Policy.TimeoutAsync<HttpResponseMessage>(10);

    // Handlers are applied outermost-first in registration order.
    services.AddHttpClient("ResilientApi")
        .AddPolicyHandler(retryPolicy)
        .AddPolicyHandler(circuitBreakerPolicy)
        .AddPolicyHandler(timeoutPolicy);

    return services;
}
}
错误传播策略
// 错误传播与包装
/// <summary>
/// Helpers that rethrow failures as wrapper exceptions while keeping the original
/// exception attached as InnerException.
/// </summary>
public class ErrorPropagationService
{
    private readonly ILogger _logger;

    /// <summary>
    /// Runs <paramref name="action"/>; any exception is wrapped in an
    /// ApplicationException whose InnerException is the original failure.
    /// </summary>
    public void ExecuteWithPropagation(Action action)
    {
        try
        {
            action();
        }
        catch (Exception cause)
        {
            var wrapped = new ApplicationException("操作执行失败", cause);
            throw wrapped;
        }
    }

    /// <summary>
    /// Runs <paramref name="action"/>; SQL, Redis and HTTP failures are rethrown as
    /// the corresponding application-level exception. Other exceptions propagate unchanged.
    /// </summary>
    public T ExecuteWithTranslation<T>(Func<T> action)
    {
        try
        {
            return action();
        }
        catch (Exception cause) when (Translate(cause) is { } translated)
        {
            throw translated;
        }
    }

    // Returns the application-level wrapper for a known infrastructure exception,
    // or null when the exception should not be translated.
    private static Exception Translate(Exception cause)
    {
        if (cause is SqlException)
        {
            return new DataAccessException("数据库操作失败", cause);
        }

        if (cause is RedisException)
        {
            return new CacheException("缓存操作失败", cause);
        }

        if (cause is HttpRequestException)
        {
            return new ServiceInvocationException("远程服务调用失败", cause);
        }

        return null;
    }
}
// 自定义异常层次结构
/// <summary>
/// Base class for application exceptions: carries a string error code and a flag
/// telling whether the failure is transient.
/// </summary>
public abstract class AppException : Exception
{
    /// <summary>Error code supplied by the derived type.</summary>
    public string ErrorCode { get; }

    /// <summary>Whether the derived type marked the failure as transient.</summary>
    public bool IsTransient { get; }

    // No inner exception: delegate to the full constructor with a null cause.
    protected AppException(string message, string errorCode, bool isTransient = false)
        : this(message, null, errorCode, isTransient)
    {
    }

    protected AppException(string message, Exception inner, string errorCode, bool isTransient = false)
        : base(message, inner)
    {
        IsTransient = isTransient;
        ErrorCode = errorCode;
    }
}

/// <summary>Data-access failure; code "DATA_001", not transient.</summary>
public class DataAccessException : AppException
{
    public DataAccessException(string message, Exception inner = null)
        : base(message, inner, errorCode: "DATA_001", isTransient: false)
    {
    }
}

/// <summary>Remote service invocation failure; code "SVC_001", transient.</summary>
public class ServiceInvocationException : AppException
{
    public ServiceInvocationException(string message, Exception inner = null)
        : base(message, inner, errorCode: "SVC_001", isTransient: true)
    {
    }
}
public class CacheException : AppException
{
public CacheException(string message, Exception inner = null)
: base(message, inner, "CACHE_001", isTransient: true) { }
}
优点
- 系统稳定性:通过弹性策略大幅提升系统在故障场景下的可用性
- 用户体验:降级和重试策略让用户感知到的错误更少
- 故障隔离:舱壁和熔断器防止单点故障扩散为全局故障
- 可观测性:弹性事件提供系统健康状况的关键指标
- 可测试性:Result 模式让错误处理逻辑可单元测试
缺点
- 复杂性增加:弹性策略的组合增加了代码和理解成本
- 延迟增大:重试策略会增加用户感知到的响应时间
- 副作用:重试可能导致非幂等操作重复执行
- 调试困难:多层包装使问题定位更加困难
- 配置敏感:错误的参数(如阈值、超时)可能导致策略失效
性能注意事项
// 避免对不可重试的错误执行重试
var policy = Policy
.Handle<HttpRequestException>()
.Or<TimeoutException>()
.OrResult<HttpResponseMessage>(r =>
(int)r.StatusCode >= 500) // 只重试 5xx 错误
.WaitAndRetryAsync(3, attempt =>
TimeSpan.FromSeconds(Math.Pow(2, attempt)));
// 注意:4xx 客户端错误不应重试
总结
错误处理与弹性模式是构建可靠系统的基石。Result 类型提供了函数式的错误处理范式,异常机制提供了应急错误传播路径,而弹性策略(重试、熔断、舱壁、超时、降级)则提供了系统级的故障防护能力。Polly 库为 .NET 开发者提供了生产级的弹性策略实现。关键是在架构设计阶段就规划好错误处理策略,而不是在出现问题时临时补救。
关键知识点
- 瞬态故障:临时性的网络/服务故障,通常可通过重试解决
- 幂等性:重试安全性要求操作幂等,重复执行不产生副作用
- 熔断器三态:Closed -> Open -> HalfOpen -> Closed 循环
- 指数退避 + 抖动:避免重试风暴(Thundering Herd)
- 舱壁隔离:限制资源使用防止级联故障
- 策略组合顺序:从外到内通常为 降级 -> 舱壁 -> 熔断 -> 重试 -> 超时
常见误区
- 对所有异常重试:业务异常(如验证失败)不应重试
- 重试无退避:固定间隔重试可能加重服务负担
- 忽略重试的幂等性:POST 请求重试可能导致重复操作
- 熔断器阈值设置不当:过低容易误触发,过高起不到保护作用
- 忽略超时:没有超时控制的操作可能永久阻塞
- 降级返回 null:降级应返回有意义的默认值而非 null
进阶路线
- 可观测性集成:将弹性事件接入 OpenTelemetry/Jaeger
- 自适应策略:基于历史数据动态调整重试和熔断参数
- 服务网格:Istio/Linkerd 提供的 Sidecar 级别弹性策略
- 混沌工程:Chaos Monkey 主动注入故障验证弹性策略
- Rate Limiting:限流策略与弹性策略的配合
- Health Check:健康检查驱动的流量切换
适用场景
- 微服务间调用:网络不稳定、服务可能降级
- 数据库访问:连接超时、死锁、主从切换
- 外部 API 调用:第三方服务的可用性不可控
- 文件/IO 操作:磁盘繁忙、网络存储断连
- 消息队列消费:处理失败的消息重试
落地建议
- 分层处理:基础设施层用弹性策略,业务层用 Result 类型
- 统一封装:通过 HttpClientFactory 或服务代理统一应用弹性策略
- 配置外部化:重试次数、超时时间等通过配置文件管理
- 监控告警:熔断器状态变更和重试次数超过阈值时告警
- 文档化:记录每个服务的弹性策略配置和预期行为
- 压力测试:验证弹性策略在极端负载下的表现
排错清单
复盘问题
- 你的系统中有哪些故障场景没有弹性保护?
- 重试策略是否考虑了被调用方的承受能力?
- 熔断器打开后用户的体验是什么?有没有降级方案?
- 你如何测试弹性策略是否正常工作?
- 弹性策略的参数上次调整是什么时候?依据是什么?
- 有没有因为弹性策略导致问题更严重的案例?
