定时任务深入
大约 10 分钟 · 约 2866 字
定时任务深入
简介
定时任务不是"把代码定时跑起来"这么简单,它本质上是在后台用稳定、可追踪、可恢复的方式执行周期性工作。真实项目里常见的难点不在 CRON 语法,而在重复执行、错过执行、分布式多实例抢占、失败重试、任务幂等和监控告警。
定时任务在系统中的位置
定时任务触发源:
1. BackgroundService — 简单周期循环
2. Hangfire — 持久化队列 + Dashboard
3. Quartz.NET — 复杂调度 + 集群协调
4. Cron Job (K8s) — 容器化调度
定时任务类型:
- 周期性任务:缓存预热、数据同步、报表生成
- 一次性任务:数据迁移、批量导入、系统初始化
- 延迟任务:发送通知、超时取消、延迟处理
- 队列任务:邮件发送、文件处理、外部调用
关键挑战:
- 幂等性 — 重复执行不产生副作用
- 分布式协调 — 多实例不重复执行
- 故障恢复 — 失败后自动重试或人工补跑
- 可观测性 — 执行状态、耗时、成功率可监控
特点
实现
BackgroundService:轻量级周期任务
// ============================================
// BackgroundService — 最简单的后台任务
// ============================================
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// Cache warm-up worker: refreshes the "hot-products" cache entry once at startup
/// and then again every 5 minutes until the host shuts down.
/// </summary>
/// <remarks>
/// A failed run (including the very first one) is logged and skipped; it never
/// terminates the worker. Only cancellation of the host's stopping token ends the loop.
/// </remarks>
public class CacheWarmupWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<CacheWarmupWorker> _logger;

    public CacheWarmupWorker(IServiceScopeFactory scopeFactory, ILogger<CacheWarmupWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Runs one warm-up immediately, then one per timer tick.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the application is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            // Run once right away so the cache is populated without waiting for the first tick.
            // This goes through the same guarded path as every later run, so a transient
            // failure at startup cannot fault the hosted service.
            await TryWarmupAsync(stoppingToken);

            using var timer = new PeriodicTimer(TimeSpan.FromMinutes(5));
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                await TryWarmupAsync(stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("缓存预热任务已取消");
        }
    }

    /// <summary>
    /// Executes a single warm-up and logs (rather than propagates) any failure.
    /// Cancellation caused by host shutdown is the only exception allowed to escape.
    /// </summary>
    private async Task TryWarmupAsync(CancellationToken stoppingToken)
    {
        try
        {
            await WarmupAsync(stoppingToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
        {
            // Includes OperationCanceledException not caused by shutdown (e.g. a command timeout):
            // treat it as a failed run and keep the schedule alive.
            _logger.LogError(ex, "缓存预热任务执行失败");
        }
    }

    /// <summary>
    /// Loads the top 50 hot, active products ordered by sales and stores them as JSON
    /// under the "hot-products" key with a 10-minute absolute expiration.
    /// </summary>
    /// <param name="cancellationToken">Cancels the database query and the cache write.</param>
    private async Task WarmupAsync(CancellationToken cancellationToken)
    {
        // The worker is a singleton, so scoped services are resolved from a fresh scope per run.
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        var cache = scope.ServiceProvider.GetRequiredService<IDistributedCache>();

        var sw = Stopwatch.StartNew();

        var hotProducts = await db.Products.AsNoTracking()
            .Where(x => x.IsHot && x.IsActive)
            .OrderByDescending(x => x.Sales)
            .Take(50)
            .Select(x => new { x.Id, x.Name, x.Price, x.ImageUrl })
            .ToListAsync(cancellationToken);

        // The expiration (10 min) is longer than the refresh period (5 min), so one
        // failed run does not leave the key empty.
        await cache.SetStringAsync(
            "hot-products",
            JsonSerializer.Serialize(hotProducts),
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10)
            },
            cancellationToken);

        sw.Stop();
        _logger.LogInformation(
            "缓存预热完成: Count={Count}, DurationMs={DurationMs}",
            hotProducts.Count, sw.ElapsedMilliseconds);
    }
}
// Program.cs — register the worker so the host starts and stops it with the application
builder.Services.AddHostedService<CacheWarmupWorker>();
// ============================================
// A more involved periodic task — with a per-run timeout;
// a failed run is simply attempted again on the next tick
// ============================================
/// <summary>
/// Periodic data-sync worker: every 30 minutes it runs one synchronization pass
/// that is cancelled if it takes longer than 5 minutes.
/// </summary>
public class DataSyncWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<DataSyncWorker> _logger;

    public DataSyncWorker(IServiceScopeFactory scopeFactory, ILogger<DataSyncWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Waits for each 30-minute tick and runs a sync pass under a 5-minute budget.
    /// Timeouts and other failures are logged and the loop continues; host shutdown ends it.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the application is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var ticker = new PeriodicTimer(TimeSpan.FromMinutes(30));

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ticker.WaitForNextTickAsync(stoppingToken);

                // Linked source: cancelled either by host shutdown or after the per-run budget.
                using var runBudget = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                runBudget.CancelAfter(TimeSpan.FromMinutes(5));

                await SyncDataAsync(runBudget.Token);
            }
            catch (OperationCanceledException)
            {
                if (stoppingToken.IsCancellationRequested)
                {
                    // The application is shutting down.
                    break;
                }

                // Cancellation without a shutdown request is reported as the per-run timeout.
                _logger.LogWarning("数据同步任务超时(5 分钟)");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "数据同步任务执行失败");
            }
        }
    }

    /// <summary>
    /// Resolves <c>IDataSyncService</c> from a fresh scope, runs it and logs the counters it returns.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the sync service.</param>
    private async Task SyncDataAsync(CancellationToken cancellationToken)
    {
        using var serviceScope = _scopeFactory.CreateScope();
        var dataSync = serviceScope.ServiceProvider.GetRequiredService<IDataSyncService>();

        _logger.LogInformation("开始数据同步...");

        var outcome = await dataSync.SyncAsync(cancellationToken);

        _logger.LogInformation(
            "数据同步完成: Synced={Synced}, Failed={Failed}",
            outcome.SyncedCount,
            outcome.FailedCount);
    }
}
// 适合轻量任务:缓存预热、配置同步、清理临时文件
// 不适合:复杂重试、持久化调度、人工补跑、Dashboard 管理
Hangfire:持久化队列与周期任务
// ============================================
// NuGet: Hangfire.AspNetCore, Hangfire.SqlServer
// ============================================
using Hangfire;
using Hangfire.SqlServer;
// Program.cs
// Hangfire storage and serialization configuration (SQL Server backed).
builder.Services.AddHangfire(config =>
{
    var sqlStorageOptions = new SqlServerStorageOptions
    {
        CommandBatchMaxTimeout = TimeSpan.FromMinutes(5),
        SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5),
        QueuePollInterval = TimeSpan.FromSeconds(5),
        UseRecommendedIsolationLevel = true,
        DisableGlobalLocks = true
    };

    config
        .SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
        .UseSimpleAssemblyNameTypeSerializer()
        .UseRecommendedSerializerSettings()
        .UseSqlServerStorage(
            builder.Configuration.GetConnectionString("Hangfire")!,
            sqlStorageOptions);
});

