EF Core 乐观并发控制
大约 12 分钟约 3701 字
EF Core 乐观并发控制
简介
并发控制确保多个用户同时修改相同数据时的一致性。EF Core 支持乐观并发控制——通过并发令牌(Concurrency Token)检测冲突,在保存时发现冲突则抛出 DbUpdateConcurrencyException。本文深入讲解并发控制的原理、冲突处理策略和实战模式。
特点
并发令牌
配置并发令牌
// 方式 1:Data Annotation
public class Product
{
public int Id { get; set; }
public string Name { get; set; } = null!;
public decimal Price { get; set; }
public int Stock { get; set; }
// 并发令牌(每次 UPDATE 检查此值)
[Timestamp]
public byte[] RowVersion { get; set; } = null!;
// SQL Server: rowversion 类型
// 每次更新自动递增
}
// 方式 2:Fluent API
public class ProductConfig : IEntityTypeConfiguration<Product>
{
public void Configure(EntityTypeBuilder<Product> builder)
{
builder.Property(p => p.RowVersion)
.IsRowVersion() // 自动生成的 rowversion
.IsConcurrencyToken(); // 标记为并发令牌
// 或者使用普通字段作为并发令牌
builder.Property(p => p.Version)
.IsConcurrencyToken();
// GUID 并发令牌
builder.Property(p => p.ConcurrencyStamp)
.IsConcurrencyToken()
.HasDefaultValueSql("NEWID()");
}
}
// 方式 3:自定义并发令牌(非 rowversion)
public class Order
{
public int Id { get; set; }
public string Status { get; set; } = null!;
public int Version { get; set; } // 手动管理的版本号
// 配置
// builder.Property(o => o.Version).IsConcurrencyToken();
}冲突检测原理
生成的 SQL
// 假设 Product 有 RowVersion 作为并发令牌
// 原始数据:Id=1, Name="Widget", Price=9.99, RowVersion=0x00000001
// 用户 A 加载产品
var productA = await contextA.Products.FindAsync(1);
// productA.RowVersion = 0x00000001
// 用户 B 也加载同一产品
var productB = await contextB.Products.FindAsync(1);
// productB.RowVersion = 0x00000001
// 用户 A 修改价格
productA.Price = 12.99;
await contextA.SaveChangesAsync();
// SQL:
// UPDATE [Products]
// SET [Price] = 12.99
// WHERE [Id] = 1 AND [RowVersion] = 0x00000001
// → 成功!RowVersion 自动更新为 0x00000002
// 用户 B 修改库存
productB.Stock = 100;
await contextB.SaveChangesAsync();
// SQL:
// UPDATE [Products]
// SET [Stock] = 100
// WHERE [Id] = 1 AND [RowVersion] = 0x00000001
// → 失败!RowVersion 已变为 0x00000002,WHERE 条件不匹配
// → 抛出 DbUpdateConcurrencyException冲突处理策略
策略 1:客户端获胜(覆盖)
public async Task<bool> UpdateWithClientWinAsync(Product updated)
{
var maxRetries = 3;
for (int i = 0; i < maxRetries; i++)
{
try
{
var existing = await _context.Products.FindAsync(updated.Id);
if (existing == null) return false;
// 客户端的值覆盖数据库的值
_context.Entry(existing).CurrentValues.SetValues(updated);
await _context.SaveChangesAsync();
return true;
}
catch (DbUpdateConcurrencyException ex)
{
// 解决冲突:使用客户端的值
foreach (var entry in ex.Entries)
{
// 方式 1:直接覆盖数据库值
entry.OriginalValues.SetValues(
await entry.GetDatabaseValuesAsync());
}
if (i == maxRetries - 1) throw;
}
}
return false;
}策略 2:数据库获胜(放弃)
public async Task<bool> UpdateWithDatabaseWinAsync(int productId, Action<Product> update)
{
try
{
var product = await _context.Products.FindAsync(productId);
if (product == null) return false;
update(product);
await _context.SaveChangesAsync();
return true;
}
catch (DbUpdateConcurrencyException)
{
// 放弃本次修改,使用数据库的值
foreach (var entry in _context.ChangeTracker.Entries())
{
var dbValues = await entry.GetDatabaseValuesAsync();
if (dbValues != null)
{
entry.OriginalValues.SetValues(dbValues);
entry.State = EntityState.Unchanged; // 放弃修改
}
}
return false; // 告知客户端冲突
}
}策略 3:合并(属性级冲突解决)
public async Task<Product?> UpdateWithMergeAsync(int productId, Product updated)
{
var maxRetries = 3;
for (int i = 0; i < maxRetries; i++)
{
try
{
var dbValues = await _context.Products.AsNoTracking().FirstOrDefaultAsync(p => p.Id == productId);
if (dbValues == null) return null;
var entry = _context.Entry(dbValues);
entry.State = EntityState.Unchanged;
// 只更新客户端实际修改的属性
if (updated.Name != null) dbValues.Name = updated.Name;
if (updated.Price != 0) dbValues.Price = updated.Price;
if (updated.Stock != 0) dbValues.Stock = updated.Stock;
await _context.SaveChangesAsync();
return dbValues;
}
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
var proposed = entry.CurrentValues; // 客户端值
var database = await entry.GetDatabaseValuesAsync(); // 数据库最新值
var original = entry.OriginalValues; // 原始加载的值
if (database == null) throw;
// 属性级合并
foreach (var property in entry.Metadata.GetProperties())
{
var proposedValue = proposed[property];
var originalValue = original[property];
var databaseValue = database[property];
// 如果客户端修改了属性
if (!Equals(proposedValue, originalValue))
{
// 客户端的修改优先
proposed[property] = proposedValue;
}
else
{
// 客户端未修改,使用数据库的值
proposed[property] = databaseValue;
}
}
// 更新原始值为数据库值,重试
entry.OriginalValues.SetValues(database);
}
if (i == maxRetries - 1) throw;
}
}
return null;
}自定义并发控制
手动版本控制
// 不使用 RowVersion,用整型版本号
public class Document
{
public int Id { get; set; }
public string Title { get; set; } = null!;
public string Content { get; set; } = null!;
public int Version { get; set; } // 并发令牌
}
// API 层传递版本号
public record UpdateDocumentRequest(int Id, string Title, string Content, int Version);
[HttpPut("{id}")]
public async Task<IActionResult> UpdateDocument(int id, UpdateDocumentRequest request)
{
var doc = await _context.Documents.FindAsync(id);
if (doc == null) return NotFound();
// 检查版本号
if (doc.Version != request.Version)
{
return Conflict(new
{
Message = "文档已被其他人修改",
CurrentVersion = doc.Version,
YourVersion = request.Version
});
}
doc.Title = request.Title;
doc.Content = request.Content;
doc.Version++; // 手动递增版本号
try
{
await _context.SaveChangesAsync();
return Ok(doc);
}
catch (DbUpdateConcurrencyException)
{
return Conflict(new { Message = "并发冲突,请刷新后重试" });
}
}乐观并发深入配置
Fluent API 配置
// 方式 1:IsConcurrencyToken(单个令牌列)
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.Entity<Product>(entity =>
{
entity.HasKey(e => e.Id);
entity.Property(e => e.Name).IsRequired().HasMaxLength(200);
entity.Property(e => e.Price).HasColumnType("decimal(18,2)");
// 将 Version 列标记为并发令牌
entity.Property(e => e.Version)
.IsConcurrencyToken();
});
// 方式 2:多个并发令牌
modelBuilder.Entity<Account>(entity =>
{
entity.Property(e => e.Balance).IsConcurrencyToken();
entity.Property(e => e.LastModifiedAt).IsConcurrencyToken();
});
// 方式 3:使用 RowVersion(自动管理,推荐 SQL Server)
modelBuilder.Entity<Document>(entity =>
{
entity.Property(e => e.RowVersion)
.IsRowVersion()
.ValueGeneratedOnAddOrUpdate();
// SQL Server 自动使用 timestamp/rowversion 类型
// 每次更新自动递增,无需手动管理
});
}
// 数据库层面
// SQL Server: [RowVersion] rowversion NOT NULL
// MySQL: 使用额外版本号列
// PostgreSQL: 使用 xmin 系统列生成的 SQL 对比
// 没有 RowVersion 的更新:
// UPDATE Products SET Name = @p0, Price = @p1 WHERE Id = @p2
// → 无法检测并发冲突
// 有 IsConcurrencyToken 的更新:
// UPDATE Products SET Name = @p0, Price = @p1
// WHERE Id = @p2 AND Version = @p3
// → Version 不匹配时影响行数为 0,触发 DbUpdateConcurrencyException
// 有 IsRowVersion 的更新:
// UPDATE Products SET Name = @p0, Price = @p1
// OUTPUT INSERTED.RowVersion
// WHERE Id = @p2 AND RowVersion = @p3
// → SQL Server 自动返回新的 RowVersion 值冲突处理策略详解
客户端获胜(强制覆盖)
public async Task<bool> ForceUpdateAsync(Product product)
{
var entry = _context.Entry(product);
try
{
await _context.SaveChangesAsync();
return true;
}
catch (DbUpdateConcurrencyException)
{
// 客户端获胜:将数据库值设为当前值,忽略并发修改
foreach (var property in entry.Metadata.GetProperties())
{
var originalValue = entry.OriginalValues.GetValue(property);
entry.OriginalValues.SetValue(property, originalValue);
}
// 或者更简单的方式
var databaseValues = await entry.GetDatabaseValuesAsync();
if (databaseValues == null)
{
// 实体已被删除
_context.Remove(product);
return false;
}
// 用数据库当前值覆盖原始值,然后重试
entry.OriginalValues.SetValues(databaseValues);
await _context.SaveChangesAsync();
return true;
}
}数据库获胜(放弃修改)
public async Task<Product?> RefreshAndReturnAsync(int productId)
{
var product = await _context.Products.FindAsync(productId);
if (product == null) return null;
try
{
product.Price = 99.99m; // 尝试修改
await _context.SaveChangesAsync();
return product;
}
catch (DbUpdateConcurrencyException ex)
{
// 数据库获胜:放弃当前修改,返回数据库最新值
foreach (var entry in ex.Entries)
{
var databaseValues = await entry.GetDatabaseValuesAsync();
if (databaseValues != null)
{
// 用数据库值覆盖当前值
entry.CurrentValues.SetValues(databaseValues);
}
else
{
// 实体已被删除
entry.State = EntityState.Detached;
return null;
}
}
// 现在 product 包含数据库最新值
return product;
}
}属性级合并(智能合并)
public async Task<Product?> MergeUpdateAsync(ProductDto dto)
{
var product = await _context.Products.FindAsync(dto.Id);
if (product == null) return null;
product.Name = dto.Name;
product.Price = dto.Price;
try
{
await _context.SaveChangesAsync();
return product;
}
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
var databaseValues = await entry.GetDatabaseValuesAsync();
if (databaseValues == null) return null;
// 属性级合并策略
var databaseProduct = (Product)databaseValues.ToObject();
var clientProduct = (Product)entry.Entity;
// 智能合并:只保留客户端确实修改的字段
// 未修改的字段使用数据库最新值
if (clientProduct.Name != databaseProduct.Name)
{
// 客户端修改了名称,保留客户端值
}
else
{
// 客户端未修改名称,使用数据库最新值
product.Name = databaseProduct.Name;
}
if (clientProduct.Price != databaseProduct.Price)
{
// 客户端修改了价格,保留客户端值
}
else
{
product.Price = databaseProduct.Price;
}
// 更新原始值后重试
entry.OriginalValues.SetValues(databaseValues);
}
await _context.SaveChangesAsync();
return product;
}
}通用重试机制
Polly 重试封装
// 安装:Polly
public class ConcurrencyResilientDbContext : DbContext
{
private readonly IResiliencePipeline _retryPipeline;
public ConcurrencyResilientDbContext(DbContextOptions options) : base(options)
{
_retryPipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromMilliseconds(100),
BackoffType = DelayBackoffType.Constant,
ShouldHandle = new PredicateBuilder().Handle<DbUpdateConcurrencyException>(),
OnRetry = args =>
{
Console.WriteLine($"并发冲突重试 {args.AttemptNumber}");
return default;
}
})
.Build();
}
public override async Task<int> SaveChangesAsync(
CancellationToken cancellationToken = default)
{
return await _retryPipeline.ExecuteAsync(async token =>
{
try
{
return await base.SaveChangesAsync(token);
}
catch (DbUpdateConcurrencyException ex)
{
// 自动解决策略:数据库获胜
foreach (var entry in ex.Entries)
{
var databaseValues = await entry.GetDatabaseValuesAsync(token);
if (databaseValues != null)
{
entry.OriginalValues.SetValues(databaseValues);
}
}
// 重新保存
return await base.SaveChangesAsync(token);
}
}, cancellationToken);
}
}自定义重试策略
public class ConcurrencyRetryHandler
{
private readonly int _maxRetries;
private readonly TimeSpan _delay;
private readonly ConcurrencyResolution _resolution;
public ConcurrencyRetryHandler(
int maxRetries = 3,
TimeSpan? delay = null,
ConcurrencyResolution resolution = ConcurrencyResolution.DatabaseWins)
{
_maxRetries = maxRetries;
_delay = delay ?? TimeSpan.FromMilliseconds(100);
_resolution = resolution;
}
public async Task<T> ExecuteAsync<T>(
Func<Task<T>> operation,
Action<DbUpdateConcurrencyException, int>? onRetry = null)
{
int attempt = 0;
while (true)
{
try
{
return await operation();
}
catch (DbUpdateConcurrencyException ex)
{
attempt++;
if (attempt > _maxRetries)
{
onRetry?.Invoke(ex, attempt);
throw;
}
onRetry?.Invoke(ex, attempt);
// 解决冲突
ResolveConflicts(ex.Entries, _resolution);
await Task.Delay(_delay);
}
}
}
private void ResolveConflicts(
IEnumerable<DbContextEntry> entries, ConcurrencyResolution resolution)
{
foreach (var entry in entries)
{
var databaseValues = entry.GetDatabaseValues();
if (databaseValues == null)
{
entry.State = EntityState.Detached;
continue;
}
switch (resolution)
{
case ConcurrencyResolution.DatabaseWins:
entry.CurrentValues.SetValues(databaseValues);
break;
case ConcurrencyResolution.ClientWins:
entry.OriginalValues.SetValues(databaseValues);
break;
}
}
}
}
public enum ConcurrencyResolution
{
DatabaseWins,
ClientWins
}
// 使用
var handler = new ConcurrencyRetryHandler(maxRetries: 3);
var product = await handler.ExecuteAsync(async () =>
{
var p = await _context.Products.FindAsync(id);
p!.Price = newPrice;
await _context.SaveChangesAsync();
return p;
}, onRetry: (ex, attempt) =>
{
_logger.LogWarning("并发冲突,第 {Attempt} 次重试", attempt);
});悲观锁(补充)
原生 SQL 悲观锁
// EF Core 本身不直接支持悲观锁
// 但可以通过原生 SQL 或事务隔离级别实现
// SQL Server: WITH (UPDLOCK, HOLDLOCK)
var product = await _context.Products
.FromSqlRaw(
"SELECT * FROM Products WITH (UPDLOCK, HOLDLOCK) WHERE Id = {0}",
productId)
.FirstOrDefaultAsync();
// 或使用事务 + FOR UPDATE(PostgreSQL)
await using var transaction = await _context.Database.BeginTransactionAsync(
IsolationLevel.Serializable, cancellationToken);
try
{
var product = await _context.Products
.FirstOrDefaultAsync(p => p.Id == productId, cancellationToken);
if (product != null)
{
product.Stock -= quantity;
await _context.SaveChangesAsync(cancellationToken);
}
await transaction.CommitAsync(cancellationToken);
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
// 悲观锁适用场景:
// 1. 库存扣减(不能超卖)
// 2. 账户余额(不能透支)
// 3. 唯一编号分配(不能重复)
// 4. 高并发写操作(乐观锁重试太多)
//
// 悲观锁缺点:
// 1. 锁等待降低并发度
// 2. 可能死锁
// 3. 数据库兼容性差(不同数据库语法不同)并发性能对比
乐观锁 vs 悲观锁
场景对比:
低并发(读写比 10:1):
乐观锁:冲突率 < 1%,几乎无重试开销 → 推荐
悲观锁:每次都加锁,增加等待时间 → 不推荐
中并发(读写比 5:1):
乐观锁:冲突率 5-10%,偶尔重试 → 可用
悲观锁:等待时间可控 → 可用
高并发写(读写比 1:1):
乐观锁:冲突率 30%+,大量重试 → 不推荐
悲观锁:串行化保证正确性 → 推荐
超高并发(如秒杀):
乐观锁 + Redis 预扣减 → 推荐
悲观锁:数据库压力大 → 不推荐
使用分布式锁 + 队列 → 最佳
性能建议:
1. 读多写少 → 乐观锁
2. 写多读少 → 悲观锁
3. 秒杀/抢购 → Redis 原子操作 + 消息队列
4. 金额操作 → 悲观锁或数据库层面的原子操作(UPDATE SET Balance = Balance - @amount WHERE Balance >= @amount)优点
缺点
缺点
总结
EF Core 乐观并发通过 [Timestamp](RowVersion)或 .IsConcurrencyToken() 配置令牌。SaveChanges 时 WHERE 子句包含令牌原始值,数据库值不匹配则抛出 DbUpdateConcurrencyException。三种冲突策略:客户端获胜(覆盖数据库)、数据库获胜(放弃修改)、合并(属性级解决)。entry.GetDatabaseValuesAsync() 获取数据库最新值,entry.OriginalValues.SetValues() 更新原始值后重试。乐观并发适合读多写少场景,写密集场景考虑悲观锁。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《EF Core 乐观并发控制》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《EF Core 乐观并发控制》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《EF Core 乐观并发控制》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《EF Core 乐观并发控制》最大的收益和代价分别是什么?
