CQRS 与 MediatR
大约 10 分钟约 2910 字
CQRS 与 MediatR
简介
CQRS(Command Query Responsibility Segregation)是一种将读写操作分离的架构模式。MediatR 是 .NET 平台最流行的中介者模式实现库,它简化了 CQRS 的落地。通过将请求/响应解耦,MediatR 让业务逻辑更清晰、更易测试。
特点
MediatR 基本用法
安装与注册
# 安装 MediatR
dotnet add package MediatR// Program.cs 注册
// builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
// .NET 8+ 推荐方式
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssemblyContaining<Program>();
cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
cfg.AddOpenBehavior(typeof(ValidationBehavior<,>));
});Request/Response 模式
/// <summary>
/// MediatR 三种请求类型
/// </summary>
// 1. Request — 无返回值的命令
public record CreateOrderCommand(string UserId, List<OrderItemDto> Items) : IRequest;
// 2. Request<T> — 有返回值的命令
public record GetOrderQuery(int OrderId) : IRequest<OrderDto>;
// 3. Notification — 发布/订阅通知
public record OrderCreatedNotification(Guid OrderId, string UserId, decimal TotalAmount) : INotification;
// DTO
public record OrderDto(int Id, string OrderNo, decimal TotalAmount, string Status);
public record OrderItemDto(int ProductId, string ProductName, int Quantity, decimal Price);Handler 实现
/// <summary>
/// 命令 Handler — 处理写操作
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand>
{
private readonly IOrderRepository _repository;
private readonly ILogger<CreateOrderCommandHandler> _logger;
public CreateOrderCommandHandler(IOrderRepository repository, ILogger<CreateOrderCommandHandler> logger)
{
_repository = repository;
_logger = logger;
}
public async Task Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = new Order
{
UserId = request.UserId,
Items = request.Items.Select(i => new OrderItem
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price
}).ToList(),
Status = "Created"
};
await _repository.CreateAsync(order, ct);
_logger.LogInformation("订单创建成功:{OrderId}", order.Id);
}
}
/// <summary>
/// 查询 Handler — 处理读操作
/// </summary>
public class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderDto?>
{
private readonly IOrderRepository _repository;
public GetOrderQueryHandler(IOrderRepository repository)
{
_repository = repository;
}
public async Task<OrderDto?> Handle(GetOrderQuery request, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(request.OrderId, ct);
if (order == null) return null;
return new OrderDto(order.Id, order.OrderNo, order.TotalAmount, order.Status);
}
}Controller 调用
/// <summary>
/// 使用 MediatR 的 Controller
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IMediator _mediator;
public OrdersController(IMediator mediator)
{
_mediator = mediator;
}
// 创建订单 — 发送命令
[HttpPost]
public async Task<IActionResult> Create([FromBody] CreateOrderRequest request)
{
var command = new CreateOrderCommand(request.UserId, request.Items);
await _mediator.Send(command);
return Ok(new { Message = "订单创建成功" });
}
// 查询订单 — 发送查询
[HttpGet("{id}")]
public async Task<IActionResult> GetById(int id)
{
var query = new GetOrderQuery(id);
var result = await _mediator.Send(query);
return result != null ? Ok(result) : NotFound();
}
// 带返回值的命令
[HttpPost("with-result")]
public async Task<IActionResult> CreateWithResult([FromBody] CreateOrderRequest request)
{
var command = new CreateOrderWithResultCommand(request.UserId, request.Items);
var result = await _mediator.Send(command);
return CreatedAtAction(nameof(GetById), new { id = result.Id }, result);
}
}Pipeline Behavior — 管道行为
日志管道
/// <summary>
/// 日志管道 — 自动记录所有 MediatR 请求
/// </summary>
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
{
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
var requestName = typeof(TRequest).Name;
_logger.LogInformation("处理请求:{Request} {@Data}", requestName, request);
var stopwatch = Stopwatch.StartNew();
try
{
var response = await next();
stopwatch.Stop();
_logger.LogInformation("请求完成:{Request},耗时 {Ms}ms", requestName, stopwatch.ElapsedMilliseconds);
return response;
}
catch (Exception ex)
{
stopwatch.Stop();
_logger.LogError(ex, "请求失败:{Request},耗时 {Ms}ms", requestName, stopwatch.ElapsedMilliseconds);
throw;
}
}
}验证管道
/// <summary>
/// 自动验证管道 — FluentValidation 集成
/// </summary>
public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
{
_validators = validators;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
if (!_validators.Any())
return await next();
var context = new ValidationContext<TRequest>(request);
var errors = _validators
.Select(v => v.Validate(context))
.SelectMany(r => r.Errors)
.Where(f => f != null)
.ToList();
if (errors.Count != 0)
{
var errorMessages = errors.Select(e => e.ErrorMessage).ToList();
throw new ValidationException(errorMessages);
}
return await next();
}
}
// 验证器定义
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
public CreateOrderCommandValidator()
{
RuleFor(x => x.UserId).NotEmpty().WithMessage("用户ID不能为空");
RuleFor(x => x.Items).NotEmpty().WithMessage("订单项不能为空");
RuleForEach(x => x.Items).ChildRules(item =>
{
item.RuleFor(i => i.ProductId).GreaterThan(0);
item.RuleFor(i => i.Quantity).GreaterThan(0);
});
}
}缓存管道
/// <summary>
/// 缓存管道 — 自动缓存查询结果
/// </summary>
public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IMemoryCache _cache;
private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;
public CachingBehavior(IMemoryCache cache, ILogger<CachingBehavior<TRequest, TResponse>> logger)
{
_cache = cache;
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
// 只缓存标记了 ICachableQuery 的请求
if (request is not ICachableQuery cachable)
return await next();
var cacheKey = cachable.CacheKey;
if (_cache.TryGetValue(cacheKey, out TResponse? cached))
{
_logger.LogDebug("缓存命中:{Key}", cacheKey);
return cached!;
}
var response = await next();
_cache.Set(cacheKey, response, cachable.Expiration ?? TimeSpan.FromMinutes(10));
return response;
}
}
public interface ICachableQuery
{
string CacheKey { get; }
TimeSpan? Expiration { get; }
}
// 使用
public record GetProductsQuery(string Category, int Page) : IRequest<List<ProductDto>>, ICachableQuery
{
public string CacheKey => $"products:{Category}:{Page}";
public TimeSpan? Expiration => TimeSpan.FromMinutes(5);
}Notification — 事件通知
发布/订阅
/// <summary>
/// Notification — 一对多事件通知
/// </summary>
// 事件定义
public record OrderCreatedEvent(Guid OrderId, string UserId, decimal Amount) : INotification;
// 处理器1:发送通知
public class SendOrderNotificationHandler : INotificationHandler<OrderCreatedEvent>
{
private readonly IEmailService _emailService;
public SendOrderNotificationHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task Handle(OrderCreatedEvent notification, CancellationToken ct)
{
await _emailService.SendOrderConfirmationAsync(notification.UserId, notification.OrderId);
}
}
// 处理器2:更新库存
public class UpdateInventoryHandler : INotificationHandler<OrderCreatedEvent>
{
private readonly IInventoryService _inventoryService;
public UpdateInventoryHandler(IInventoryService inventoryService)
{
_inventoryService = inventoryService;
}
public async Task Handle(OrderCreatedEvent notification, CancellationToken ct)
{
await _inventoryService.ReserveForOrderAsync(notification.OrderId);
}
}
// 处理器3:记录日志
public class OrderAuditLogHandler : INotificationHandler<OrderCreatedEvent>
{
private readonly IAuditLogService _auditService;
public OrderAuditLogHandler(IAuditLogService auditService)
{
_auditService = auditService;
}
public async Task Handle(OrderCreatedEvent notification, CancellationToken ct)
{
await _auditService.LogAsync("OrderCreated", notification.OrderId.ToString());
}
}
// 在 Command Handler 中发布事件
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, OrderDto>
{
private readonly IMediator _mediator;
private readonly IOrderRepository _repository;
public CreateOrderCommandHandler(IMediator mediator, IOrderRepository repository)
{
_mediator = mediator;
_repository = repository;
}
public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken ct)
{
var order = CreateOrder(request);
await _repository.CreateAsync(order, ct);
// 发布事件 — 所有 Handler 都会收到
await _mediator.Publish(new OrderCreatedEvent(order.Id, request.UserId, order.TotalAmount), ct);
return MapToDto(order);
}
}CQRS 架构分层
项目结构
OrderService/
├── Commands/ # 命令(写操作)
│ ├── CreateOrderCommand.cs
│ ├── UpdateOrderCommand.cs
│ ├── DeleteOrderCommand.cs
│ └── Handlers/
│ ├── CreateOrderCommandHandler.cs
│ └── UpdateOrderCommandHandler.cs
├── Queries/ # 查询(读操作)
│ ├── GetOrderQuery.cs
│ ├── ListOrdersQuery.cs
│ └── Handlers/
│ └── GetOrderQueryHandler.cs
├── Events/ # 领域事件
│ ├── OrderCreatedEvent.cs
│ └── Handlers/
│ ├── SendNotificationHandler.cs
│ └── UpdateInventoryHandler.cs
├── Behaviors/ # 管道行为
│ ├── LoggingBehavior.cs
│ ├── ValidationBehavior.cs
│ └── CachingBehavior.cs
└── Controllers/
└── OrdersController.csCQRS 读写分离策略
| 策略 | 写端 | 读端 | 复杂度 |
|---|---|---|---|
| 简单分离 | 同一数据库 | 同一数据库 | 低 |
| 物理分离 | 写数据库 | 读副本 | 中 |
| Event Sourcing | 事件存储 | 读模型 | 高 |
优点
缺点
总结
MediatR 是实现 CQRS 的最佳工具。通过 Command/Query 分离、Pipeline Behavior 管道、Notification 事件通知,让业务逻辑清晰可维护。中大型项目推荐使用,简单 CRUD 项目直接用 Service 即可。核心原则:写操作用 Command,读操作用 Query,横切关注点用 Pipeline。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《CQRS 与 MediatR》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《CQRS 与 MediatR》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《CQRS 与 MediatR》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《CQRS 与 MediatR》最大的收益和代价分别是什么?
异常处理管道
/// <summary>
/// 全局异常处理管道 — 捕获未处理的异常并转换为友好响应
/// </summary>
public class ExceptionHandlingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly ILogger<ExceptionHandlingBehavior<TRequest, TResponse>> _logger;
public ExceptionHandlingBehavior(ILogger<ExceptionHandlingBehavior<TRequest, TResponse>> logger)
{
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
try
{
return await next();
}
catch (ValidationException ex)
{
_logger.LogWarning("验证失败: {Errors}", string.Join(", ", ex.Errors));
throw;
}
catch (NotFoundException ex)
{
_logger.LogWarning("资源未找到: {Message}", ex.Message);
throw;
}
catch (BusinessException ex)
{
_logger.LogWarning("业务异常: {Code} - {Message}", ex.Code, ex.Message);
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "未处理异常: {Request}", typeof(TRequest).Name);
throw;
}
}
}
// 自定义异常类型
public class NotFoundException : Exception
{
public string EntityName { get; }
public object EntityId { get; }
public NotFoundException(string entityName, object entityId)
: base($"{entityName} ({entityId}) 未找到")
{
EntityName = entityName;
EntityId = entityId;
}
}
public class BusinessException : Exception
{
public string Code { get; }
public BusinessException(string code, string message) : base(message)
{
Code = code;
}
}授权管道
/// <summary>
/// 授权管道 — 基于 Attribute 自动检查权限
/// </summary>
public class AuthorizationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly ICurrentUserService _currentUser;
private readonly ILogger<AuthorizationBehavior<TRequest, TResponse>> _logger;
public AuthorizationBehavior(
ICurrentUserService currentUser,
ILogger<AuthorizationBehavior<TRequest, TResponse>> logger)
{
_currentUser = currentUser;
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
// 检查请求上的 AuthorizeAttribute
var authAttributes = typeof(TRequest).GetCustomAttributes<AuthorizeAttribute>();
foreach (var attr in authAttributes)
{
if (string.IsNullOrEmpty(attr.Policy)) continue;
if (!_currentUser.HasPermission(attr.Policy))
{
_logger.LogWarning("用户 {UserId} 无权限 {Policy}", _currentUser.UserId, attr.Policy);
throw new ForbiddenException($"缺少权限: {attr.Policy}");
}
}
return await next();
}
}
// 使用 — 在 Command/Query 上标记权限
[Authorize(Policy = "Order.Create")]
public record CreateOrderCommand(string UserId, List<OrderItemDto> Items) : IRequest<OrderDto>;
[Authorize(Policy = "Order.View")]
public record GetOrderQuery(int OrderId) : IRequest<OrderDto?>;MediatR 与 FluentValidation 深度集成
/// <summary>
/// 验证管道增强 — 支持自定义错误码和多语言错误消息
/// </summary>
public class EnhancedValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
private readonly ILogger<EnhancedValidationBehavior<TRequest, TResponse>> _logger;
public EnhancedValidationBehavior(
IEnumerable<IValidator<TRequest>> validators,
ILogger<EnhancedValidationBehavior<TRequest, TResponse>> logger)
{
_validators = validators;
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken ct)
{
if (!_validators.Any())
return await next();
var context = new ValidationContext<TRequest>(request);
// 并行执行所有验证器
var validationResults = await Task.WhenAll(
_validators.Select(v => v.ValidateAsync(context, ct))
);
var failures = validationResults
.SelectMany(r => r.Errors)
.Where(f => f != null)
.GroupBy(f => f.PropertyName)
.Select(g => new ValidationError(
g.Key,
g.Select(f => f.ErrorMessage).ToArray(),
g.First().ErrorCode
))
.ToList();
if (failures.Any())
{
_logger.LogWarning("验证失败: {Errors}", string.Join("; ", failures.Select(f => $"{f.Property}: {string.Join(", ", f.Messages)}")));
throw new ValidationException(failures.SelectMany(f => f.Messages.Select(m => new ValidationFailure(f.Property, m))).ToList());
}
return await next();
}
}
public record ValidationError(string Property, string[] Messages, string? Code = null);CQRS 进阶 — 命令调度与队列
/// <summary>
/// 异步命令调度 — 将命令发送到消息队列异步处理
/// </summary>
public interface ICommandDispatcher
{
Task DispatchAsync<TCommand>(TCommand command, CancellationToken ct = default) where TCommand : class, IRequest;
Task<TResult> DispatchAsync<TResult>(IRequest<TResult> command, CancellationToken ct = default);
}
public class RabbitMqCommandDispatcher : ICommandDispatcher
{
private readonly IMediator _mediator;
private readonly IRabbitMqPublisher _publisher;
private readonly ILogger<RabbitMqCommandDispatcher> _logger;
public RabbitMqCommandDispatcher(IMediator mediator, IRabbitMqPublisher publisher, ILogger<RabbitMqCommandDispatcher> logger)
{
_mediator = mediator;
_publisher = publisher;
_logger = logger;
}
public async Task DispatchAsync<TCommand>(TCommand command, CancellationToken ct) where TCommand : class, IRequest
{
var queueAttribute = typeof(TCommand).GetCustomAttribute<QueueAttribute>();
if (queueAttribute != null)
{
// 异步处理 — 发送到消息队列
_logger.LogInformation("发送命令到队列 {Queue}: {Command}", queueAttribute.QueueName, typeof(TCommand).Name);
await _publisher.PublishAsync(queueAttribute.QueueName, command, ct);
}
else
{
// 同步处理 — 直接执行
await _mediator.Send(command, ct);
}
}
public async Task<TResult> DispatchAsync<TResult>(IRequest<TResult> command, CancellationToken ct)
{
return await _mediator.Send(command, ct);
}
}
// 标记为异步队列处理
[Queue("order-commands")]
public record CreateOrderCommand(string UserId, List<OrderItemDto> Items) : IRequest;