// Register the Hangfire server (the process that actually executes jobs).
builder.Services.AddHangfireServer(server =>
{
    server.Queues = new[] { "default", "critical", "reports" };
    // At least 4 workers, more when the machine has more cores.
    server.WorkerCount = Math.Max(Environment.ProcessorCount, 4);
    server.SchedulePollingInterval = TimeSpan.FromSeconds(15);
    server.ServerTimeout = TimeSpan.FromMinutes(5);
});

var app = builder.Build();

// Dashboard (must be protected by authorization in production).
var dashboardOptions = new DashboardOptions
{
    Authorization = new[] { new HangfireDashboardAuthorizationFilter() },
    StatsPollingInterval = TimeSpan.FromSeconds(10)
};
app.UseHangfireDashboard("/hangfire", dashboardOptions);
// ============================================
// Register recurring jobs
// ============================================
// Notes on the call shapes used below (Hangfire 1.8 API):
//  * The job lambda is an expression tree, so it cannot rely on optional parameters;
//    the CancellationToken is passed explicitly as CancellationToken.None.
//  * The queue is an explicit argument of AddOrUpdate; RecurringJobOptions carries
//    only the time zone and misfire handling.
//  * Cron.Minutely() is a method that returns the CRON string, so it must be invoked.

// Daily sales report — every day at 02:00, evaluated in China Standard Time.
RecurringJob.AddOrUpdate<ReportJob>(
    "daily-sales-report",
    job => job.GenerateDailySalesReportAsync(CancellationToken.None),
    "0 2 * * *",
    new RecurringJobOptions
    {
        TimeZone = TimeZoneInfo.FindSystemTimeZoneById("China Standard Time"),
        // Runs missed while no server was available are not executed afterwards.
        MisfireHandling = MisfireHandlingMode.Ignorable
    });

