事件溯源 Event Sourcing
大约 10 分钟约 2984 字
事件溯源 Event Sourcing
简介
事件溯源(Event Sourcing)的核心思想是:系统不直接把“当前状态”作为唯一事实存储,而是把每一次状态变化记录成事件。之后系统状态可以通过回放事件流重新计算出来。这种方式特别适合订单、支付、账务、审计、流程追踪等对变更历史、可追溯性和回放能力要求很高的场景。
特点
实现
传统状态存储 vs 事件溯源
传统模式:
Orders
- Id
- UserId
- Amount
- Status
- UpdatedAt
优点:
- 简单直观
- CRUD 友好
缺点:
- 中间状态丢失
- 难还原完整变更历史
- 审计能力弱事件溯源模式:
OrderEvents
- OrderId
- Version
- EventType
- EventData
- OccurredAt
示例:
1. OrderCreated
2. PaymentReceived
3. OrderShipped
4. OrderRefunded
优点:
- 全历史保留
- 可重放
- 适合审计和领域事件传播事件定义与聚合根
public abstract record DomainEvent(Guid AggregateId, long Version, DateTimeOffset OccurredAt)
{
protected DomainEvent(Guid aggregateId) : this(aggregateId, 0, DateTimeOffset.UtcNow)
{
}
}
public record OrderCreatedEvent(
Guid OrderId,
long UserId,
decimal TotalAmount)
: DomainEvent(OrderId);
public record OrderPaidEvent(
Guid OrderId,
string PaymentId,
decimal Amount)
: DomainEvent(OrderId);
public record OrderShippedEvent(
Guid OrderId,
string TrackingNumber)
: DomainEvent(OrderId);
public record OrderCancelledEvent(
Guid OrderId,
string Reason)
: DomainEvent(OrderId);public class OrderAggregate
{
private readonly List<DomainEvent> _uncommittedEvents = new();
public Guid Id { get; private set; }
public long UserId { get; private set; }
public decimal TotalAmount { get; private set; }
public string Status { get; private set; } = "None";
public long Version { get; private set; }
public IReadOnlyCollection<DomainEvent> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();
public void ClearUncommittedEvents() => _uncommittedEvents.Clear();
public static OrderAggregate Create(long userId, decimal totalAmount)
{
var aggregate = new OrderAggregate();
aggregate.RaiseEvent(new OrderCreatedEvent(Guid.NewGuid(), userId, totalAmount));
return aggregate;
}
public void Pay(string paymentId, decimal amount)
{
if (Status != "Created")
throw new InvalidOperationException("Only created orders can be paid.");
RaiseEvent(new OrderPaidEvent(Id, paymentId, amount));
}
public void Ship(string trackingNumber)
{
if (Status != "Paid")
throw new InvalidOperationException("Only paid orders can be shipped.");
RaiseEvent(new OrderShippedEvent(Id, trackingNumber));
}
public void Cancel(string reason)
{
if (Status is "Shipped" or "Cancelled")
throw new InvalidOperationException("Current status cannot be cancelled.");
RaiseEvent(new OrderCancelledEvent(Id, reason));
}
public static OrderAggregate FromHistory(IEnumerable<DomainEvent> history)
{
var aggregate = new OrderAggregate();
foreach (var evt in history.OrderBy(x => x.Version))
{
aggregate.Apply(evt);
aggregate.Version = evt.Version;
}
return aggregate;
}
private void RaiseEvent(DomainEvent evt)
{
Version++;
var versioned = evt with { Version = Version };
Apply(versioned);
_uncommittedEvents.Add(versioned);
}
private void Apply(DomainEvent evt)
{
switch (evt)
{
case OrderCreatedEvent e:
Id = e.OrderId;
UserId = e.UserId;
TotalAmount = e.TotalAmount;
Status = "Created";
break;
case OrderPaidEvent:
Status = "Paid";
break;
case OrderShippedEvent:
Status = "Shipped";
break;
case OrderCancelledEvent:
Status = "Cancelled";
break;
}
}
}事件存储与并发控制
public class EventRecord
{
public long Id { get; set; }
public Guid AggregateId { get; set; }
public string EventType { get; set; } = string.Empty;
public string EventData { get; set; } = string.Empty;
public long Version { get; set; }
public DateTimeOffset OccurredAt { get; set; }
}public class EventStore
{
private readonly AppDbContext _dbContext;
public EventStore(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<List<DomainEvent>> LoadAsync(Guid aggregateId, CancellationToken cancellationToken = default)
{
var records = await _dbContext.Set<EventRecord>()
.Where(x => x.AggregateId == aggregateId)
.OrderBy(x => x.Version)
.ToListAsync(cancellationToken);
return records.Select(Deserialize).ToList();
}
public async Task SaveAsync(Guid aggregateId, IReadOnlyCollection<DomainEvent> events, long expectedVersion, CancellationToken cancellationToken = default)
{
var currentVersion = await _dbContext.Set<EventRecord>()
.Where(x => x.AggregateId == aggregateId)
.MaxAsync(x => (long?)x.Version, cancellationToken) ?? 0;
if (currentVersion != expectedVersion)
throw new InvalidOperationException($"Concurrency conflict. expected={expectedVersion}, actual={currentVersion}");
foreach (var evt in events)
{
_dbContext.Add(new EventRecord
{
AggregateId = aggregateId,
EventType = evt.GetType().Name,
EventData = JsonSerializer.Serialize(evt, evt.GetType()),
Version = evt.Version,
OccurredAt = evt.OccurredAt
});
}
await _dbContext.SaveChangesAsync(cancellationToken);
}
private static DomainEvent Deserialize(EventRecord record)
{
return record.EventType switch
{
nameof(OrderCreatedEvent) => JsonSerializer.Deserialize<OrderCreatedEvent>(record.EventData)!,
nameof(OrderPaidEvent) => JsonSerializer.Deserialize<OrderPaidEvent>(record.EventData)!,
nameof(OrderShippedEvent) => JsonSerializer.Deserialize<OrderShippedEvent>(record.EventData)!,
nameof(OrderCancelledEvent) => JsonSerializer.Deserialize<OrderCancelledEvent>(record.EventData)!,
_ => throw new NotSupportedException($"Unknown event type: {record.EventType}")
};
}
}事件存储关键点:
- 按 aggregateId + version 保证顺序
- 用 expectedVersion 做乐观并发控制
- 事件序列化格式要稳定可演进读模型 / 投影(Projection)
public class OrderReadModel
{
public Guid OrderId { get; set; }
public long UserId { get; set; }
public decimal TotalAmount { get; set; }
public string Status { get; set; } = string.Empty;
public DateTimeOffset UpdatedAt { get; set; }
}public class OrderProjectionHandler
{
private readonly AppDbContext _dbContext;
public OrderProjectionHandler(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task HandleAsync(DomainEvent evt, CancellationToken cancellationToken = default)
{
switch (evt)
{
case OrderCreatedEvent e:
_dbContext.Set<OrderReadModel>().Add(new OrderReadModel
{
OrderId = e.OrderId,
UserId = e.UserId,
TotalAmount = e.TotalAmount,
Status = "Created",
UpdatedAt = e.OccurredAt
});
break;
case OrderPaidEvent e:
var paidOrder = await _dbContext.Set<OrderReadModel>().FindAsync([e.OrderId], cancellationToken);
if (paidOrder is not null)
{
paidOrder.Status = "Paid";
paidOrder.UpdatedAt = e.OccurredAt;
}
break;
case OrderShippedEvent e:
var shippedOrder = await _dbContext.Set<OrderReadModel>().FindAsync([e.OrderId], cancellationToken);
if (shippedOrder is not null)
{
shippedOrder.Status = "Shipped";
shippedOrder.UpdatedAt = e.OccurredAt;
}
break;
}
await _dbContext.SaveChangesAsync(cancellationToken);
}
}事件溯源通常不直接拿事件表做页面查询,
而是通过投影构建:
- 列表页读模型
- 报表视图
- 搜索索引
- 审计展示模型优点
快照机制
长事件流优化
/// <summary>
/// 快照(Snapshot)— 避免每次从头回放所有事件
/// 当事件数量超过阈值时,保存当前状态的快照
/// </summary>
public class Snapshot
{
public Guid AggregateId { get; set; }
public long Version { get; set; }
public string StateData { get; set; } = ""; // 聚合根序列化后的状态
public DateTimeOffset CreatedAt { get; set; }
}
public class SnapshotRepository
{
private readonly AppDbContext _dbContext;
public SnapshotRepository(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task<Snapshot?> GetLatestSnapshotAsync(Guid aggregateId)
{
return await _dbContext.Set<Snapshot>()
.Where(s => s.AggregateId == aggregateId)
.OrderByDescending(s => s.Version)
.FirstOrDefaultAsync();
}
public async Task SaveSnapshotAsync(Snapshot snapshot)
{
_dbContext.Set<Snapshot>().Add(snapshot);
await _dbContext.SaveChangesAsync();
}
}
/// <summary>
/// 支持快照的聚合根加载
/// </summary>
public class AggregateRepository
{
private readonly EventStore _eventStore;
private readonly SnapshotRepository _snapshotRepo;
private const int SnapshotInterval = 100; // 每 100 个事件保存一次快照
public AggregateRepository(EventStore eventStore, SnapshotRepository snapshotRepo)
{
_eventStore = eventStore;
_snapshotRepo = snapshotRepo;
}
public async Task<T> LoadAsync<T>(
Guid aggregateId,
Func<IEnumerable<DomainEvent>, Snapshot, T> replayFromSnapshot,
Func<IEnumerable<DomainEvent>, T> replayFromHistory) where T : class
{
// 先查找最新快照
var snapshot = await _snapshotRepo.GetLatestSnapshotAsync(aggregateId);
if (snapshot != null)
{
// 从快照开始,只回放快照之后的事件
var eventsAfterSnapshot = await _eventStore.LoadEventsAfterVersionAsync(
aggregateId, snapshot.Version);
return replayFromSnapshot(eventsAfterSnapshot, snapshot);
}
else
{
// 没有快照,从头回放所有事件
var allEvents = await _eventStore.LoadAsync(aggregateId);
return replayFromHistory(allEvents);
}
}
public async Task SaveAsync<T>(
Guid aggregateId,
IReadOnlyCollection<DomainEvent> newEvents,
long currentVersion,
T aggregateState) where T : class
{
// 保存事件
await _eventStore.SaveAsync(aggregateId, newEvents, currentVersion);
// 检查是否需要创建快照
if (currentVersion > 0 && currentVersion % SnapshotInterval == 0)
{
var snapshot = new Snapshot
{
AggregateId = aggregateId,
Version = currentVersion,
StateData = JsonSerializer.Serialize(aggregateState),
CreatedAt = DateTimeOffset.UtcNow
};
await _snapshotRepo.SaveSnapshotAsync(snapshot);
}
}
}事件版本演进
事件兼容性管理
/// <summary>
/// 事件版本演进 — 保持向后兼容
/// </summary
// V1 事件
public record OrderCreatedEventV1(
Guid OrderId,
long UserId,
decimal Amount)
: DomainEvent(OrderId);
// V2 事件 — 新增字段
public record OrderCreatedEventV2(
Guid OrderId,
long UserId,
decimal Amount,
string Currency, // 新增:货币
string ShippingAddress, // 新增:收货地址
List<OrderItemData> Items) // 新增:订单项明细
: DomainEvent(OrderId);
// 事件升级器
public class EventUpgrader
{
private readonly Dictionary<(Type, int), Func<object, object>> _upgraders = new();
public EventUpgrader()
{
// V1 → V2
_upgraders[(typeof(OrderCreatedEventV1), 1)] = UpgradeV1ToV2;
}
public object Upgrade(DomainEvent evt, int fromVersion, int toVersion)
{
var current = (object)evt;
for (int v = fromVersion; v < toVersion; v++)
{
var key = (current.GetType(), v);
if (_upgraders.TryGetValue(key, out var upgrader))
{
current = upgrader(current);
}
}
return current;
}
private static object UpgradeV1ToV2(object source)
{
var v1 = (OrderCreatedEventV1)source;
return new OrderCreatedEventV2(
v1.OrderId,
v1.UserId,
v1.Amount,
Currency: "CNY", // 默认值
ShippingAddress: "", // 默认值
Items: new List<OrderItemData>());
}
}
// 事件存储中的反序列化支持
public class EventSerializer
{
private readonly EventUpgrader _upgrader;
private readonly int _currentVersion;
public EventSerializer(EventUpgrader upgrader, int currentVersion = 2)
{
_upgrader = upgrader;
_currentVersion = currentVersion;
}
public DomainEvent Deserialize(EventRecord record)
{
var eventType = Type.GetType($"MyApp.Events.{record.EventType}")!;
var evt = JsonSerializer.Deserialize(record.EventData, eventType)!;
// 检查是否需要升级
var versionInHeader = record.EventVersion;
if (versionInHeader < _currentVersion)
{
evt = (DomainEvent)_upgrader.Upgrade(evt, versionInHeader, _currentVersion);
}
return (DomainEvent)evt;
}
}事件发布与 Outbox 模式
保证事件最终一致性
/// <summary>
/// Outbox 模式 — 事件存储与消息发布的事务一致性
/// 先将事件写入 Outbox 表(与业务数据在同一事务中)
/// 后台服务定期扫描 Outbox 表,将事件发布到消息队列
/// </summary>
public class OutboxMessage
{
public long Id { get; set; }
public Guid AggregateId { get; set; }
public string EventType { get; set; } = "";
public string Payload { get; set; } = ""
public string Status { get; set; } = "Pending"; // Pending, Published, Failed
public int RetryCount { get; set; }
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset? PublishedAt { get; set; }
public string? ErrorMessage { get; set; }
}
public class OutboxPublisher : BackgroundService
{
private readonly AppDbContext _dbContext;
private readonly IMessageBus _messageBus;
private readonly ILogger<OutboxPublisher> _logger;
public OutboxPublisher(
AppDbContext dbContext,
IMessageBus messageBus,
ILogger<OutboxPublisher> logger)
{
_dbContext = dbContext;
_messageBus = messageBus;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
var pendingMessages = await _dbContext.Set<OutboxMessage>()
.Where(m => m.Status == "Pending")
.OrderBy(m => m.CreatedAt)
.Take(100)
.ToListAsync(ct);
foreach (var message in pendingMessages)
{
try
{
await _messageBus.PublishAsync(
message.EventType, message.Payload);
message.Status = "Published";
message.PublishedAt = DateTimeOffset.UtcNow;
}
catch (Exception ex)
{
message.RetryCount++;
message.ErrorMessage = ex.Message;
if (message.RetryCount >= 5)
{
message.Status = "Failed";
_logger.LogError(ex,
"Outbox 消息发布失败(超过重试次数): {MessageId}", message.Id);
}
}
}
await _dbContext.SaveChangesAsync(ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox 扫描异常");
}
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}
}
}
// 保存聚合根时同时写入 Outbox
public class OrderUnitOfWork
{
private readonly AppDbContext _dbContext;
public OrderUnitOfWork(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task SaveAsync(OrderAggregate aggregate)
{
using var transaction = await _dbContext.Database.BeginTransactionAsync();
// 保存事件
var events = aggregate.GetUncommittedEvents();
foreach (var evt in events)
{
_dbContext.Add(new EventRecord
{
AggregateId = aggregate.Id,
EventType = evt.GetType().Name,
EventData = JsonSerializer.Serialize(evt, evt.GetType()),
Version = evt.Version,
OccurredAt = evt.OccurredAt
});
// 写入 Outbox
_dbContext.Add(new OutboxMessage
{
AggregateId = aggregate.Id,
EventType = evt.GetType().Name,
Payload = JsonSerializer.Serialize(evt, evt.GetType()),
Status = "Pending",
CreatedAt = DateTimeOffset.UtcNow
});
}
await _dbContext.SaveChangesAsync();
await transaction.CommitAsync();
aggregate.ClearUncommittedEvents();
}
}事件重建与调试工具
聚合根状态查看
/// <summary>
/// 事件溯源调试 API — 查看聚合根的历史状态
/// </summary>
[ApiController]
[Route("api/debug/event-sourcing")]
public class EventSourcingDebugController : ControllerBase
{
private readonly EventStore _eventStore;
[HttpGet("{aggregateId:guid}/events")]
public async Task<IActionResult> GetEvents(Guid aggregateId)
{
var events = await _eventStore.LoadAsync(aggregateId);
return Ok(events.Select(e => new
{
e.Version,
EventType = e.GetType().Name,
e.OccurredAt,
Data = JsonSerializer.Serialize(e)
}));
}
[HttpGet("{aggregateId:guid}/state-at/{version}")]
public async Task<IActionResult> GetStateAtVersion(Guid aggregateId, long version)
{
var events = await _eventStore.LoadAsync(aggregateId);
var eventsUpToVersion = events.Where(e => e.Version <= version);
var aggregate = OrderAggregate.FromHistory(eventsUpToVersion);
return Ok(new
{
aggregate.Id,
aggregate.UserId,
aggregate.TotalAmount,
aggregate.Status,
aggregate.Version
});
}
[HttpPost("{aggregateId:guid}/replay")]
public async Task<IActionResult> ReplayProjection(Guid aggregateId)
{
var events = await _eventStore.LoadAsync(aggregateId);
// 重新执行投影...
return Ok(new { Message = $"已重放 {events.Count} 个事件" });
}
}缺点
总结
事件溯源最适合那些“变更历史本身就是业务价值”的系统,而不适合为了追求架构时髦而硬套到所有项目里。真正落地时,重点不只是保存事件,而是设计清楚聚合边界、事件命名、并发控制、读模型投影和回放补偿策略。
关键知识点
- 事件记录的是“已经发生的事实”,不是命令也不是当前状态快照。
- 聚合根负责校验业务规则并产生事件。
- 事件流按版本有序存储,重放后才能恢复状态。
- 读模型通常独立维护,不应直接依赖事件表给前端查询。
项目落地视角
- 支付、订单、账务、库存流水都很适合事件溯源。
- 审批、流程、合规审计系统也非常适合保留完整事件链路。
- 与 Kafka / Outbox 配合,可以把事件同时用于内部状态恢复和外部传播。
- 中后台查询页面通常需要单独投影表,而不是直接回放事件流。
常见误区
- 把任何有状态变化的系统都强行改成事件溯源。
- 事件命名不稳定、事件字段语义不清,后续难以演进。
- 只有事件流,没有可靠投影与回放机制。
- 忽略并发控制,导致聚合版本错乱。
进阶路线
- 深入学习 Event Store、Snapshot、Projection 和 Replay 机制。
- 结合 CQRS、Outbox、Kafka 做完整事件驱动架构。
- 学习事件版本演进与兼容策略。
- 研究账务系统、审批系统、订单系统的事件建模案例。
适用场景
- 账务、支付、订单、库存、审批、合规审计等强追溯系统。
- 需要保留完整变更历史的业务领域。
- 与 CQRS / 事件驱动架构深度结合的系统。
- 需要重放、补偿、追查历史状态的复杂业务。
落地建议
- 先确认“历史变更记录是否真的有业务价值”,再决定是否采用。
- 从单个边界清晰的聚合开始试点,不要一次性全系统事件化。
- 为每类事件建立稳定命名、版本和序列化规范。
- 读模型、回放工具、补偿流程要和事件存储一起设计。
排错清单
- 状态不一致时,先核对事件流版本和投影是否同步成功。
- 出现并发冲突时,先检查 expectedVersion 和聚合加载逻辑。
- 查询慢时,先检查是不是误拿事件流直接给前端查询。
- 历史回放异常时,先检查事件反序列化和事件版本兼容性。
