分布式锁
大约 10 分钟约 2891 字
分布式锁
简介
在分布式系统中,多个服务实例可能同时操作共享资源。分布式锁确保同一时刻只有一个实例能访问临界资源,防止并发导致的数据不一致。常用的分布式锁实现包括 Redis、数据库和 ZooKeeper。
特点
Redis 分布式锁
基础实现
// NuGet: StackExchange.Redis
using StackExchange.Redis;
public class RedisDistributedLock : IDisposable
{
private readonly IDatabase _db;
private readonly string _lockKey;
private readonly string _lockValue;
private readonly TimeSpan _expiry;
private bool _isAcquired;
public RedisDistributedLock(IDatabase db, string lockKey,
TimeSpan? expiry = null, TimeSpan? wait = null, TimeSpan? retry = null)
{
_db = db;
_lockKey = $"lock:{lockKey}";
_lockValue = Guid.NewGuid().ToString(); // 唯一标识,防止误解锁
_expiry = expiry ?? TimeSpan.FromSeconds(30);
}
public async Task<bool> AcquireAsync(TimeSpan? waitTime = null, TimeSpan? retryInterval = null)
{
var wait = waitTime ?? TimeSpan.FromSeconds(10);
var retry = retryInterval ?? TimeSpan.FromMilliseconds(200);
var deadline = DateTime.UtcNow + wait;
while (DateTime.UtcNow < deadline)
{
// SET key value NX EX seconds
_isAcquired = await _db.StringSetAsync(
_lockKey, _lockValue, _expiry, When.NotExists);
if (_isAcquired) return true;
await Task.Delay(retry);
}
return false;
}
public async Task ReleaseAsync()
{
if (!_isAcquired) return;
// Lua 脚本确保只释放自己持有的锁
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
await _db.ScriptEvaluateAsync(script,
new RedisKey[] { _lockKey },
new RedisValue[] { _lockValue });
_isAcquired = false;
}
public void Dispose()
{
if (_isAcquired)
{
ReleaseAsync().GetAwaiter().GetResult();
}
}
}使用示例
// 基本使用
var redis = ConnectionMultiplexer.Connect("localhost:6379");
var db = redis.GetDatabase();
await using var Lockable = new RedisDistributedLock(db, "order:1001");
if (await Lockable.AcquireAsync(TimeSpan.FromSeconds(5)))
{
try
{
// 临界区操作
await ProcessOrderAsync(1001);
}
finally
{
await Lockable.ReleaseAsync();
}
}
else
{
Console.WriteLine("获取锁失败,请稍后重试");
}
// 封装为便捷方法
public static async Task<T> WithLockAsync<T>(
IDatabase db, string lockKey,
Func<Task<T>> action,
TimeSpan? lockExpiry = null,
TimeSpan? waitTime = null)
{
using var @lock = new RedisDistributedLock(db, lockKey, lockExpiry);
if (!await @lock.AcquireAsync(waitTime))
throw new InvalidOperationException($"获取锁失败: {lockKey}");
return await action();
}
// 使用
var result = await WithLockAsync(db, "inventory:product-123", async () =>
{
var stock = await GetStockAsync("product-123");
if (stock > 0)
{
await DeductStockAsync("product-123", 1);
return true;
}
return false;
});RedLock.net
多节点 Redis 锁
// NuGet: RedLock.net
// NuGet: RedLock.net.SERedis
using RedLockNet;
using RedLockNet.SERedis;
using RedLockNet.SERedis.Configuration;
// 创建 RedLock 工厂
var factory = RedLockFactory.Create(new List<RedLockMultiplexer>
{
new(ConnectionMultiplexer.Connect("redis1:6379")),
new(ConnectionMultiplexer.Connect("redis2:6379")),
new(ConnectionMultiplexer.Connect("redis3:6379")),
});
// 使用分布式锁
await using (var redLock = await factory.CreateLockAsync(
"order:1001", // 锁键
TimeSpan.FromSeconds(30), // 锁过期时间
TimeSpan.FromSeconds(10), // 等待获取锁时间
TimeSpan.FromMilliseconds(500))) // 重试间隔
{
if (redLock.IsAcquired)
{
// 临界区操作
await ProcessOrderAsync(1001);
}
}
// 自动续期
await using (var redLock = await factory.CreateLockAsync(
"long-task", TimeSpan.FromSeconds(30)))
{
if (redLock.IsAcquired)
{
// RedLock 自动在后台续期(每 expiry/3 检查一次)
await LongRunningTaskAsync();
}
}数据库乐观锁
乐观并发控制
// EF Core 乐观锁
public class Product
{
public int Id { get; set; }
public string Name { get; set; } = "";
public int Stock { get; set; }
[Timestamp] // 行版本号
public byte[] Version { get; set; } = null!;
}
// 并发更新
public async Task<bool> DeductStockAsync(int productId, int quantity)
{
try
{
var product = await _context.Products.FindAsync(productId);
if (product == null || product.Stock < quantity)
return false;
product.Stock -= quantity;
await _context.SaveChangesAsync(); // Version 不匹配抛 DbUpdateConcurrencyException
return true;
}
catch (DbUpdateConcurrencyException)
{
// 重新加载并重试
foreach (var entry in e.Entries)
{
await entry.ReloadAsync();
}
return false;
}
}
// 乐观锁 SQL(条件更新)
public async Task<bool> DeductStockRawAsync(int productId, int quantity)
{
var affected = await _context.Database.ExecuteSqlRawAsync(
"UPDATE Products SET Stock = Stock - @p0 WHERE Id = @p1 AND Stock >= @p0",
quantity, productId);
return affected > 0;
}乐观锁重试策略
// 通用重试策略
public static async Task<T> RetryOnConcurrencyAsync<T>(
Func<Task<T>> action,
int maxRetries = 3,
TimeSpan? delay = null)
{
var retryDelay = delay ?? TimeSpan.FromMilliseconds(100);
var attempts = 0;
while (true)
{
try
{
return await action();
}
catch (DbUpdateConcurrencyException ex) when (attempts < maxRetries)
{
attempts++;
_logger.LogWarning(ex, "并发冲突,第 {Attempt} 次重试", attempts);
// 解决冲突:重新加载当前数据库值
foreach (var entry in ex.Entries)
{
await entry.ReloadAsync();
}
await Task.Delay(retryDelay);
retryDelay = TimeSpan.FromTicks(retryDelay.Ticks * 2); // 指数退避
}
}
}
// 使用示例
var order = await RetryOnConcurrencyAsync(async () =>
{
var o = await _context.Orders.FindAsync(orderId);
o!.Status = "Completed";
await _context.SaveChangesAsync();
return o;
});
// 冲突解决策略
// 1. 数据库胜出(Database Wins)— 重新加载数据库值
// 2. 客户端胜出(Client Wins)— 强制覆盖数据库值
// 3. 合并策略(Merge)— 取两边的最新字段
// 客户端胜出示例
foreach (var entry in ex.Entries)
{
entry.OriginalValues.SetValues(
entry.GetDatabaseValues()); // 用数据库值覆盖原始值,但保留当前修改值
}
await _context.SaveChangesAsync();分布式锁进阶
可重入锁实现
// 可重入分布式锁 — 同一持有者可以多次获取同一把锁
public class ReentrantRedisLock : IDisposable
{
private readonly IDatabase _db;
private readonly string _lockKey;
private readonly string _lockValue;
private readonly TimeSpan _expiry;
private int _reentrantCount;
private bool _disposed;
public ReentrantRedisLock(IDatabase db, string lockKey,
TimeSpan? expiry = null)
{
_db = db;
_lockKey = $"lock:{lockKey}";
_lockValue = Guid.NewGuid().ToString("N");
_expiry = expiry ?? TimeSpan.FromSeconds(30);
}
public async Task<bool> AcquireAsync(TimeSpan? waitTime = null)
{
if (_reentrantCount > 0)
{
// 已持有锁,重入计数加一
_reentrantCount++;
return true;
}
var wait = waitTime ?? TimeSpan.FromSeconds(10);
var deadline = DateTime.UtcNow + wait;
var retry = TimeSpan.FromMilliseconds(100);
while (DateTime.UtcNow < deadline)
{
var acquired = await _db.StringSetAsync(
_lockKey, _lockValue, _expiry, When.NotExists);
if (acquired)
{
_reentrantCount = 1;
return true;
}
// 检查是否是自己持有的锁(可重入)
var currentValue = await _db.StringGetAsync(_lockKey);
if (currentValue == _lockValue)
{
_reentrantCount++;
return true;
}
await Task.Delay(retry);
}
return false;
}
public async Task ReleaseAsync()
{
if (_reentrantCount > 1)
{
// 重入计数减一,不释放锁
_reentrantCount--;
return;
}
if (_reentrantCount == 0) return;
// Lua 脚本原子释放
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
await _db.ScriptEvaluateAsync(script,
new RedisKey[] { _lockKey },
new RedisValue[] { _lockValue });
_reentrantCount = 0;
}
public void Dispose()
{
if (!_disposed)
{
ReleaseAsync().GetAwaiter().GetResult();
_disposed = true;
}
}
}分布式锁与 ASP.NET Core 集成
// 注册分布式锁服务
builder.Services.AddSingleton<IDistributedLockFactory>(sp =>
{
var redis = sp.GetRequiredService<IConnectionMultiplexer>();
return new RedisLockFactory(redis.GetDatabase());
});
public interface IDistributedLockFactory
{
IDistributedLock Create(string key, TimeSpan? expiry = null);
}
public interface IDistributedLock : IAsyncDisposable
{
Task<bool> AcquireAsync(TimeSpan? waitTime = null);
Task ReleaseAsync();
}
// 在 Controller 中使用
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IDistributedLockFactory _lockFactory;
[HttpPost("{id}/pay")]
public async Task<IActionResult> PayOrder(int id)
{
await using var @lock = _lockFactory.Create($"order:pay:{id}", TimeSpan.FromSeconds(30));
if (!await @lock.AcquireAsync(TimeSpan.FromSeconds(5)))
{
return StatusCode(429, new { message = "订单正在处理中,请稍后重试" });
}
try
{
var result = await _orderService.ProcessPaymentAsync(id);
return Ok(result);
}
catch (Exception ex)
{
_logger.LogError(ex, "支付处理失败 OrderId={OrderId}", id);
return StatusCode(500, new { message = "支付处理失败" });
}
}
}分布式锁常见问题与解决方案
问题1:锁过期但任务未完成(业务执行时间超过锁的过期时间)
解决:
- 使用 RedLock 自动续期
- 设置足够长的过期时间
- 使用后台线程定期续期(看门狗机制)
- 评估任务最大执行时间,设置合理的超时
问题2:Redis 主从切换导致锁丢失
解决:
- 使用 RedLock 多节点方案
- 设置合理的锁过期时间,接受极端情况下的短暂不一致
- 关键业务使用数据库乐观锁作为兜底
问题3:死锁
解决:
- 所有锁必须设置过期时间
- 获取锁后必须在 finally 中释放
- 使用 using / await using 确保释放
- 避免嵌套锁(如需嵌套,按固定顺序获取)
问题4:锁粒度过大导致性能瓶颈
解决:
- 锁的 Key 尽量精确(如 order:pay:123 而非 order:pay)
- 缩短锁持有时间
- 考虑分段锁(如按用户 ID 哈希分段)
- 非关键路径使用乐观锁替代
问题5:网络分区导致脑裂
解决:
- RedLock 要求过半数节点获取成功
- 设置合理的等待时间和重试次数
- 关键操作增加业务层面的幂等性数据库悲观锁
// SQL Server 悲观锁 — SELECT ... WITH (UPDLOCK, HOLDLOCK)
var order = await _context.Orders
.FromSqlRaw("SELECT * FROM Orders WITH (UPDLOCK, HOLDLOCK) WHERE Id = {0}", orderId)
.FirstOrDefaultAsync();
// MySQL 悲观锁 — SELECT ... FOR UPDATE
var order2 = await _context.Orders
.FromSqlRaw("SELECT * FROM Orders WHERE Id = {0} FOR UPDATE", orderId)
.FirstOrDefaultAsync();
// PostgreSQL 悲观锁 — SELECT ... FOR UPDATE
var order3 = await _context.Orders
.FromSqlRaw("SELECT * FROM Orders WHERE Id = {0} FOR UPDATE", orderId)
.FirstOrDefaultAsync();
// 悲观锁 vs 乐观锁选择:
// 读多写少 → 乐观锁(性能好,冲突少)
// 写多读少 → 悲观锁(避免大量重试)
// 高并发竞争 → 分布式锁 + 乐观锁组合
// 低并发场景 → 数据库行锁即可分布式锁方案对比
| 方案 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|
| Redis SETNX | 高 | 中 | 一般业务场景 |
| RedLock | 中 | 高 | 高可靠需求 |
| 数据库行锁 | 低 | 高 | 低并发场景 |
| 数据库乐观锁 | 高 | 中 | 读多写少 |
| ZooKeeper | 中 | 极高 | 强一致性需求 |
| etcd | 中 | 高 | 云原生场景 |
优点
缺点
总结
分布式锁推荐方案:Redis SETNX(高性能)或 RedLock(高可靠)。Redis 锁关键点:唯一值标识持有者、Lua 脚本原子释放、设置过期时间防死锁。RedLock 在多个 Redis 节点上获取锁,容忍部分节点故障。乐观锁用 EF Core [Timestamp] 或条件更新 SQL。选择原则:高并发用 Redis,强一致用 RedLock/ZooKeeper,低并发用数据库锁。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《分布式锁》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《分布式锁》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《分布式锁》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《分布式锁》最大的收益和代价分别是什么?