// Cache warm-up — every 5 minutes, on the "default" queue.
// NOTE(review): if WarmupCacheAsync declares an optional CancellationToken,
// pass CancellationToken.None here as well.
RecurringJob.AddOrUpdate<CacheJob>(
    "cache-warmup",
    "default",
    job => job.WarmupCacheAsync(),
    "*/5 * * * *",
    new RecurringJobOptions());

// Inventory sync — every minute, on the "critical" queue.
// NOTE(review): same remark about an optional CancellationToken on SyncInventoryAsync.
RecurringJob.AddOrUpdate<DataSyncJob>(
    "inventory-sync",
    "critical",
    job => job.SyncInventoryAsync(),
    Cron.Minutely(),
    new RecurringJobOptions());
// ============================================
// One-off job (fire-and-forget)
// ============================================
// Job lambdas are expression trees, which cannot contain named arguments
// (`cancellationToken: default`); the token is therefore passed positionally.
// NOTE(review): assumes the CancellationToken is the parameter right after the
// arguments shown — confirm against the service interfaces.
BackgroundJob.Enqueue<IEmailService>(x => x.SendWelcomeEmailAsync(userId, CancellationToken.None));

// ============================================
// Delayed job (Schedule)
// ============================================
BackgroundJob.Schedule<IOrderService>(
    x => x.CancelUnpaidOrdersAsync(CancellationToken.None),
    TimeSpan.FromMinutes(30)); // runs 30 minutes from now

// ============================================
// Continuation job — runs after a parent job
// ============================================
var jobId = BackgroundJob.Enqueue<IImportService>(x => x.ImportDataAsync(fileId));

// Validation is enqueued only if the import job reaches the Succeeded state.
BackgroundJob.ContinueJobWith<IValidationService>(
    jobId,
    x => x.ValidateImportedDataAsync(CancellationToken.None),
    JobContinuationOptions.OnlyOnSucceededState);
// ============================================
// 任务实现
// ============================================
/// <summary>
/// Hangfire job that aggregates the previous day's orders into a
/// <see cref="DailySalesReport"/> and e-mails it to every active manager.
/// </summary>
public class ReportJob
{
    // Same time zone id the recurring job is registered with ("0 2 * * *" in China Standard Time).
    private const string ReportTimeZoneId = "China Standard Time";

    private readonly AppDbContext _dbContext;
    private readonly IEmailService _emailService;
    private readonly ILogger<ReportJob> _logger;

    public ReportJob(
        AppDbContext dbContext,
        IEmailService emailService,
        ILogger<ReportJob> logger)
    {
        _dbContext = dbContext;
        _emailService = emailService;
        _logger = logger;
    }

