事件驱动架构
大约 9 分钟约 2690 字
事件驱动架构
简介
事件驱动架构(Event-Driven Architecture)通过事件的发布和订阅实现服务间的松耦合通信。理解事件溯源(Event Sourcing)、CQRS 模式和消息可靠性保证,有助于构建高可用的异步处理系统。
特点
领域事件
事件设计与发布
// 领域事件基类
public interface IDomainEvent
{
Guid EventId { get; }
DateTime OccurredAt { get; }
}
public abstract record DomainEvent : IDomainEvent
{
public Guid EventId { get; init; } = Guid.NewGuid();
public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
}
// 领域事件定义
public record OrderCreatedEvent(
Guid OrderId,
Guid UserId,
List<OrderItemData> Items,
decimal TotalAmount) : DomainEvent;
public record OrderPaidEvent(
Guid OrderId,
string TransactionId,
decimal Amount) : DomainEvent;
public record OrderShippedEvent(
Guid OrderId,
string TrackingNumber,
DateTime ShippedAt) : DomainEvent;
public record OrderCancelledEvent(
Guid OrderId,
string Reason) : DomainEvent;
// 聚合根基类 — 支持领域事件收集
public abstract class AggregateRoot
{
private readonly List<IDomainEvent> _domainEvents = new();
public IReadOnlyList<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();
protected void AddDomainEvent(IDomainEvent @event) => _domainEvents.Add(@event);
public void ClearDomainEvents() => _domainEvents.Clear();
}
// Order 聚合根
public class Order : AggregateRoot
{
public Guid Id { get; private set; }
public Guid UserId { get; private set; }
public OrderStatus Status { get; private set; }
public List<OrderItem> Items { get; private set; } = new();
public decimal TotalAmount { get; private set; }
public string? PaymentTransactionId { get; private set; }
public string? TrackingNumber { get; private set; }
// 工厂方法
public static Order Create(Guid userId, List<OrderItemData> items)
{
var order = new Order
{
Id = Guid.NewGuid(),
UserId = userId,
Status = OrderStatus.Created,
Items = items.Select(i => new OrderItem(i.ProductId, i.ProductName, i.Price, i.Quantity)).ToList(),
TotalAmount = items.Sum(i => i.Price * i.Quantity)
};
// 发布领域事件
order.AddDomainEvent(new OrderCreatedEvent(
order.Id, userId, items, order.TotalAmount));
return order;
}
public void MarkAsPaid(string transactionId)
{
if (Status != OrderStatus.Created)
throw new DomainException("只有已创建的订单才能支付");
Status = OrderStatus.Paid;
PaymentTransactionId = transactionId;
AddDomainEvent(new OrderPaidEvent(Id, transactionId, TotalAmount));
}
public void Ship(string trackingNumber)
{
if (Status != OrderStatus.Paid)
throw new DomainException("只有已支付的订单才能发货");
Status = OrderStatus.Shipped;
TrackingNumber = trackingNumber;
AddDomainEvent(new OrderShippedEvent(Id, trackingNumber, DateTime.UtcNow));
}
public void Cancel(string reason)
{
if (Status is OrderStatus.Shipped or OrderStatus.Delivered)
throw new DomainException("已发货或已签收的订单不能取消");
Status = OrderStatus.Cancelled;
AddDomainEvent(new OrderCancelledEvent(Id, reason));
}
}
// 领域事件发布器
public interface IDomainEventPublisher
{
Task PublishAsync(IEnumerable<IDomainEvent> events, CancellationToken ct = default);
}
// 通过 EF Core SaveChanges 自动发布
public class EventPublishingDbContext : DbContext
{
private readonly IDomainEventPublisher _eventPublisher;
public EventPublishingDbContext(DbContextOptions options, IDomainEventPublisher eventPublisher)
: base(options)
{
_eventPublisher = eventPublisher;
}
public override async Task<int> SaveChangesAsync(CancellationToken ct = default)
{
// 收集所有聚合根的事件
var aggregates = ChangeTracker.Entries<AggregateRoot>()
.Where(e => e.Entity.DomainEvents.Any())
.Select(e => e.Entity)
.ToList();
var events = aggregates.SelectMany(a => a.DomainEvents).ToList();
// 先保存数据
var result = await base.SaveChangesAsync(ct);
// 再发布事件
if (events.Any())
{
await _eventPublisher.PublishAsync(events, ct);
aggregates.ForEach(a => a.ClearDomainEvents());
}
return result;
}
}事件溯源
事件存储与重建
// 事件存储模型
public class EventStore
{
private readonly IEventStoreRepository _repository;
private readonly ILogger<EventStore> _logger;
public EventStore(IEventStoreRepository repository, ILogger<EventStore> logger)
{
_repository = repository;
_logger = logger;
}
// 追加事件
public async Task AppendAsync(Guid aggregateId, IDomainEvent @event, int expectedVersion, CancellationToken ct)
{
// 乐观并发检查
var currentVersion = await _repository.GetVersionAsync(aggregateId, ct);
if (currentVersion != expectedVersion)
{
throw new ConcurrencyException(
$"并发冲突: 聚合 {aggregateId} 期望版本 {expectedVersion},实际版本 {currentVersion}");
}
var storedEvent = new StoredEvent
{
Id = Guid.NewGuid(),
AggregateId = aggregateId,
EventType = @event.GetType().AssemblyQualifiedName!,
EventData = JsonSerializer.Serialize(@event),
Version = expectedVersion + 1,
Timestamp = DateTime.UtcNow
};
await _repository.AppendAsync(storedEvent, ct);
_logger.LogInformation("事件已存储: {Type} (v{Version})", @event.GetType().Name, storedEvent.Version);
}
// 从事件重建聚合
public async Task<T?> LoadAsync<T>(Guid aggregateId, CancellationToken ct) where T : AggregateRoot, new()
{
var events = await _repository.GetEventsAsync(aggregateId, ct);
if (!events.Any()) return null;
var aggregate = new T();
foreach (var storedEvent in events.OrderBy(e => e.Version))
{
var @event = DeserializeEvent(storedEvent);
aggregate.Apply(@event);
}
return aggregate;
}
private IDomainEvent DeserializeEvent(StoredEvent stored)
{
var type = Type.GetType(stored.EventType)!;
return (IDomainEvent)JsonSerializer.Deserialize(stored.EventData, type)!;
}
}
// 事件存储实体
public class StoredEvent
{
public Guid Id { get; set; }
public Guid AggregateId { get; set; }
public string EventType { get; set; } = "";
public string EventData { get; set; } = "";
public int Version { get; set; }
public DateTime Timestamp { get; set; }
}
// Order 聚合根支持事件回放
public class Order : AggregateRoot
{
// 状态字段同上...
// 应用事件(状态重建)
public void Apply(IDomainEvent @event)
{
switch (@event)
{
case OrderCreatedEvent e:
Id = e.OrderId;
UserId = e.UserId;
Items = e.Items.Select(i => new OrderItem(i.ProductId, i.ProductName, i.Price, i.Quantity)).ToList();
TotalAmount = e.TotalAmount;
Status = OrderStatus.Created;
break;
case OrderPaidEvent e:
PaymentTransactionId = e.TransactionId;
Status = OrderStatus.Paid;
break;
case OrderShippedEvent e:
TrackingNumber = e.TrackingNumber;
Status = OrderStatus.Shipped;
break;
case OrderCancelledEvent:
Status = OrderStatus.Cancelled;
break;
}
}
}
// 事件快照(优化重建性能)
public class OrderSnapshot
{
public Guid OrderId { get; set; }
public Guid UserId { get; set; }
public OrderStatus Status { get; set; }
public decimal TotalAmount { get; set; }
public string? PaymentTransactionId { get; set; }
public int Version { get; set; }
}
public class SnapshotStore
{
private readonly ISnapshotRepository _repository;
// 每 100 个事件保存一次快照
public async Task<Order> LoadWithSnapshotAsync(Guid orderId, CancellationToken ct)
{
// 先加载最新快照
var snapshot = await _repository.GetLatestSnapshotAsync<OrderSnapshot>(orderId, ct);
var aggregate = new Order();
if (snapshot != null)
{
// 从快照恢复状态
aggregate.ApplySnapshot(snapshot);
}
// 只加载快照之后的事件
var fromVersion = snapshot?.Version ?? 0;
var events = await _repository.GetEventsAfterVersionAsync(orderId, fromVersion, ct);
foreach (var @event in events)
{
aggregate.Apply(@event);
}
return aggregate;
}
}CQRS 模式
读写分离
// CQRS — Command Query Responsibility Segregation
// 写操作(Command)走主库,读操作(Query)走只读副本或物化视图
// Command 端(写)
public record CreateOrderCommand(Guid UserId, List<OrderItemData> Items);
public record PayOrderCommand(Guid OrderId, string TransactionId);
// Query 端(读)
public record GetOrderByIdQuery(Guid OrderId);
public record GetOrdersByUserQuery(Guid UserId, int Page, int PageSize);
// Command Handler
public class OrderCommandHandler
{
private readonly EventStore _eventStore;
private readonly IOrderReadRepository _readRepo;
public async Task<Guid> HandleAsync(CreateOrderCommand command, CancellationToken ct)
{
// 写入事件存储
var order = Order.Create(command.UserId, command.Items);
foreach (var evt in order.DomainEvents)
{
await _eventStore.AppendAsync(order.Id, evt, 0, ct);
}
// 更新读模型(最终一致性)
await _readRepo.UpdateReadModelAsync(order, ct);
return order.Id;
}
}
// Query Handler(使用独立的读模型)
public class OrderQueryHandler
{
private readonly IOrderReadRepository _readRepo;
public async Task<OrderViewModel?> HandleAsync(GetOrderByIdQuery query, CancellationToken ct)
{
return await _readRepo.GetByIdAsync(query.OrderId, ct);
}
public async Task<PagedResult<OrderViewModel>> HandleAsync(GetOrdersByUserQuery query, CancellationToken ct)
{
return await _readRepo.GetByUserAsync(query.UserId, query.Page, query.PageSize, ct);
}
}
// 读模型(优化查询性能)
public class OrderViewModel
{
public Guid Id { get; set; }
public Guid UserId { get; set; }
public string UserName { get; set; } = "";
public OrderStatus Status { get; set; }
public decimal TotalAmount { get; set; }
public List<OrderItemViewModel> Items { get; set; } = new();
public DateTime CreatedAt { get; set; }
}
// MediatR 集成
// builder.Services.AddMediatR(typeof(Program));
// app.MapPost("/api/orders", async (CreateOrderCommand cmd, IMediator mediator) =>
// Results.Created($"/api/orders/{await mediator.Send(cmd)}"));Outbox 模式
确保事件不丢失
// Outbox 模式 — 将事件持久化到数据库,后台线程发送
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; } = "";
public string EventData { get; set; } = "";
public DateTime CreatedAt { get; set; }
public DateTime? SentAt { get; set; }
public int RetryCount { get; set; }
public string? Error { get; set; }
}
// Outbox DbContext
public class AppDbContext : DbContext
{
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
public override async Task<int> SaveChangesAsync(CancellationToken ct = default)
{
// 收集领域事件并写入 Outbox
var aggregates = ChangeTracker.Entries<AggregateRoot>()
.Where(e => e.Entity.DomainEvents.Any())
.Select(e => e.Entity)
.ToList();
foreach (var evt in aggregates.SelectMany(a => a.DomainEvents))
{
OutboxMessages.Add(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = evt.GetType().AssemblyQualifiedName!,
EventData = JsonSerializer.Serialize(evt),
CreatedAt = DateTime.UtcNow
});
}
aggregates.ForEach(a => a.ClearDomainEvents());
return await base.SaveChangesAsync(ct);
}
}
// Outbox 后台处理器
public class OutboxProcessor : BackgroundService
{
private readonly IServiceProvider _sp;
private readonly ILogger<OutboxProcessor> _logger;
private readonly IEventBus _eventBus;
public OutboxProcessor(IServiceProvider sp, ILogger<OutboxProcessor> logger, IEventBus eventBus)
{
_sp = sp;
_logger = logger;
_eventBus = eventBus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _sp.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var pending = await db.OutboxMessages
.Where(m => m.SentAt == null && m.RetryCount < 5)
.OrderBy(m => m.CreatedAt)
.Take(50)
.ToListAsync(stoppingToken);
foreach (var message in pending)
{
try
{
var type = Type.GetType(message.EventType)!;
var evt = JsonSerializer.Deserialize(message.EventData, type)!;
await _eventBus.PublishAsync(evt, stoppingToken);
message.SentAt = DateTime.UtcNow;
}
catch (Exception ex)
{
message.RetryCount++;
message.Error = ex.Message;
_logger.LogWarning(ex, "发送 Outbox 消息失败: {Id}", message.Id);
}
}
await db.SaveChangesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox 处理异常");
}
await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
}
}
}幂等处理
消息去重
// 幂等处理器 — 确保重复消息不产生副作用
public class IdempotentHandler
{
private readonly AppDbContext _db;
public IdempotentHandler(AppDbContext db)
{
_db = db;
}
public async Task HandleAsync<TMessage>(
TMessage message,
string messageId,
Func<TMessage, CancellationToken, Task> handler,
CancellationToken ct) where TMessage : class
{
// 检查是否已处理
var exists = await _db.Set<ProcessedMessage>()
.AnyAsync(m => m.MessageId == messageId, ct);
if (exists)
{
return; // 已处理,跳过
}
// 执行处理逻辑
await handler(message, ct);
// 记录已处理(同一事务中)
_db.Set<ProcessedMessage>().Add(new ProcessedMessage
{
MessageId = messageId,
MessageType = typeof(TMessage).Name,
ProcessedAt = DateTime.UtcNow
});
await _db.SaveChangesAsync(ct);
}
}
public class ProcessedMessage
{
public string MessageId { get; set; } = "";
public string MessageType { get; set; } = "";
public DateTime ProcessedAt { get; set; }
}优点
缺点
总结
事件驱动架构通过领域事件实现服务间的松耦合。聚合根收集领域事件,通过 SaveChanges 自动发布。事件溯源将状态变更保存为不可变事件序列,通过事件回放重建状态。CQRS 分离读写模型,写端使用事件存储,读端使用优化的查询模型。Outbox 模式将事件持久化到数据库,后台线程发送确保不丢失。幂等处理通过消息 ID 去重,确保重复消费不产生副作用。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《事件驱动架构》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《事件驱动架构》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《事件驱动架构》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《事件驱动架构》最大的收益和代价分别是什么?
