后台任务处理深入
大约 9 分钟约 2669 字
后台任务处理深入
简介
ASP.NET Core 提供了多种后台任务处理机制:IHostedService、BackgroundService、IJobScheduler 等。理解不同方案的生命周期管理、并发控制和错误恢复策略,有助于选择合适的后台任务方案。
特点
BackgroundService 进阶
生命周期管理
// BackgroundService 基类封装了 IHostedService
// ExecuteAsync 在主机启动后立即调用
// stoppingToken 在应用关闭时触发
public abstract class ManagedBackgroundService : BackgroundService
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
protected ManagedBackgroundService(
ILogger logger,
IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("{Service} 启动", GetType().Name);
// 等待主机完全启动
using var scope = _serviceProvider.CreateScope();
var lifetime = scope.ServiceProvider.GetRequiredService<IHostApplicationLifetime>();
while (!stoppingToken.IsCancellationRequested && !lifetime.ApplicationStarted.IsCancellationRequested)
{
await Task.Delay(100, stoppingToken);
}
try
{
await ExecuteInternalAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// 正常关闭,不记录错误
_logger.LogInformation("{Service} 正常关闭", GetType().Name);
}
catch (Exception ex)
{
_logger.LogCritical(ex, "{Service} 异常终止", GetType().Name);
throw; // 重新抛出,触发主机关闭
}
}
protected abstract Task ExecuteInternalAsync(CancellationToken stoppingToken);
}
// 定时轮询服务
public class DataSyncService : ManagedBackgroundService
{
private readonly IDataSyncService _syncService;
public DataSyncService(
ILogger<DataSyncService> logger,
IServiceProvider sp,
IDataSyncService syncService) : base(logger, sp)
{
_syncService = syncService;
}
protected override async Task ExecuteInternalAsync(CancellationToken stoppingToken)
{
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(5));
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await _syncService.SyncAllAsync(stoppingToken);
}
}
}
// 注册
builder.Services.AddHostedService<DataSyncService>();
// 多个后台服务的启动顺序
// 通过 AddHostedService 注册顺序决定启动顺序
builder.Services.AddHostedService<CacheWarmupService>(); // 1. 先预热缓存
builder.Services.AddHostedService<DataSyncService>(); // 2. 再同步数据
builder.Services.AddHostedService<HealthCheckService>(); // 3. 最后启动健康检查Channel 消息队列
生产者-消费者模式
// System.Threading.Channels — 高性能进程内消息队列
// 1. 定义消息类型
public record OrderCreatedEvent(int OrderId, string CustomerName, decimal Amount);
public record PaymentProcessedEvent(int OrderId, string TransactionId);
// 2. 创建有界通道
public class OrderEventChannel
{
// 容量 10000,满时等待
private readonly Channel<object> _channel = Channel.CreateBounded<object>(
new BoundedChannelOptions(10000)
{
FullMode = BoundedChannelFullMode.Wait, // 满时等待
SingleReader = true, // 单消费者优化
SingleWriter = false // 多生产者
});
// 写入消息
public async ValueTask PublishAsync(object evt, CancellationToken ct = default)
=> await _channel.Writer.WriteAsync(evt, ct);
// 读取消息流
public IAsyncEnumerable<object> ReadAllAsync(CancellationToken ct = default)
=> _channel.Reader.ReadAllAsync(ct);
// 尝试写入(非阻塞)
public bool TryPublish(object evt)
=> _channel.Writer.TryWrite(evt);
}
// 3. 消费者服务
public class OrderEventConsumer : BackgroundService
{
private readonly OrderEventChannel _channel;
private readonly IServiceProvider _sp;
private readonly ILogger<OrderEventConsumer> _logger;
public OrderEventConsumer(
OrderEventChannel channel,
IServiceProvider sp,
ILogger<OrderEventConsumer> logger)
{
_channel = channel;
_sp = sp;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var evt in _channel.ReadAllAsync(stoppingToken))
{
try
{
using var scope = _sp.CreateScope();
switch (evt)
{
case OrderCreatedEvent orderCreated:
await HandleOrderCreated(scope, orderCreated, stoppingToken);
break;
case PaymentProcessedEvent paymentProcessed:
await HandlePaymentProcessed(scope, paymentProcessed, stoppingToken);
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理事件 {EventType} 失败", evt.GetType().Name);
}
}
}
private async Task HandleOrderCreated(
IServiceScope scope, OrderCreatedEvent evt, CancellationToken ct)
{
var emailService = scope.ServiceProvider.GetRequiredService<IEmailService>();
await emailService.SendOrderConfirmationAsync(evt.OrderId, evt.CustomerName, ct);
}
private async Task HandlePaymentProcessed(
IServiceScope scope, PaymentProcessedEvent evt, CancellationToken ct)
{
var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();
await inventoryService.ReserveStockAsync(evt.OrderId, ct);
}
}
// 4. 注册(Singleton)
builder.Services.AddSingleton<OrderEventChannel>();
builder.Services.AddHostedService<OrderEventConsumer>();
// 5. 生产者(在 Controller 或 Service 中)
app.MapPost("/api/orders", async (
CreateOrderRequest request,
OrderEventChannel channel) =>
{
var orderId = await CreateOrderAsync(request);
await channel.PublishAsync(new OrderCreatedEvent(orderId, request.Name, request.Amount));
return Results.Created($"/api/orders/{orderId}", new { OrderId = orderId });
});多消费者竞争
// 多个消费者竞争消费(每个消息只被一个消费者处理)
public class WorkerPool<TMessage>
{
private readonly Channel<TMessage> _channel;
private readonly IServiceProvider _sp;
private readonly ILogger _logger;
private readonly int _workerCount;
private readonly Func<IServiceProvider, TMessage, CancellationToken, ValueTask> _handler;
public WorkerPool(
IServiceProvider sp,
ILogger logger,
int workerCount,
Func<IServiceProvider, TMessage, CancellationToken, ValueTask> handler,
int capacity = 10000)
{
_sp = sp;
_logger = logger;
_workerCount = workerCount;
_handler = handler;
_channel = Channel.CreateBounded<TMessage>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false, // 多消费者
SingleWriter = false
});
}
public async ValueTask EnqueueAsync(TMessage message, CancellationToken ct = default)
=> await _channel.Writer.WriteAsync(message, ct);
public async Task RunAsync(CancellationToken stoppingToken)
{
var tasks = Enumerable.Range(0, _workerCount)
.Select(i => ConsumeAsync($"Worker-{i}", stoppingToken));
await Task.WhenAll(tasks);
}
private async Task ConsumeAsync(string workerId, CancellationToken ct)
{
_logger.LogInformation("{WorkerId} 启动", workerId);
await foreach (var message in _channel.Reader.ReadAllAsync(ct))
{
try
{
using var scope = _sp.CreateScope();
await _handler(scope.ServiceProvider, message, ct);
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
_logger.LogError(ex, "{WorkerId} 处理消息失败", workerId);
}
}
}
}
// 使用
var pool = new WorkerPool<EmailMessage>(
sp, logger, workerCount: 4,
handler: async (provider, msg, ct) =>
{
var emailService = provider.GetRequiredService<IEmailService>();
await emailService.SendAsync(msg, ct);
});Coravel 任务调度
Coravel 框架集成
// dotnet add package Coravel
// 注册 Coravel
builder.Services.AddScheduler();
builder.Services.AddQueue();
// 配置调度任务
builder.Services.Configure<SchedulerOptions>(options =>
{
options.ErrorHandler = ex =>
Console.WriteLine($"Scheduled task failed: {ex.Message}");
});
var app = builder.Build();
// 配置调度器
app.Services.UseScheduler(scheduler =>
{
// 每分钟执行
scheduler.Schedule<CleanupTask>()
.EveryMinute();
// 每天凌晨 2 点
scheduler.Schedule<DataSyncTask>()
.DailyAtHour(2);
// 工作日每天 9 点
scheduler.Schedule<ReportTask>()
.Weekday()
.DailyAtHour(9);
// 每 5 分钟
scheduler.Schedule<HealthCheckTask>()
.EveryFiveMinutes();
// Cron 表达式
scheduler.ScheduleWithParams<SyncTask>("api-server")
.Cron("0 */2 * * *"); // 每 2 小时
// 防止重叠执行
scheduler.Schedule<LongRunningTask>()
.EveryMinute()
.PreventOverlapping(nameof(LongRunningTask));
});
// 任务实现
public class CleanupTask : IInvocable
{
private readonly ILogger<CleanupTask> _logger;
private readonly IServiceProvider _sp;
public CleanupTask(ILogger<CleanupTask> logger, IServiceProvider sp)
{
_logger = logger;
_sp = sp;
}
public async Task Invoke()
{
using var scope = _sp.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<ITempFileRepository>();
var deleted = await repository.CleanupOlderThanAsync(TimeSpan.FromDays(7));
_logger.LogInformation("清理了 {Count} 个临时文件", deleted);
}
}
// 队列任务(即时执行的后台任务)
app.MapPost("/api/emails", async (
SendEmailRequest request,
IQueue queue) =>
{
// 入队,立即返回
queue.QueueAsyncTask(async () =>
{
using var scope = app.Services.CreateScope();
var emailService = scope.ServiceProvider.GetRequiredService<IEmailService>();
await emailService.SendAsync(request.To, request.Subject, request.Body);
});
return Results.Accepted();
});错误恢复策略
重试与死信队列
// 带重试的消息处理器
public class RetryMessageProcessor<TMessage>
{
private readonly ILogger _logger;
private readonly int _maxRetries;
private readonly TimeSpan _retryDelay;
private readonly Channel<RetryMessage<TMessage>> _deadLetterChannel;
public RetryMessageProcessor(
ILogger logger,
int maxRetries = 3,
TimeSpan? retryDelay = null)
{
_logger = logger;
_maxRetries = maxRetries;
_retryDelay = retryDelay ?? TimeSpan.FromSeconds(5);
_deadLetterChannel = Channel.CreateUnbounded<RetryMessage<TMessage>>();
}
public async Task ProcessWithRetryAsync(
TMessage message,
Func<TMessage, CancellationToken, Task> handler,
CancellationToken ct)
{
var retryCount = 0;
while (retryCount < _maxRetries)
{
try
{
await handler(message, ct);
return; // 成功
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
retryCount++;
_logger.LogWarning(ex,
"处理消息失败,第 {Retry}/{Max} 次重试",
retryCount, _maxRetries);
if (retryCount >= _maxRetries)
{
// 超过最大重试,发送到死信队列
await _deadLetterChannel.Writer.WriteAsync(
new RetryMessage<TMessage>(message, retryCount, ex), ct);
return;
}
// 指数退避
var delay = TimeSpan.FromMilliseconds(
_retryDelay.TotalMilliseconds * Math.Pow(2, retryCount - 1));
await Task.Delay(delay, ct);
}
}
}
// 死信队列消费
public IAsyncEnumerable<RetryMessage<TMessage>> ReadDeadLettersAsync(CancellationToken ct)
=> _deadLetterChannel.Reader.ReadAllAsync(ct);
}
public record RetryMessage<T>(T Message, int RetryCount, Exception LastError);
// 使用
public class OrderProcessorService : BackgroundService
{
private readonly OrderEventChannel _channel;
private readonly RetryMessageProcessor<OrderCreatedEvent> _processor;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 启动死信队列监控
_ = MonitorDeadLettersAsync(stoppingToken);
await foreach (var evt in _channel.ReadAllAsync(stoppingToken))
{
await _processor.ProcessWithRetryAsync(evt, async (msg, ct) =>
{
// 处理逻辑
}, stoppingToken);
}
}
private async Task MonitorDeadLettersAsync(CancellationToken ct)
{
await foreach (var dead in _processor.ReadDeadLettersAsync(ct))
{
_logger.LogCritical(
"消息处理最终失败: {Message}, 重试 {Count} 次, 最后错误: {Error}",
dead.Message, dead.RetryCount, dead.LastError.Message);
// 持久化到数据库或发送告警
}
}
}优雅关闭
令牌传播与资源释放
// 优雅关闭的核心:正确传播 CancellationToken
public class GracefulShutdownService : BackgroundService
{
private readonly ILogger _logger;
private readonly IServiceProvider _sp;
public GracefulShutdownService(ILogger<GracefulShutdownService> logger, IServiceProvider sp)
{
_logger = logger;
_sp = sp;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("服务启动");
// 注册关闭回调
stoppingToken.Register(() =>
{
_logger.LogInformation("收到关闭信号,开始清理...");
});
try
{
await foreach (var item in ReadWithTimeoutAsync(stoppingToken))
{
// 每个操作都传入 stoppingToken
await ProcessItemAsync(item, stoppingToken);
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("服务正常关闭");
}
finally
{
// 释放资源
await CleanupAsync();
}
}
private async IAsyncEnumerable<int> ReadWithTimeoutAsync(
[EnumeratorCancellation] CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(1000, ct);
yield return Random.Shared.Next();
}
}
private async Task ProcessItemAsync(int item, CancellationToken ct)
{
await Task.Delay(100, ct); // 模拟处理
}
private async Task CleanupAsync()
{
_logger.LogInformation("释放资源...");
await Task.Delay(500); // 确保资源释放
_logger.LogInformation("资源释放完成");
}
}
// 配置关闭超时
builder.WebHost.ConfigureKestrel(options =>
{
// Kestrel 关闭超时
options.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(2);
});
builder.Services.Configure<HostOptions>(options =>
{
options.ShutdownTimeout = TimeSpan.FromSeconds(30);
});
// 关闭流程:
// 1. SIGTERM/Ctrl+C → CancellationToken 触发
// 2. 停止接收新请求
// 3. 等待正在执行的请求完成(超时 30s)
// 4. 停止所有 IHostedService(调用 StopAsync)
// 5. ApplicationStopped 事件触发优点
缺点
总结
BackgroundService 是 ASP.NET Core 后台任务的基础,通过 ExecuteAsync + CancellationToken 实现长运行服务。System.Threading.Channels 提供高性能的生产者-消费者模式,支持有界/无界通道。Coravel 提供更丰富的调度功能(Cron 表达式、防重叠执行)。错误恢复策略包括重试(指数退避)和死信队列。优雅关闭的关键是正确传播 CancellationToken,在 finally 块释放资源。对于需要持久化和分布式调度的场景,建议使用 Hangfire 或 Quartz.NET。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《后台任务处理深入》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《后台任务处理深入》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《后台任务处理深入》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《后台任务处理深入》最大的收益和代价分别是什么?