    /// <summary>
    /// Builds the sales report for "yesterday" and sends it out. Nothing is sent when
    /// there were no orders in that window.
    /// </summary>
    /// <param name="cancellationToken">Cancels the queries and the e-mail sends.</param>
    /// <remarks>
    /// The job is retried up to 3 times. A retry re-sends the report to every recipient,
    /// including those already mailed by a failed attempt.
    /// </remarks>
    [AutomaticRetry(Attempts = 3, OnAttemptsExceeded = AttemptsExceededAction.Fail)]
    [Queue("critical")]
    public async Task GenerateDailySalesReportAsync(CancellationToken cancellationToken = default)
    {
        // "Yesterday" is derived from the same time zone the schedule uses, not from the
        // server's local clock. With DateTime.Today, a host running in UTC would still be on
        // the previous calendar day at 02:00 China time and would report a day too early.
        var reportZone = TimeZoneInfo.FindSystemTimeZoneById(ReportTimeZoneId);
        var todayInReportZone = TimeZoneInfo.ConvertTime(DateTimeOffset.UtcNow, reportZone).Date;
        var yesterday = DateOnly.FromDateTime(todayInReportZone.AddDays(-1));

        // Half-open window [start, end) covering the whole of yesterday.
        // NOTE(review): assumes Orders.CreatedAt holds wall-clock time in the report time zone
        // (as the original server-local comparison did) — convert the bounds if it is stored in UTC.
        var start = yesterday.ToDateTime(TimeOnly.MinValue);
        var end = yesterday.AddDays(1).ToDateTime(TimeOnly.MinValue);

        // GroupBy over a constant collapses all matching rows into one aggregate row;
        // with no matching orders there is no group and the result is null.
        var report = await _dbContext.Orders
            .Where(x => x.CreatedAt >= start && x.CreatedAt < end)
            .GroupBy(x => 1)
            .Select(g => new DailySalesReport
            {
                Date = yesterday,
                OrderCount = g.Count(),
                TotalAmount = g.Sum(x => x.Amount),
                AvgAmount = g.Average(x => x.Amount)
            })
            .FirstOrDefaultAsync(cancellationToken);

        if (report != null)
        {
            await SendReportAsync(report, cancellationToken);
        }

        _logger.LogInformation(
            "日报生成完成: Date={Date}, Count={Count}, Total={Total}",
            yesterday, report?.OrderCount ?? 0, report?.TotalAmount ?? 0);
    }

    /// <summary>
    /// Sends the report, serialized as JSON, to every active user whose role is "Manager".
    /// </summary>
    private async Task SendReportAsync(DailySalesReport report, CancellationToken ct)
    {
        var recipients = await _dbContext.Users
            .Where(u => u.Role == "Manager" && u.IsActive)
            .Select(u => u.Email)
            .ToListAsync(ct);

        // The body is identical for every recipient, so serialize it once.
        var body = JsonSerializer.Serialize(report);

        foreach (var email in recipients)
        {
            await _emailService.SendEmailAsync(email, "每日销售日报", body, ct);
        }
    }
}
/// <summary>
/// Aggregated sales figures for a single calendar day.
/// </summary>
public class DailySalesReport
{
    /// <summary>The day the figures cover.</summary>
    public DateOnly Date { get; set; }
    /// <summary>Number of orders counted for the day.</summary>
    public int OrderCount { get; set; }
    /// <summary>Sum of the order amounts.</summary>
    public decimal TotalAmount { get; set; }
    /// <summary>Average order amount.</summary>
    public decimal AvgAmount { get; set; }
}
Quartz.NET:复杂调度与集群场景
// ============================================
// NuGet: Quartz, Quartz.Extensions.Hosting, Quartz.Serialization.Json
// ============================================
using Quartz;
using Quartz.Impl;
using Quartz.Logging;
// Quartz scheduler configuration: one durable job, two CRON triggers,
// and a clustered SQL Server job store.
builder.Services.AddQuartz(options =>
{
    // Clustering (enabled below) requires every node to check in under its own
    // instance id; "AUTO" lets Quartz generate a unique one per process.
    options.SchedulerId = "AUTO";

    options.UseMicrosoftDependencyInjectionJobFactory();

    // Register the job
    var inventoryJobKey = new JobKey("inventory-sync-job", "sync-group");
    options.AddJob<InventorySyncJob>(inventoryJobKey, job => job
        .StoreDurably()
        .WithDescription("库存同步任务")
        .WithIdentity(inventoryJobKey));

    // Trigger 1 — every 10 minutes (Quartz CRON has a leading seconds field)
    options.AddTrigger(trigger => trigger
        .ForJob(inventoryJobKey)
        .WithIdentity("inventory-sync-trigger")
        .WithCronSchedule("0 */10 * * * ?")
        .WithDescription("每 10 分钟同步库存"));

    // Trigger 2 for the same job — every day at 03:00 (full sync)
    options.AddTrigger(trigger => trigger
        .ForJob(inventoryJobKey)
        .WithIdentity("inventory-full-sync-trigger")
        .WithCronSchedule("0 0 3 * * ?")
        .WithDescription("每天凌晨 3 点全量同步库存"));

    // Persistent store (used for cluster coordination)
    options.UsePersistentStore(store =>
    {
        // With UseProperties = true, JobDataMap values must be strings.
        store.UseProperties = true;
        store.UseSqlServer(builder.Configuration.GetConnectionString("Quartz")!);
        store.UseClustering(); // enable cluster coordination
        store.UseJsonSerializer();
    });
});

