企业集成模式与第三方 API 对接
企业集成模式与第三方 API 对接
简介
在现代分布式系统中,应用很少孤立运行。几乎每一个生产级 ASP.NET Core 应用都需要与外部系统进行交互——无论是调用第三方支付接口、对接短信/邮件服务商、集成 OAuth 认证,还是通过 Webhook 接收外部事件通知。企业集成模式(Enterprise Integration Patterns, EIP)为我们提供了一套经过验证的设计模式集合,帮助开发者构建健壮、可维护、可观测的外部集成方案。
第三方 API 对接的核心挑战在于:你无法控制外部系统的行为。网络可能超时、对方服务可能宕机、接口可能变更、速率限制可能被触发。因此,一个成熟的集成方案必须在可靠性、可观测性、可恢复性三个维度上做足准备。
本文将从实战角度出发,系统讲解 ASP.NET Core 环境下与第三方 API 对接的各项模式与技术。
特点
- 防御性编程:所有外部调用都假设可能失败,提前设计降级和恢复策略
- 幂等性保障:确保重复请求不会产生副作用
- 可观测性优先:每次外部调用都有完整的日志、指标和追踪
- 契约驱动:通过契约测试而非仅靠文档来验证集成正确性
- 弹性设计:熔断、限流、重试三件套缺一不可
核心模式与实现
一、HttpClient 工厂化与生命周期管理
在 ASP.NET Core 中,所有外部 HTTP 调用都应该通过 IHttpClientFactory 来管理。
// ============ 基础注册方式 ============
// Program.cs
builder.Services.AddHttpClient("PaymentService", client =>
{
client.BaseAddress = new Uri("https://api.payment-provider.com/v2/");
client.DefaultRequestHeaders.Add("User-Agent", "MyApp/1.0");
client.Timeout = TimeSpan.FromSeconds(30);
});
// ============ 类型化客户端(推荐) ============
public class PaymentServiceClient
{
private readonly HttpClient _httpClient;
private readonly ILogger<PaymentServiceClient> _logger;
public PaymentServiceClient(HttpClient httpClient, ILogger<PaymentServiceClient> logger)
{
_httpClient = httpClient;
_logger = logger;
}
public async Task<PaymentResponse> CreatePaymentAsync(PaymentRequest request, CancellationToken ct = default)
{
// 使用请求级超时,而非客户端级超时
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(TimeSpan.FromSeconds(15));
var json = JsonSerializer.Serialize(request);
using var content = new StringContent(json, Encoding.UTF8, "application/json");
var response = await _httpClient.PostAsync("payments", content, cts.Token);
response.EnsureSuccessStatusCode();
var responseBody = await response.Content.ReadAsStringAsync(cts.Token);
return JsonSerializer.Deserialize<PaymentResponse>(responseBody)
?? throw new InvalidOperationException("反序列化响应失败");
}
}
// 注册类型化客户端
builder.Services.AddHttpClient<PaymentServiceClient>(client =>
{
client.BaseAddress = new Uri("https://api.payment-provider.com/v2/");
});注意:永远不要在 Controller 或 Service 中直接
new HttpClient()。IHttpClientFactory通过HttpMessageHandler池化来避免端口耗尽问题。
二、幂等性设计
幂等性是分布式系统的基石。对于第三方 API 调用,每次请求都应携带幂等键(Idempotency Key)。
// ============ 幂等键生成与管理 ============
public static class IdempotencyKeyGenerator
{
/// <summary>
/// 基于业务参数生成确定性幂等键
/// </summary>
public static string GenerateFromBusinessParams(params object[] parts)
{
var raw = string.Join("|", parts.Select(p => p?.ToString() ?? "null"));
using var sha256 = SHA256.Create();
var hash = sha256.ComputeHash(Encoding.UTF8.GetBytes(raw));
return Convert.ToHexString(hash)[..32]; // 取前32位
}
}
// ============ 幂等请求中间件 ============
public class IdempotencyMiddleware
{
private readonly RequestDelegate _next;
private readonly IDistributedCache _cache;
public IdempotencyMiddleware(RequestDelegate next, IDistributedCache cache)
{
_next = next;
_cache = cache;
}
public async Task InvokeAsync(HttpContext context)
{
if (context.Request.Method != "POST" && context.Request.Method != "PUT")
{
await _next(context);
return;
}
if (!context.Request.Headers.TryGetValue("X-Idempotency-Key", out var idempotencyKey))
{
context.Response.StatusCode = 400;
await context.Response.WriteAsJsonAsync(new { error = "缺少幂等键 X-Idempotency-Key" });
return;
}
var cacheKey = $"idempotency:{idempotencyKey}";
var cachedResponse = await _cache.GetStringAsync(cacheKey);
if (cachedResponse != null)
{
// 返回缓存的响应,避免重复处理
context.Response.StatusCode = 200;
context.Response.Headers["X-Idempotent-Replayed"] = "true";
await context.Response.WriteAsync(cachedResponse);
return;
}
// 捕获响应体
var originalBody = context.Response.Body;
using var memoryStream = new MemoryStream();
context.Response.Body = memoryStream;
await _next(context);
// 缓存成功响应
if (context.Response.StatusCode is >= 200 and < 300)
{
memoryStream.Seek(0, SeekOrigin.Begin);
var responseBody = await new StreamReader(memoryStream).ReadToEndAsync();
await _cache.SetStringAsync(cacheKey, responseBody, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
});
memoryStream.Seek(0, SeekOrigin.Begin);
await memoryStream.CopyToAsync(originalBody);
}
context.Response.Body = originalBody;
}
}// ============ 幂等键在第三方调用中的应用 ============
public class IdempotentExternalCallService
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly IDistributedCache _cache;
private readonly ILogger<IdempotentExternalCallService> _logger;
public async Task<TResponse> CallWithIdempotencyAsync<TResponse>(
string clientName,
string endpoint,
object payload,
string businessId,
CancellationToken ct = default)
{
// 生成幂等键:基于业务ID + 端点
var idempotencyKey = IdempotencyKeyGenerator
.GenerateFromBusinessParams(businessId, endpoint);
// 检查是否已处理过
var cacheKey = $"ext_call:{idempotencyKey}";
var cached = await _cache.GetStringAsync(cacheKey, ct);
if (cached != null)
{
_logger.LogInformation("命中幂等缓存,Key={Key}", idempotencyKey);
return JsonSerializer.Deserialize<TResponse>(cached)!;
}
// 执行外部调用
var client = _httpClientFactory.CreateClient(clientName);
var request = new HttpRequestMessage(HttpMethod.Post, endpoint);
request.Headers.Add("Idempotency-Key", idempotencyKey);
request.Content = new StringContent(
JsonSerializer.Serialize(payload), Encoding.UTF8, "application/json");
var response = await client.SendAsync(request, ct);
response.EnsureSuccessStatusCode();
var responseBody = await response.Content.ReadAsStringAsync(ct);
var result = JsonSerializer.Deserialize<TResponse>(responseBody)
?? throw new InvalidOperationException("反序列化失败");
// 缓存成功结果
await _cache.SetStringAsync(cacheKey, responseBody, new DistributedCacheEntryOptions
{
AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(72)
}, ct);
return result;
}
}三、重试与指数退避
网络瞬态故障是常态,必须有完善的重试机制。
// ============ 使用 Polly 实现重试策略 ============
// 安装 NuGet: Polly, Microsoft.Extensions.Http.Polly
using Polly;
using Polly.Retry;
// Program.cs - 注册带重试策略的 HttpClient
builder.Services
.AddHttpClient<SmsServiceClient>(client =>
{
client.BaseAddress = new Uri("https://sms.provider.com/api/");
client.Timeout = TimeSpan.FromSeconds(30);
})
.AddPolicyHandler(GetRetryPolicy());
static AsyncRetryPolicy<HttpResponseMessage> GetRetryPolicy()
{
return Policy<HttpResponseMessage>
.Handle<HttpRequestException>()
.OrResult(r => (int)r.StatusCode >= 500 || r.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)) // 指数退避
+ TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)), // 随机抖动
onRetry: (outcome, timespan, retryCount, context) =>
{
var logger = context.TryGetValue("Logger", out var l) ? l as ILogger : null;
logger?.LogWarning("第 {RetryCount} 次重试,等待 {Seconds:F1}s,原因:{Reason}",
retryCount, timespan.TotalSeconds,
outcome.Exception?.Message ?? outcome.Result.StatusCode.ToString());
});
}// ============ 高级重试:可配置的重试策略 ============
public class ResilientHttpClient
{
private readonly HttpClient _httpClient;
private readonly AsyncRetryPolicy<HttpResponseMessage> _retryPolicy;
private readonly ILogger _logger;
public ResilientHttpClient(HttpClient httpClient, ILogger logger, RetryOptions options)
{
_httpClient = httpClient;
_logger = logger;
_retryPolicy = Policy<HttpResponseMessage>
.Handle<HttpRequestException>()
.Or<TaskCanceledException>(ex => ex.InnerException is TimeoutException)
.OrResult(r => IsTransientFailure(r.StatusCode))
.WaitAndRetryAsync(
retryCount: options.MaxRetries,
sleepDurationProvider: retryAttempt =>
TimeSpan.FromMilliseconds(
options.BaseDelayMs * Math.Pow(options.BackoffMultiplier, retryAttempt - 1)
+ Random.Shared.Next(0, options.JitterMs)),
onRetry: (outcome, delay, retryCount, _) =>
{
_logger.LogWarning(
"外部调用重试 #{Count},延迟 {Delay}ms,原因:{Reason}",
retryCount, delay.TotalMilliseconds,
outcome.Exception?.Message ?? outcome.Result.StatusCode.ToString());
});
}
private static bool IsTransientFailure(HttpStatusCode statusCode)
{
return statusCode == HttpStatusCode.RequestTimeout
|| statusCode == HttpStatusCode.TooManyRequests
|| (int)statusCode >= 500 && (int)statusCode < 600;
}
public async Task<T> PostAsync<T>(string url, object payload, CancellationToken ct = default)
{
var context = new Context { ["Logger"] = _logger };
var response = await _retryPolicy.ExecuteAsync(async ctx =>
{
using var content = new StringContent(
JsonSerializer.Serialize(payload), Encoding.UTF8, "application/json");
return await _httpClient.PostAsync(url, content, ct);
}, context);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync(ct);
return JsonSerializer.Deserialize<T>(body)!;
}
}
public class RetryOptions
{
public int MaxRetries { get; set; } = 3;
public int BaseDelayMs { get; set; } = 200;
public double BackoffMultiplier { get; set; } = 2.0;
public int JitterMs { get; set; } = 500;
}四、熔断器模式
当外部服务持续失败时,应快速失败而非持续等待。
// ============ 使用 Polly 实现熔断器 ============
using Polly.CircuitBreaker;
// Program.cs - 注册带熔断的 HttpClient
builder.Services
.AddHttpClient<InventoryServiceClient>(client =>
{
client.BaseAddress = new Uri("https://inventory.internal.com/");
})
.AddPolicyHandler(GetCircuitBreakerPolicy());
static AsyncCircuitBreakerPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return Policy<HttpResponseMessage>
.Handle<HttpRequestException>()
.OrResult(r => (int)r.StatusCode >= 500)
.CircuitBreakerAsync(
handledEventsAllowedBeforeBreaking: 5, // 连续5次失败后熔断
durationOfBreak: TimeSpan.FromSeconds(30), // 熔断30秒
onBreak: (outcome, duration) =>
{
Console.WriteLine($"[熔断器打开] 将在 {duration.TotalSeconds}s 后尝试恢复");
},
onReset: () =>
{
Console.WriteLine("[熔断器关闭] 服务恢复正常");
},
onHalfOpen: () =>
{
Console.WriteLine("[熔断器半开] 正在试探性调用...");
});
}// ============ 熔断 + 重试 组合策略 ============
builder.Services
.AddHttpClient<OrderServiceClient>(client =>
{
client.BaseAddress = new Uri("https://order-service.internal/");
})
.AddPolicyHandler(Policy.WrapAsync(GetRetryPolicy(), GetCircuitBreakerPolicy()));
// 执行顺序:Retry(CircuitBreaker(实际调用))
// 外层是重试,内层是熔断
// ============ 熔断状态监控 ============
public class CircuitBreakerHealthCheck : IHealthCheck
{
private readonly CircuitBreakerPolicy _circuitBreaker;
public CircuitBreakerHealthCheck(CircuitBreakerPolicy circuitBreaker)
{
_circuitBreaker = circuitBreaker;
}
public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context, CancellationToken ct = default)
{
var state = _circuitBreaker.CircuitState;
return state switch
{
CircuitState.Closed => Task.FromResult(HealthCheckResult.Healthy("熔断器正常")),
CircuitState.HalfOpen => Task.FromResult(HealthCheckResult.Degraded("熔断器半开")),
CircuitState.Open => Task.FromResult(HealthCheckResult.Unhealthy("熔断器已打开")),
_ => Task.FromResult(HealthCheckResult.Unknown("未知状态"))
};
}
}五、限流(针对外部调用的速率控制)
// ============ 固定窗口限流器 ============
public class ExternalApiRateLimiter
{
private readonly SemaphoreSlim _semaphore = new(10, 10); // 最大并发数
private readonly TimeWindowCounter _counter;
private readonly int _maxRequestsPerMinute;
public ExternalApiRateLimiter(int maxRequestsPerMinute = 60)
{
_maxRequestsPerMinute = maxRequestsPerMinute;
_counter = new TimeWindowCounter(TimeSpan.FromMinutes(1));
}
public async Task<T> ExecuteAsync<T>(Func<Task<T>> action, CancellationToken ct = default)
{
// 等待信号量(控制并发数)
await _semaphore.WaitAsync(ct);
try
{
// 等待限流窗口
await _counter.WaitForSlotAsync(_maxRequestsPerMinute, ct);
return await action();
}
finally
{
_semaphore.Release();
}
}
}
public class TimeWindowCounter
{
private readonly TimeSpan _window;
private readonly Queue<DateTime> _timestamps = new();
private readonly object _lock = new();
public TimeWindowCounter(TimeSpan window)
{
_window = window;
}
public async Task WaitForSlotAsync(int maxCount, CancellationToken ct = default)
{
while (!ct.IsCancellationRequested)
{
lock (_lock)
{
var cutoff = DateTime.UtcNow - _window;
while (_timestamps.Count > 0 && _timestamps.Peek() < cutoff)
_timestamps.Dequeue();
if (_timestamps.Count < maxCount)
{
_timestamps.Enqueue(DateTime.UtcNow);
return;
}
}
await Task.Delay(100, ct);
}
ct.ThrowIfCancellationRequested();
}
}// ============ 使用 ASP.NET Core 内置限流(.NET 7+) ============
// Program.cs
builder.Services.AddRateLimiter(options =>
{
options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
{
var tenantId = context.User.FindFirst("tenant_id")?.Value ?? "anonymous";
return RateLimitPartition.GetSlidingWindowLimiter(tenantId, _ => new SlidingWindowRateLimiterOptions
{
PermitLimit = 100,
Window = TimeSpan.FromMinutes(1),
SegmentsPerWindow = 6,
QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
QueueLimit = 10
});
});
options.OnRejected = async (context, ct) =>
{
context.HttpContext.Response.StatusCode = 429;
if (context.Lease.TryGetMetadata(MetadataName.RetryAfter, out var retryAfter))
{
context.HttpContext.Response.Headers.RetryAfter = retryAfter.TotalSeconds.ToString("F0");
await context.HttpContext.Response.WriteAsJsonAsync(new
{
error = "请求过于频繁",
retryAfter = $"{retryAfter.TotalSeconds:F0}s"
}, ct);
}
};
});六、Webhook 接收与处理
// ============ Webhook 签名验证 ============
[ApiController]
[Route("webhooks/[action]")]
public class WebhookController : ControllerBase
{
private readonly ILogger<WebhookController> _logger;
private readonly IWebhookProcessor _processor;
// Webhook 密钥应从配置中获取,不要硬编码
private const string WebhookSecret = "whsec_your_secret_key";
public WebhookController(ILogger<WebhookController> logger, IWebhookProcessor processor)
{
_logger = logger;
_processor = processor;
}
[HttpPost("stripe")]
public async Task<IActionResult> StripeWebhook()
{
// 1. 读取请求体
var payload = await new StreamReader(HttpContext.Request.Body).ReadToEndAsync();
// 2. 验证签名
var signature = Request.Headers["Stripe-Signature"].FirstOrDefault();
if (!VerifyStripeSignature(payload, signature, WebhookSecret))
{
_logger.LogWarning("Webhook 签名验证失败");
return Unauthorized();
}
// 3. 解析事件
var stripeEvent = JsonSerializer.Deserialize<StripeWebhookEvent>(payload);
if (stripeEvent == null) return BadRequest("无效的事件数据");
// 4. 幂等处理
try
{
await _processor.ProcessAsync(stripeEvent);
}
catch (DuplicateEventException)
{
// 重复事件,直接返回成功
_logger.LogInformation("重复的 Webhook 事件,已忽略:{EventId}", stripeEvent.Id);
}
// 5. 始终返回 200,防止对方重复投递
return Ok();
}
private static bool VerifyStripeSignature(string payload, string? signature, string secret)
{
if (string.IsNullOrEmpty(signature)) return false;
// Stripe 签名格式:t=timestamp,v1=hash
var parts = signature.Split(',');
var timestamp = parts.FirstOrDefault(p => p.StartsWith("t="))?[2..];
var expectedSig = parts.FirstOrDefault(p => p.StartsWith("v1="))?[3..];
if (timestamp == null || expectedSig == null) return false;
var signedPayload = $"{timestamp}.{payload}";
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
var computedHash = hmac.ComputeHash(Encoding.UTF8.GetBytes(signedPayload));
var computedSig = Convert.ToHexString(computedHash).ToLowerInvariant();
return computedSig == expectedSig.ToLowerInvariant();
}
}
// ============ Webhook 处理器接口与实现 ============
public interface IWebhookProcessor
{
Task ProcessAsync(StripeWebhookEvent webhookEvent);
}
public class WebhookProcessor : IWebhookProcessor
{
private readonly AppDbContext _dbContext;
private readonly ILogger<WebhookProcessor> _logger;
private readonly IEnumerable<IWebhookHandler> _handlers;
public WebhookProcessor(
AppDbContext dbContext,
ILogger<WebhookProcessor> logger,
IEnumerable<IWebhookHandler> handlers)
{
_dbContext = dbContext;
_logger = logger;
_handlers = handlers;
}
public async Task ProcessAsync(StripeWebhookEvent webhookEvent)
{
// 幂等检查
var alreadyProcessed = await _dbContext.WebhookEvents
.AnyAsync(e => e.ExternalId == webhookEvent.Id);
if (alreadyProcessed)
throw new DuplicateEventException(webhookEvent.Id);
// 记录事件
var eventRecord = new WebhookEventRecord
{
ExternalId = webhookEvent.Id,
Type = webhookEvent.Type,
Payload = webhookEvent.Data,
ReceivedAt = DateTime.UtcNow,
Status = "Processing"
};
_dbContext.WebhookEvents.Add(eventRecord);
await _dbContext.SaveChangesAsync();
// 查找并执行对应的处理器
var handler = _handlers.FirstOrDefault(h => h.CanHandle(webhookEvent.Type));
if (handler != null)
{
await handler.HandleAsync(webhookEvent);
eventRecord.Status = "Completed";
}
else
{
_logger.LogWarning("未找到 Webhook 处理器:{Type}", webhookEvent.Type);
eventRecord.Status = "Unhandled";
}
await _dbContext.SaveChangesAsync();
}
}七、请求/响应日志记录
// ============ HTTP 请求/响应日志拦截器 ============
public class HttpLoggingHandler : DelegatingHandler
{
private readonly ILogger<HttpLoggingHandler> _logger;
public HttpLoggingHandler(ILogger<HttpLoggingHandler> logger)
{
_logger = logger;
}
protected override async Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken cancellationToken)
{
var correlationId = Guid.NewGuid().ToString("N")[..16];
var stopwatch = Stopwatch.StartNew();
// 记录请求
var requestLog = new
{
CorrelationId = correlationId,
Method = request.Method.ToString(),
Url = request.RequestUri?.ToString(),
Headers = SanitizeHeaders(request.Headers),
Body = await ReadBodySafely(request.Content)
};
_logger.LogInformation("外部调用请求 [{Id}]: {Request}",
correlationId, JsonSerializer.Serialize(requestLog));
HttpResponseMessage? response = null;
try
{
response = await base.SendAsync(request, cancellationToken);
stopwatch.Stop();
// 记录响应
var responseLog = new
{
CorrelationId = correlationId,
StatusCode = (int)response.StatusCode,
Duration = stopwatch.ElapsedMilliseconds,
Headers = SanitizeHeaders(response.Headers),
Body = await ReadBodySafely(response.Content)
};
_logger.LogInformation("外部调用响应 [{Id}]: {Response}",
correlationId, JsonSerializer.Serialize(responseLog));
return response;
}
catch (Exception ex)
{
stopwatch.Stop();
_logger.LogError(ex, "外部调用失败 [{Id}],耗时 {Ms}ms",
correlationId, stopwatch.ElapsedMilliseconds);
throw;
}
}
private static Dictionary<string, string?> SanitizeHeaders(HttpHeaders headers)
{
var sensitive = new[] { "authorization", "api-key", "cookie", "set-cookie" };
return headers.ToDictionary(
h => h.Key,
h => sensitive.Contains(h.Key.ToLowerInvariant()) ? "***REDACTED***" : string.Join(", ", h.Value));
}
private static async Task<string?> ReadBodySafely(HttpContent? content)
{
if (content == null) return null;
try
{
var body = await content.ReadAsStringAsync();
return body.Length > 2048 ? body[..2048] + "...[截断]" : body;
}
catch
{
return "[无法读取请求体]";
}
}
}
// 注册日志拦截器
builder.Services.AddHttpClient("ExternalApi")
.AddHttpMessageHandler<HttpLoggingHandler>();八、API 网关聚合模式
// ============ API 聚合器:将多个外部调用合并为一个响应 ============
public class DashboardAggregationService
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<DashboardAggregationService> _logger;
public DashboardAggregationService(
IHttpClientFactory httpClientFactory,
ILogger<DashboardAggregationService> logger)
{
_httpClientFactory = httpClientFactory;
_logger = logger;
}
public async Task<DashboardDto> GetDashboardDataAsync(string userId, CancellationToken ct = default)
{
// 并行调用多个外部服务
var orderTask = GetOrdersAsync(userId, ct);
var profileTask = GetProfileAsync(userId, ct);
var notificationTask = GetNotificationsAsync(userId, ct);
try
{
await Task.WhenAll(orderTask, profileTask, notificationTask);
}
catch (Exception ex)
{
_logger.LogError(ex, "部分聚合调用失败");
// 部分失败时返回可用数据
}
return new DashboardDto
{
Orders = orderTask.IsCompletedSuccessfully ? orderTask.Result : null,
Profile = profileTask.IsCompletedSuccessfully ? profileTask.Result : null,
Notifications = notificationTask.IsCompletedSuccessfully ? notificationTask.Result : null,
HasPartialFailure = orderTask.IsFaulted || profileTask.IsFaulted || notificationTask.IsFaulted
};
}
private async Task<List<OrderDto>> GetOrdersAsync(string userId, CancellationToken ct)
{
var client = _httpClientFactory.CreateClient("OrderService");
var response = await client.GetAsync($"orders?userId={userId}", ct);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync(ct);
return JsonSerializer.Deserialize<List<OrderDto>>(body) ?? new();
}
private async Task<UserProfileDto> GetProfileAsync(string userId, CancellationToken ct)
{
var client = _httpClientFactory.CreateClient("UserService");
var response = await client.GetAsync($"users/{userId}/profile", ct);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync(ct);
return JsonSerializer.Deserialize<UserProfileDto>(body)!;
}
private async Task<List<NotificationDto>> GetNotificationsAsync(string userId, CancellationToken ct)
{
var client = _httpClientFactory.CreateClient("NotificationService");
var response = await client.GetAsync($"notifications?userId={userId}&limit=10", ct);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync(ct);
return JsonSerializer.Deserialize<List<NotificationDto>>(body) ?? new();
}
}
// ============ 聚合 API 端点 ============
[ApiController]
[Route("api/dashboard")]
public class DashboardController : ControllerBase
{
private readonly DashboardAggregationService _aggregation;
public DashboardController(DashboardAggregationService aggregation)
{
_aggregation = aggregation;
}
[HttpGet("{userId}")]
public async Task<ActionResult<DashboardDto>> Get(string userId, CancellationToken ct)
{
var data = await _aggregation.GetDashboardDataAsync(userId, ct);
return Ok(data);
}
}九、契约测试
// ============ 使用 Pact 进行契约测试 ============
// 安装 NuGet: PactNet
using PactNet;
using PactNet.Infrastructure.Outputters;
using Xunit;
public class PaymentServiceProviderApiTests : IDisposable
{
private readonly IPactBuilderV3 _pactBuilder;
public PaymentServiceProviderApiTests()
{
var config = new PactConfig
{
PactDir = "../../../pacts/",
Outputters = new List<IOutput> { new ConsoleOutput() }
};
_pactBuilder = Pact.V3("MyAppConsumer", "PaymentProvider", config)
.WithHttpInteractions();
}
[Fact]
public async Task CreatePayment_ValidRequest_ReturnsSuccess()
{
// Arrange - 定义期望的契约
_pactBuilder
.UponReceiving("创建支付请求")
.Given("用户 account-123 存在且有余额")
.WithRequest(HttpMethod.Post, "/v2/payments")
.WithHeader("Content-Type", "application/json")
.WithHeader("Idempotency-Key", "test-key-123")
.WithJsonBody(new
{
accountId = "account-123",
amount = 99.99,
currency = "CNY"
})
.WillRespond()
.WithStatus(201)
.WithHeader("Content-Type", "application/json")
.WithJsonBody(new
{
paymentId = Like.Guid("pay-abc-123"),
status = "created",
createdAt = Like.DateTime("2025-01-15T10:30:00Z")
});
await _pactBuilder.VerifyAsync(async ctx =>
{
// Act - 使用契约中定义的 Mock 服务地址进行调用
var client = new PaymentServiceClient(
new HttpClient { BaseAddress = ctx.MockServerUri });
var result = await client.CreatePaymentAsync(new PaymentRequest
{
AccountId = "account-123",
Amount = 99.99m,
Currency = "CNY",
IdempotencyKey = "test-key-123"
});
// Assert
Assert.Equal("created", result.Status);
Assert.NotNull(result.PaymentId);
});
}
public void Dispose()
{
_pactBuilder.Dispose();
}
}十、错误处理与降级策略
// ============ 外部调用统一错误处理 ============
public class ExternalCallResult<T>
{
public bool IsSuccess { get; init; }
public T? Data { get; init; }
public string? ErrorMessage { get; init; }
public ExternalCallErrorType ErrorType { get; init; }
public TimeSpan? RetryAfter { get; init; }
public Exception? Exception { get; init; }
public static ExternalCallResult<T> Success(T data) => new()
{
IsSuccess = true,
Data = data
};
public static ExternalCallResult<T> Failure(ExternalCallErrorType errorType, string message,
Exception? ex = null, TimeSpan? retryAfter = null) => new()
{
IsSuccess = false,
ErrorType = errorType,
ErrorMessage = message,
Exception = ex,
RetryAfter = retryAfter
};
}
public enum ExternalCallErrorType
{
Timeout,
RateLimited,
ServiceUnavailable,
AuthenticationFailed,
ValidationError,
NetworkError,
Unknown
}
// ============ 带降级的调用封装 ============
public class ResilientExternalCallService
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly ILogger<ResilientExternalCallService> _logger;
private readonly CircuitBreakerService _circuitBreaker;
public async Task<ExternalCallResult<T>> ExecuteAsync<T>(
string clientName,
HttpRequestMessage request,
T? fallbackValue = default,
CancellationToken ct = default)
{
try
{
if (_circuitBreaker.IsOpen(clientName))
{
_logger.LogWarning("熔断器已打开,使用降级值:{Client}", clientName);
return ExternalCallResult<T>.Success(fallbackValue!);
}
var client = _httpClientFactory.CreateClient(clientName);
var response = await client.SendAsync(request, ct);
if (response.IsSuccessStatusCode)
{
_circuitBreaker.RecordSuccess(clientName);
var body = await response.Content.ReadAsStringAsync(ct);
var result = JsonSerializer.Deserialize<T>(body);
return ExternalCallResult<T>.Success(result!);
}
_circuitBreaker.RecordFailure(clientName);
var errorType = response.StatusCode switch
{
HttpStatusCode.TooManyRequests => ExternalCallErrorType.RateLimited,
HttpStatusCode.Unauthorized or HttpStatusCode.Forbidden
=> ExternalCallErrorType.AuthenticationFailed,
HttpStatusCode.RequestTimeout => ExternalCallErrorType.Timeout,
HttpStatusCode.ServiceUnavailable or HttpStatusCode.BadGateway
=> ExternalCallErrorType.ServiceUnavailable,
_ => ExternalCallErrorType.Unknown
};
TimeSpan? retryAfter = null;
if (response.Headers.RetryAfter?.Delta is { } delta)
retryAfter = delta;
return ExternalCallResult<T>.Failure(
errorType,
$"外部调用失败: {response.StatusCode}",
retryAfter: retryAfter);
}
catch (TaskCanceledException ex) when (ex.InnerException is TimeoutException)
{
_circuitBreaker.RecordFailure(clientName);
return ExternalCallResult<T>.Failure(ExternalCallErrorType.Timeout, "请求超时", ex);
}
catch (HttpRequestException ex)
{
_circuitBreaker.RecordFailure(clientName);
return ExternalCallResult<T>.Failure(ExternalCallErrorType.NetworkError, "网络错误", ex);
}
}
}优点
- 系统稳定性:通过熔断、重试、限流三重保障,极大提升系统在面对外部依赖故障时的稳定性
- 数据一致性:幂等性设计确保即使请求重复也不会导致数据不一致
- 可观测性:完善的请求日志和追踪使得问题排查效率大幅提升
- 团队协作:契约测试让前后端、服务间可以独立开发,减少集成阶段的返工
缺点
- 复杂度增加:每种模式都引入了额外的抽象层和中间件,增加了理解和维护成本
- 调试困难:重试和熔断可能掩盖真实错误,需要完善的日志才能有效排错
- 性能开销:日志记录、幂等检查、限流计数器等都会带来一定的性能开销
- 测试复杂:契约测试需要维护 Pact 文件,Mock 服务的配置可能很繁琐
性能注意事项
- 连接池管理:
IHttpClientFactory默认 2 分钟回收HttpMessageHandler,高并发场景下应适当调整PooledConnectionLifetime和PooledConnectionIdleTimeout - DNS 刷新:长生命周期的 HttpClient 可能遇到 DNS 缓存问题,设置
PooledConnectionLifetime = TimeSpan.FromMinutes(5)可定期刷新 DNS - 日志截断:生产环境中响应体日志应截断到合理长度(如 2KB),避免大量日志影响性能
- 限流精度:滑动窗口限流比固定窗口限流更精确,但消耗更多内存
- 熔断粒度:对不同端点使用独立的熔断器,避免一个慢接口拖垮整个外部服务调用
// 性能优化:精细控制 HttpClient 连接池
builder.Services.AddHttpClient("ExternalApi", client =>
{
client.BaseAddress = new Uri("https://api.external.com/");
client.Timeout = TimeSpan.FromSeconds(30);
})
.ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
{
PooledConnectionLifetime = TimeSpan.FromMinutes(5), // 5分钟刷新DNS
PooledConnectionIdleTimeout = TimeSpan.FromMinutes(2),
MaxConnectionsPerServer = 50, // 每个服务端最大连接数
EnableMultipleHttp2Connections = true
});总结
企业集成模式是将单体系统与外部世界安全、可靠地连接起来的关键基础设施。在 ASP.NET Core 中,通过 IHttpClientFactory + Polly 的组合,我们可以低成本地实现重试、熔断、超时控制等弹性策略。幂等性设计是确保分布式场景下数据正确性的基石。契约测试则为多方协作提供了安全网。一个成熟的集成方案,应该是"防御性编程"思想的完整体现。
关键知识点
| 知识点 | 要点 |
|---|---|
| IHttpClientFactory | 避免端口耗尽,使用类型化客户端 |
| 幂等键 | 基于业务参数生成确定性 Hash |
| 指数退避 + 抖动 | 防止重试风暴,随机化重试间隔 |
| 熔断器三态 | Closed -> Open -> HalfOpen -> Closed |
| Webhook 签名验证 | HMAC-SHA256 验证请求来源合法性 |
| 契约测试 | Pact 框架,消费者驱动 |
| API 聚合 | Task.WhenAll 并行调用,部分失败容错 |
| 限流器 | 固定窗口 / 滑动窗口 / 令牌桶 / 漏桶 |
常见误区
误区:HttpClient 使用 using 即可
- 事实:频繁创建销毁导致端口耗尽,应使用
IHttpClientFactory
- 事实:频繁创建销毁导致端口耗尽,应使用
误区:重试次数越多越可靠
- 事实:重试过多可能加剧下游压力,3 次通常是合理的上限
误区:熔断器打开就是出问题了
- 事实:熔断是正常的保护机制,应结合监控告警而非仅靠日志判断
误区:Webhook 总是可靠的
- 事实:对方可能重复投递,必须做幂等处理
误区:所有外部调用共用一个熔断器
- 事实:应按端点或服务分组使用独立熔断器
误区:日志记录完整的请求响应体
- 事实:可能泄露敏感信息(Token、密钥),必须脱敏处理
进阶路线
- 服务网格(Service Mesh):Istio / Linkerd 可以在基础设施层实现重试、熔断、限流,无需修改应用代码
- AsyncAPI 规范:类似 OpenAPI,但用于描述事件驱动的 API(Webhook、消息队列)
- GraphQL Federation:替代 API 聚合模式,用 GraphQL Schema 统一多个后端服务
- OpenTelemetry 集成:标准化 Traces、Metrics、Logs 三大支柱的采集与导出
- Chaos Engineering:通过 Chaos Monkey 等工具主动注入故障,验证系统的弹性能力
适用场景
| 场景 | 推荐模式 |
|---|---|
| 支付对接 | 幂等键 + 重试 + 完整日志 |
| 短信/邮件发送 | 重试 + 限流 + 异步队列 |
| OAuth 认证集成 | 缓存 Token + 自动刷新 |
| 数据同步 | Webhook + 幂等 + 死信队列 |
| 微服务间调用 | 熔断 + 重试 + 契约测试 |
| BFF 聚合层 | API 聚合 + 并行调用 + 降级 |
落地建议
- 第一步:规范 HttpClient 使用。确保所有外部调用走
IHttpClientFactory,禁止手动new HttpClient() - 第二步:添加请求/响应日志。使用
DelegatingHandler统一记录,注意脱敏 - 第三步:引入重试策略。先对所有外部调用添加基本的指数退避重试
- 第四步:逐个服务添加熔断。从最不稳定的外部服务开始
- 第五步:实现幂等性。对所有写操作添加幂等键检查
- 第六步:添加健康检查。将熔断器状态暴露为健康检查端点
- 第七步:引入契约测试。对核心集成点编写 Pact 契约测试
排错清单
复盘问题
- 我们的外部调用中,有多少比例有重试机制?有幂等保障的占多少?
- 当第三方服务宕机时,我们的系统会怎样?是否有明确的降级策略?
- 最近一次第三方 API 变更导致的问题是什么?能否通过契约测试提前发现?
- 我们的外部调用日志是否足以支撑问题排查?是否有脱敏处理?
- 当前是否有监控外部调用成功率和延迟的仪表盘?
延伸阅读
- Microsoft Learn - 使用 IHttpClientFactory 发出 HTTP 请求
- Polly 官方文档
- Enterprise Integration Patterns - Gregor Hohpe
- Pact 契约测试
- ASP.NET Core 中的弹性
- Martin Fowler - CircuitBreaker 模式
