事件溯源与 CQRS 模式
大约 10 分钟约 2936 字
事件溯源与 CQRS 模式
简介
事件溯源(Event Sourcing)以事件序列替代传统 CRUD 存储状态。CQRS(Command Query Responsibility Segregation)分离读写模型。理解这两种模式的 C# 实现,有助于构建高性能、可审计的领域驱动设计系统。
特点
事件溯源实现
事件存储与聚合
// 领域事件基类
public interface IDomainEvent
{
Guid EventId { get; }
DateTime OccurredAt { get; }
int Version { get; }
}
public abstract record DomainEvent : IDomainEvent
{
public Guid EventId { get; init; } = Guid.NewGuid();
public DateTime OccurredAt { get; init; } = DateTime.UtcNow;
public int Version { get; init; }
}
// 银行账户事件
public record AccountOpenedEvent(Guid AccountId, string OwnerName, decimal InitialBalance) : DomainEvent;
public record MoneyDepositedEvent(Guid AccountId, decimal Amount, decimal BalanceAfter) : DomainEvent;
public record MoneyWithdrawnEvent(Guid AccountId, decimal Amount, decimal BalanceAfter) : DomainEvent;
public record AccountClosedEvent(Guid AccountId, string Reason) : DomainEvent;
// 事件存储
public class EventStore
{
private readonly IEventStoreRepository _repository;
private readonly ILogger<EventStore> _logger;
public EventStore(IEventStoreRepository repository, ILogger<EventStore> logger)
{
_repository = repository;
_logger = logger;
}
public async Task AppendAsync(Guid aggregateId, IDomainEvent @event, int expectedVersion, CancellationToken ct)
{
// 乐观并发检查
var currentVersion = await _repository.GetVersionAsync(aggregateId, ct);
if (currentVersion != expectedVersion)
throw new ConcurrencyException($"并发冲突: 期望 {expectedVersion},实际 {currentVersion}");
var stored = new StoredEvent
{
AggregateId = aggregateId,
EventType = @event.GetType().AssemblyQualifiedName!,
Data = JsonSerializer.Serialize(@event),
Version = @event.Version,
Timestamp = @event.OccurredAt
};
await _repository.AppendAsync(stored, ct);
_logger.LogInformation("事件已存储: {Type} v{Version}", @event.GetType().Name, @event.Version);
}
public async Task<List<IDomainEvent>> GetEventsAsync(Guid aggregateId, CancellationToken ct)
{
var stored = await _repository.GetEventsAsync(aggregateId, ct);
return stored.Select(Deserialize).ToList();
}
private IDomainEvent Deserialize(StoredEvent stored)
{
var type = Type.GetType(stored.EventType)!;
return (IDomainEvent)JsonSerializer.Deserialize(stored.Data, type)!;
}
}
// 聚合根 — 银行账户
public class BankAccount
{
public Guid Id { get; private set; }
public string OwnerName { get; private set; } = "";
public decimal Balance { get; private set; }
public bool IsClosed { get; private set; }
public int Version { get; private set; }
private readonly List<IDomainEvent> _uncommittedEvents = new();
public IReadOnlyList<IDomainEvent> UncommittedEvents => _uncommittedEvents.AsReadOnly();
// 工厂方法
public static BankAccount Open(Guid id, string ownerName, decimal initialBalance)
{
if (initialBalance < 0) throw new DomainException("初始余额不能为负");
var account = new BankAccount();
account.ApplyEvent(new AccountOpenedEvent(id, ownerName, initialBalance) { Version = 1 });
return account;
}
// 业务操作
public void Deposit(decimal amount)
{
if (IsClosed) throw new DomainException("账户已关闭");
if (amount <= 0) throw new DomainException("存款金额必须大于0");
ApplyEvent(new MoneyDepositedEvent(Id, amount, Balance + amount) { Version = Version + 1 });
}
public void Withdraw(decimal amount)
{
if (IsClosed) throw new DomainException("账户已关闭");
if (amount <= 0) throw new DomainException("取款金额必须大于0");
if (amount > Balance) throw new DomainException("余额不足");
ApplyEvent(new MoneyWithdrawnEvent(Id, amount, Balance - amount) { Version = Version + 1 });
}
public void Close(string reason)
{
if (IsClosed) throw new DomainException("账户已关闭");
ApplyEvent(new AccountClosedEvent(Id, reason) { Version = Version + 1 });
}
// 应用事件(状态变更)
private void ApplyEvent(IDomainEvent @event)
{
switch (@event)
{
case AccountOpenedEvent e:
Id = e.AccountId;
OwnerName = e.OwnerName;
Balance = e.InitialBalance;
break;
case MoneyDepositedEvent e:
Balance = e.BalanceAfter;
break;
case MoneyWithdrawnEvent e:
Balance = e.BalanceAfter;
break;
case AccountClosedEvent:
IsClosed = true;
break;
}
Version = @event.Version;
_uncommittedEvents.Add(@event);
}
// 从事件重建
public static BankAccount FromEvents(IEnumerable<IDomainEvent> events)
{
var account = new BankAccount();
foreach (var @event in events)
{
account.ApplyEvent(@event);
}
account._uncommittedEvents.Clear(); // 重建的事件不需要再提交
return account;
}
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
}
// 快照
public class BankAccountSnapshot
{
public Guid AccountId { get; set; }
public string OwnerName { get; set; } = "";
public decimal Balance { get; set; }
public bool IsClosed { get; set; }
public int Version { get; set; }
}
public class BankAccountRepository
{
private readonly EventStore _eventStore;
private readonly ISnapshotRepository _snapshotRepo;
private const int SnapshotInterval = 100;
public async Task<BankAccount> GetByIdAsync(Guid id, CancellationToken ct)
{
// 先加载快照
var snapshot = await _snapshotRepo.GetAsync<BankAccountSnapshot>(id, ct);
BankAccount account;
int fromVersion = 0;
if (snapshot != null)
{
account = new BankAccount
{
Id = snapshot.AccountId,
OwnerName = snapshot.OwnerName,
Balance = snapshot.Balance,
IsClosed = snapshot.IsClosed,
Version = snapshot.Version
};
fromVersion = snapshot.Version;
}
else
{
account = new BankAccount();
}
// 只加载快照之后的事件
var events = await _eventStore.GetEventsAsync(id, ct);
var newEvents = events.Where(e => e.Version > fromVersion).ToList();
foreach (var @event in newEvents)
{
// 不通过公开方法,直接应用
}
return BankAccount.FromEvents(events);
}
public async Task SaveAsync(BankAccount account, CancellationToken ct)
{
foreach (var @event in account.UncommittedEvents)
{
await _eventStore.AppendAsync(account.Id, @event, @event.Version - 1, ct);
}
// 定期快照
if (account.Version % SnapshotInterval == 0)
{
await _snapshotRepo.SaveAsync(account.Id, new BankAccountSnapshot
{
AccountId = account.Id,
OwnerName = account.OwnerName,
Balance = account.Balance,
IsClosed = account.IsClosed,
Version = account.Version
}, ct);
}
account.ClearUncommittedEvents();
}
}CQRS 实现
读写分离
// Command(写操作)
public record OpenAccountCommand(string OwnerName, decimal InitialBalance);
public record DepositCommand(Guid AccountId, decimal Amount);
public record WithdrawCommand(Guid AccountId, decimal Amount);
// Query(读操作)
public record GetAccountByIdQuery(Guid AccountId);
public record GetAccountBalanceQuery(Guid AccountId);
public record GetTransactionHistoryQuery(Guid AccountId, int Page, int PageSize);
// Command Handler
public class AccountCommandHandler
{
private readonly BankAccountRepository _repository;
public async Task<Guid> HandleAsync(OpenAccountCommand cmd)
{
var id = Guid.NewGuid();
var account = BankAccount.Open(id, cmd.OwnerName, cmd.InitialBalance);
await _repository.SaveAsync(account, CancellationToken.None);
return id;
}
public async Task HandleAsync(DepositCommand cmd)
{
var account = await _repository.GetByIdAsync(cmd.AccountId, CancellationToken.None);
account.Deposit(cmd.Amount);
await _repository.SaveAsync(account, CancellationToken.None);
}
}
// Query Handler(使用独立的读模型)
public class AccountQueryHandler
{
private readonly IReadDbContext _readDb;
public async Task<AccountReadModel?> HandleAsync(GetAccountByIdQuery query)
{
return await _readDb.Accounts
.Where(a => a.Id == query.AccountId)
.Select(a => new AccountReadModel(a.Id, a.OwnerName, a.Balance, a.Status))
.FirstOrDefaultAsync();
}
public async Task<PagedResult<TransactionReadModel>> HandleAsync(GetTransactionHistoryQuery query)
{
return await _readDb.Transactions
.Where(t => t.AccountId == query.AccountId)
.OrderByDescending(t => t.Timestamp)
.ToPagedResultAsync(query.Page, query.PageSize);
}
}
// 读模型(优化查询性能)
public class AccountReadModel
{
public Guid Id { get; set; }
public string OwnerName { get; set; } = "";
public decimal Balance { get; set; }
public string Status { get; set; } = "";
}
// 事件投影(更新读模型)
public class AccountProjection
{
private readonly IReadDbContext _readDb;
public async Task ProjectAsync(IDomainEvent @event, CancellationToken ct)
{
switch (@event)
{
case AccountOpenedEvent e:
_readDb.Accounts.Add(new AccountReadModel
{
Id = e.AccountId,
OwnerName = e.OwnerName,
Balance = e.InitialBalance,
Status = "Active"
});
break;
case MoneyDepositedEvent e:
var depositAccount = await _readDb.Accounts.FindAsync(e.AccountId);
if (depositAccount != null) depositAccount.Balance = e.BalanceAfter;
break;
case MoneyWithdrawnEvent e:
var withdrawAccount = await _readDb.Accounts.FindAsync(e.AccountId);
if (withdrawAccount != null) withdrawAccount.Balance = e.BalanceAfter;
break;
}
await _readDb.SaveChangesAsync(ct);
}
}事件版本演化
事件 Schema 演化策略
// 事件不可避免地会随业务变化,需要设计演化策略
// 原则:事件一旦写入不可修改,只能通过向上兼容的方式演化
// V1 事件
public record AccountOpenedEventV1(
Guid AccountId,
string OwnerName,
decimal InitialBalance
) : DomainEvent;
// V2 事件 — 新增字段(向上兼容)
public record AccountOpenedEventV2(
Guid AccountId,
string OwnerName,
decimal InitialBalance,
string Currency, // 新增:货币类型
string AccountType // 新增:账户类型
) : DomainEvent;
// V3 事件 — 字段含义变化(需要事件转换)
public record AccountOpenedEventV3(
Guid AccountId,
string OwnerName,
decimal InitialBalance,
string Currency,
AccountType AccountType, // 从 string 变为枚举
string OwnerId // 新增: ownerId
) : DomainEvent;
// 事件转换器 — 将旧版本事件转换为新版本
public class EventUpgrader
{
private readonly Dictionary<Type, Func<IDomainEvent, IDomainEvent>> _upgraders = new();
public EventUpgrader()
{
// 注册升级函数
_upgraders[typeof(AccountOpenedEventV1)] = UpgradeV1ToV3;
_upgraders[typeof(AccountOpenedEventV2)] = UpgradeV2ToV3;
}
public IDomainEvent Upgrade(IDomainEvent @event)
{
var eventType = Type.GetType(((dynamic)@event).EventType ?? @event.GetType().AssemblyQualifiedName!);
if (eventType != null && _upgraders.TryGetValue(eventType, out var upgrade))
{
return upgrade(@event);
}
return @event;
}
private IDomainEvent UpgradeV1ToV3(IDomainEvent @event)
{
// V1 -> V3:补充默认值
dynamic v1 = @event;
return new AccountOpenedEventV3(
v1.AccountId,
v1.OwnerName,
v1.InitialBalance,
"CNY", // 默认货币
AccountType.Personal, // 默认个人账户
string.Empty // 旧事件没有 OwnerId
) { Version = v1.Version };
}
private IDomainEvent UpgradeV2ToV3(IDomainEvent @event)
{
// V2 -> V3:AccountType 从 string 转枚举
dynamic v2 = @event;
return new AccountOpenedEventV3(
v2.AccountId,
v2.OwnerName,
v2.InitialBalance,
v2.Currency,
Enum.Parse<AccountType>(v2.AccountType),
string.Empty
) { Version = v2.Version };
}
}MediatR 集成 CQRS
使用 MediatR 实现 CQRS 管道
// dotnet add package MediatR
// Command 基类
public record OpenAccountCommand(string OwnerName, decimal InitialBalance)
: IRequest<Guid>;
// Command Handler
public class OpenAccountCommandHandler : IRequestHandler<OpenAccountCommand, Guid>
{
private readonly BankAccountRepository _repository;
private readonly IMediator _mediator;
public async Task<Guid> Handle(OpenAccountCommand request, CancellationToken ct)
{
var id = Guid.NewGuid();
var account = BankAccount.Open(id, request.OwnerName, request.InitialBalance);
await _repository.SaveAsync(account, ct);
// 发布领域事件(触发投影等副作用)
foreach (var @event in account.UncommittedEvents)
{
await _mediator.Publish(new DomainEventNotification(@event), ct);
}
return id;
}
}
// 领域事件通知
public record DomainEventNotification(IDomainEvent DomainEvent) : INotification;
// 事件处理器 — 更新读模型
public class AccountProjectionHandler : INotificationHandler<DomainEventNotification>
{
private readonly IReadDbContext _readDb;
public async Task Handle(DomainEventNotification notification, CancellationToken ct)
{
var @event = notification.DomainEvent;
// 投影逻辑...
await _readDb.SaveChangesAsync(ct);
}
}
// 注册 MediatR
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
});
// 管道行为 — 验证
public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
var failures = _validators
.Select(v => v.Validate(request))
.SelectMany(r => r.Errors)
.Where(e => e != null)
.ToList();
if (failures.Count != 0)
throw new ValidationException(failures);
return await next();
}
}事件溯源性能考量
快照与分区优化
// 1. 聚合分区策略 — 按聚合 ID 分区存储事件
public class PartitionedEventStore
{
private readonly IEventStoreRepository _repository;
// 按时间范围查询事件(用于回放和审计)
public async Task<List<StoredEvent>> GetEventsByTimeRangeAsync(
DateTime from, DateTime to, CancellationToken ct)
{
return await _repository.GetEventsByTimeRangeAsync(from, to, ct);
}
// 按聚合类型查询事件
public async Task<List<StoredEvent>> GetEventsByAggregateTypeAsync(
string aggregateType, CancellationToken ct)
{
return await _repository.GetEventsByAggregateTypeAsync(aggregateType, ct);
}
}
// 2. 事件压缩 — 历史事件归档
public class EventArchiveService
{
private readonly IEventStoreRepository _eventStore;
private readonly IArchiveRepository _archive;
private readonly ILogger<EventArchiveService> _logger;
public async Task ArchiveOldEventsAsync(int olderThanDays, CancellationToken ct)
{
var cutoffDate = DateTime.UtcNow.AddDays(-olderThanDays);
// 获取需要归档的事件
var oldEvents = await _eventStore.GetEventsOlderThanAsync(cutoffDate, ct);
_logger.LogInformation("找到 {Count} 条需要归档的事件", oldEvents.Count);
// 压缩并写入归档存储
var compressed = CompressEvents(oldEvents);
await _archive.StoreAsync(compressed, ct);
// 从主存储中删除已归档事件
await _eventStore.DeleteEventsAsync(oldEvents.Select(e => e.Id).ToList(), ct);
}
private byte[] CompressEvents(List<StoredEvent> events)
{
var json = JsonSerializer.Serialize(events);
using var output = new MemoryStream();
using (var gzip = new System.IO.Compression.GZipStream(output, System.IO.Compression.CompressionLevel.Optimal))
{
var bytes = Encoding.UTF8.GetBytes(json);
gzip.Write(bytes, 0, bytes.Length);
}
return output.ToArray();
}
}
// 3. 事件存储的常用数据库选择
// - EventStoreDB:专用事件存储数据库,内置投影和订阅
// - PostgreSQL + JSONB:关系型数据库,灵活的 JSON 存储
// - MongoDB:文档型数据库,天然适合事件存储
// - SQL Server:企业级关系型数据库,事务支持完善优点
缺点
总结
事件溯源以不可变的事件序列替代传统的状态存储,通过事件回放重建聚合状态。聚合根通过 ApplyEvent 方法处理状态变更并收集未提交事件。快照机制定期保存状态,避免重建大量事件。CQRS 分离命令(写)和查询(读)模型,写端使用事件存储,读端使用优化的查询模型。事件投影将事件转换为读模型,保持最终一致性。建议在需要完整审计日志的领域(金融、医疗)使用事件溯源。
关键知识点
- 模式不是目标,降低耦合和控制变化才是目标。
- 先找变化点、稳定点和协作边界,再决定是否引入模式。
- 同一个模式在不同规模下的收益和代价差异很大。
项目落地视角
- 优先画出参与对象、依赖方向和调用链,再落到代码。
- 把模式放到一个真实场景里,比如支付、规则引擎、工作流或插件扩展。
- 配合单元测试或契约测试,保证重构后的行为没有漂移。
常见误区
- 为了看起来“高级”而套模式。
- 把简单问题拆成过多抽象层,导致阅读和排障都变难。
- 只会背 UML,不会解释为什么这里需要这个模式。
进阶路线
- 继续关注模式之间的组合用法,而不是孤立记忆。
- 从业务建模、演进策略和团队协作角度看模式的适用性。
- 把模式结论沉淀为项目模板、基类或约束文档。
适用场景
- 当你准备把《事件溯源与 CQRS 模式》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在业务规则频繁变化、分支增多或对象协作复杂时引入。
- 当你希望提高扩展性,但又不想把系统拆得过度抽象时,这类主题很有参考价值。
落地建议
- 先识别变化点,再决定是否引入模式,而不是反过来套模板。
- 优先为模式的边界、依赖和调用路径画出简单结构图。
- 把模式落到一个明确场景,例如支付、规则计算、插件扩展或工作流。
排错清单
- 检查抽象层是否过多,导致调用路径和责任不清晰。
- 确认引入模式后是否真的减少了条件分支和重复代码。
- 警惕“为了模式而模式”,尤其是在简单业务里。
复盘问题
- 如果把《事件溯源与 CQRS 模式》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《事件溯源与 CQRS 模式》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《事件溯源与 CQRS 模式》最大的收益和代价分别是什么?