builder.Services.AddQuartzHostedService(options =>
{
    options.WaitForJobsToComplete = true;
    options.AwaitApplicationStarted = true;
});
// ============================================
// Quartz 任务实现
// ============================================
/// <summary>
/// Quartz job that runs one inventory synchronization pass and records the time
/// and size of the last successful run in the job's data map.
/// </summary>
[DisallowConcurrentExecution]   // never run two executions of this job at the same time
[PersistJobDataAfterExecution]  // persist JobDataMap changes after each execution
public class InventorySyncJob : IJob
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<InventorySyncJob> _logger;

    public InventorySyncJob(IServiceScopeFactory scopeFactory, ILogger<InventorySyncJob> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    /// <summary>
    /// Executes a single sync pass.
    /// </summary>
    /// <param name="context">Quartz execution context (fire time, cancellation token, job data).</param>
    /// <exception cref="JobExecutionException">
    /// Thrown when the sync fails. It wraps the original exception and does not request an immediate re-fire.
    /// </exception>
    public async Task Execute(IJobExecutionContext context)
    {
        using var scope = _scopeFactory.CreateScope();
        var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();

        var fireTime = context.FireTimeUtc;
        _logger.LogInformation("库存同步开始: FireTime={FireTime}", fireTime);

        try
        {
            var result = await inventoryService.SyncAsync(context.CancellationToken);

            // Save run state into the JobDataMap. Values are stored as strings because the
            // job store is configured with UseProperties = true, which rejects anything else.
            context.JobDetail.JobDataMap["LastSyncTime"] = DateTime.UtcNow.ToString("O");
            context.JobDetail.JobDataMap["LastSyncCount"] = result.SyncedCount.ToString();

            _logger.LogInformation("库存同步完成: Synced={Synced}, Failed={Failed}",
                result.SyncedCount, result.FailedCount);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "库存同步失败");

            // Keep the original exception as the cause. The final `false` is
            // refireImmediately: the job is NOT re-run right away; the next attempt
            // happens when a trigger fires again.
            throw new JobExecutionException("库存同步失败", ex, false);
        }
    }
}
分布式锁 — 防止多实例重复执行
// ============================================
// 分布式锁 — 基于 Redis 的简单实现
// ============================================
/// <summary>
/// Best-effort distributed lock built on <see cref="IDistributedCache"/>.
/// </summary>
/// <remarks>
/// <see cref="IDistributedCache"/> exposes no atomic "set if not exists" operation, so this
/// lock is check → write → read-back rather than a true compare-and-set. Two instances
/// racing within the same instant can still both believe they hold the lock. Use it to
/// suppress routine duplicate runs; jobs that must never overlap need an atomic primitive
/// (for example Redis <c>SET key value NX PX</c>) and should remain idempotent regardless.
/// </remarks>
public class DistributedLockProvider
{
    private readonly IDistributedCache _cache;
    private readonly ILogger<DistributedLockProvider> _logger;

    public DistributedLockProvider(IDistributedCache cache, ILogger<DistributedLockProvider> logger)
    {
        _cache = cache;
        _logger = logger;
    }

    /// <summary>
    /// Tries to take the lock named <paramref name="lockKey"/>.
    /// </summary>
    /// <param name="lockKey">Logical lock name; stored under the cache key <c>lock:{lockKey}</c>.</param>
    /// <param name="expiration">Lifetime of the lock entry, so a crashed holder cannot block others forever.</param>
    /// <param name="ct">Cancels the cache round-trips.</param>
    /// <returns>
    /// A handle whose disposal releases the lock, or <c>null</c> when the lock appears to be held by someone else.
    /// </returns>
    public async Task<IDisposable?> AcquireLockAsync(
        string lockKey,
        TimeSpan expiration,
        CancellationToken ct = default)
    {
        var cacheKey = $"lock:{lockKey}";
        // Unique owner token: lets us (and the releaser) tell our entry from another instance's.
        var value = Guid.NewGuid().ToString("N");

        // 1. Somebody already holds the lock -> give up immediately.
        var currentOwner = await _cache.GetStringAsync(cacheKey, ct);
        if (currentOwner != null)
        {
            _logger.LogDebug("获取锁失败: LockKey={LockKey}", lockKey);
            return null;
        }

        // 2. Claim it. SetStringAsync returns no result (it always overwrites), hence step 3.
        await _cache.SetStringAsync(
            cacheKey, value,
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = expiration },
            ct);

        // 3. Read back: if a competitor overwrote our token in the meantime, they win.
        var confirmedOwner = await _cache.GetStringAsync(cacheKey, ct);
        if (!string.Equals(confirmedOwner, value, StringComparison.Ordinal))
        {
            _logger.LogDebug("获取锁失败: LockKey={LockKey}", lockKey);
            return null;
        }

        return new DistributedLockReleaser(_cache, cacheKey, value, _logger);
    }
}
/// <summary>
/// Handle for a held lock; disposing it removes the lock entry from the cache,
/// but only if the entry still carries this holder's token.
/// </summary>
public class DistributedLockReleaser : IDisposable
{
    private readonly IDistributedCache _cache;
    private readonly string _key;
    private readonly string _value;
    private readonly ILogger _logger;
    private bool _released;

    /// <param name="cache">Cache that stores the lock entry.</param>
    /// <param name="key">Full cache key of the lock entry.</param>
    /// <param name="value">Owner token written when the lock was acquired.</param>
    /// <param name="logger">Logger for release diagnostics.</param>
    public DistributedLockReleaser(IDistributedCache cache, string key, string value, ILogger logger)
    {
        _cache = cache;
        _key = key;
        _value = value;
        _logger = logger;
    }

    /// <summary>
    /// Releases the lock once; later calls are no-ops. Never throws.
    /// </summary>
    public void Dispose()
    {
        if (_released)
        {
            return;
        }

        _released = true;

        try
        {
            // If our entry expired and another instance has since taken the lock, the key now
            // holds their token; removing it unconditionally would release a lock we no longer own.
            // (Get + Remove is not atomic, so this narrows the window rather than closing it.)
            var currentOwner = _cache.GetString(_key);
            if (string.Equals(currentOwner, _value, StringComparison.Ordinal))
            {
                _cache.Remove(_key);
                _logger.LogDebug("释放锁: LockKey={LockKey}", _key);
            }
            else
            {
                _logger.LogWarning("锁已过期或已被其他实例持有,跳过释放: LockKey={LockKey}", _key);
            }
        }
        catch (Exception ex)
        {
            // Dispose must not throw; the entry will expire on its own.
            _logger.LogWarning(ex, "释放锁失败: LockKey={LockKey}", _key);
        }
    }
}
// Usage example
// NOTE(review): _lockProvider, _logger and SyncDataAsync are not declared in this excerpt;
// they are presumably injected/defined as in the other workers — confirm before copying.
/// <summary>
/// Periodic worker that runs the data sync every 10 minutes, but only on the
/// instance that manages to take the "data-sync-job" lock for that tick.
/// </summary>
public class SafeDataSyncWorker : BackgroundService
{
    /// <summary>
    /// Tick loop: acquire the lock, sync, release. A failed tick is logged and the loop
    /// continues; host shutdown ends it.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the application is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMinutes(10));

        try
        {
            while (await timer.WaitForNextTickAsync(stoppingToken))
            {
                try
                {
                    // The lock lifetime (15 min) exceeds the period (10 min); it is released
                    // when lockScope is disposed at the end of this iteration.
                    using var lockScope = await _lockProvider.AcquireLockAsync(
                        "data-sync-job", TimeSpan.FromMinutes(15), stoppingToken);

                    if (lockScope == null)
                    {
                        _logger.LogInformation("跳过执行 — 锁已被其他实例持有");
                        continue;
                    }

                    await SyncDataAsync(stoppingToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException || !stoppingToken.IsCancellationRequested)
                {
                    // One failed tick (lock backend down, sync error, ...) must not stop the schedule.
                    _logger.LogError(ex, "数据同步任务执行失败");
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Host is shutting down: leave the loop quietly.
        }
    }
}
任务监控
// ============================================
// 任务执行日志记录
// ============================================
/// <summary>
/// One row per job execution: when it started, when (and whether) it finished,
/// how it ended and on which server it ran.
/// </summary>
public class JobExecutionLog
{
    /// <summary>Row identifier.</summary>
    public long Id { get; set; }

    /// <summary>Name of the job that was executed.</summary>
    public string JobName { get; set; } = string.Empty;

    /// <summary>Moment the execution started.</summary>
    public DateTimeOffset StartedAt { get; set; }

    /// <summary>Moment the execution finished; <c>null</c> while no completion has been recorded.</summary>
    public DateTimeOffset? CompletedAt { get; set; }

    /// <summary>
    /// Elapsed time between <see cref="StartedAt"/> and <see cref="CompletedAt"/>,
    /// or <c>null</c> when <see cref="CompletedAt"/> is not set.
    /// </summary>
    public TimeSpan? Duration
    {
        get
        {
            if (!CompletedAt.HasValue)
            {
                return null;
            }

            return CompletedAt.Value - StartedAt;
        }
    }

    /// <summary>Whether the execution was recorded as successful.</summary>
    public bool Success { get; set; }

    /// <summary>Error message recorded for a failed execution, if any.</summary>
    public string? ErrorMessage { get; set; }

    /// <summary>Name of the server that ran the job, if recorded.</summary>
    public string? ServerName { get; set; }
}
/// <summary>
/// Persists <see cref="JobExecutionLog"/> rows: one call when a job starts, one when it
/// succeeds or fails.
/// </summary>
public class JobExecutionLogger
{
    private readonly AppDbContext _context;

    public JobExecutionLogger(AppDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// Records that <paramref name="jobName"/> has started on this machine.
    /// </summary>
    /// <param name="jobName">Name of the job being executed.</param>
    /// <param name="cancellationToken">Cancels the database write.</param>
    /// <returns>
    /// The id of the new log row, to be passed to <see cref="LogSuccessAsync"/> or
    /// <see cref="LogFailureAsync"/>.
    /// </returns>
    public async Task<long> LogStartAsync(string jobName, CancellationToken cancellationToken = default)
    {
        var entry = new JobExecutionLog
        {
            JobName = jobName,
            StartedAt = DateTimeOffset.UtcNow,
            ServerName = Environment.MachineName
        };

        _context.JobExecutionLogs.Add(entry);
        await _context.SaveChangesAsync(cancellationToken);

        // NOTE(review): assumes Id is a database-generated key that EF fills in on save.
        return entry.Id;
    }

    /// <summary>
    /// Marks the execution identified by <paramref name="logId"/> as completed successfully.
    /// Does nothing when no such row exists.
    /// </summary>
    /// <param name="jobName">Job name (kept for call-site symmetry; the row is located by id).</param>
    /// <param name="logId">Id returned by <see cref="LogStartAsync"/>.</param>
    /// <param name="cancellationToken">Cancels the database round-trips.</param>
    public Task LogSuccessAsync(string jobName, long logId, CancellationToken cancellationToken = default)
    {
        return CompleteAsync(logId, success: true, errorMessage: null, cancellationToken);
    }

    /// <summary>
    /// Marks the execution identified by <paramref name="logId"/> as failed and stores the
    /// exception message. Does nothing when no such row exists.
    /// </summary>
    /// <param name="jobName">Job name (kept for call-site symmetry; the row is located by id).</param>
    /// <param name="logId">Id returned by <see cref="LogStartAsync"/>.</param>
    /// <param name="ex">The exception that ended the execution.</param>
    /// <param name="cancellationToken">Cancels the database round-trips.</param>
    public Task LogFailureAsync(string jobName, long logId, Exception ex, CancellationToken cancellationToken = default)
    {
        return CompleteAsync(logId, success: false, errorMessage: ex.Message, cancellationToken);
    }

    // Shared completion path: stamps CompletedAt and the outcome on an existing row.
    private async Task CompleteAsync(long logId, bool success, string? errorMessage, CancellationToken cancellationToken)
    {
        var log = await _context.JobExecutionLogs.FindAsync(new object[] { logId }, cancellationToken);
        if (log == null)
        {
            return;
        }

        log.CompletedAt = DateTimeOffset.UtcNow;
        log.Success = success;
        if (!success)
        {
            log.ErrorMessage = errorMessage;
        }

        await _context.SaveChangesAsync(cancellationToken);
    }
}
优点
缺点
总结
定时任务的重点不是选 Hangfire 还是 Quartz,而是先把任务语义设计清楚:多久执行一次、失败怎么办、是否允许并发、是否需要补跑、是否必须幂等。轻量周期任务可先用 BackgroundService,复杂调度和可视化治理则优先考虑 Hangfire 或 Quartz。分布式部署必须使用分布式锁或持久化存储协调。
关键知识点
- 任务调度和任务执行要分开思考,失败重试也要单独设计。
- 所有定时任务都应默认假设"可能重复执行"。
- 分布式部署下必须考虑抢占、锁、幂等和时钟一致性。
- 任务上线前要明确监控指标、失败告警和人工补跑流程。
项目落地视角
- 每日对账、库存同步、报表生成适合用持久化调度方案。
- 缓存预热、临时数据清理适合先用 BackgroundService。
- 大批量任务拆成队列作业,避免单个任务执行过久。
- 对外副作用任务(发券、短信、回调)必须设计幂等键。
常见误区
- 只写 CRON,不考虑失败重试和补偿方案。
- 多实例部署后每台机器都执行一次同一个任务。
- 长时间任务没有超时与取消机制,导致堆积。
- 没有 Dashboard、日志与告警,上线后只能靠猜。
进阶路线
- 学习 Hangfire 队列隔离、批处理、Continuation Job。
- 学习 Quartz Misfire、Calendar、Cluster 模式。
- 为任务体系增加 Outbox、审计日志和执行追踪。
- 把定时任务治理接入统一运维平台与权限系统。
适用场景
- 周期性清理、同步、统计、报表任务。
- 数据补偿、异步重试、离线批处理。
- 多租户系统的分租户定时执行需求。
- 需要可视化运维和人工补跑的业务任务平台。
落地建议
- 每个任务都定义唯一任务名、执行频率、超时时间和失败策略。
- 所有任务都做幂等设计,至少做到"重复执行不造成数据错乱"。
- 长任务拆小、重任务异步化、批量任务分片执行。
- 为任务建立执行日志、失败告警、人工补跑和停用机制。
排错清单
- 检查调度器服务是否启动、连接串是否正确。
- 检查 CRON 表达式、时区和下一次触发时间是否符合预期。
- 检查任务是否被并发执行、是否存在数据库锁等待。
- 检查失败日志、重试次数、队列堆积和外部依赖超时。
复盘问题
- 当前任务如果重复执行 2 次,会不会造成数据或副作用错误?
- 任务失败后是自动重试、人工补跑还是允许跳过?
- 多实例部署后,如何证明任务没有重复执行?
- 你能否从日志和 Dashboard 快速判断任务卡在哪一层?
