Redis Lua 脚本
大约 11 分钟约 3219 字
Redis Lua 脚本
简介
Redis Lua 脚本允许将多个 Redis 命令打包成一个原子操作在服务端执行,中间不会被其他客户端命令打断。它是实现分布式锁、限流器、库存扣减等需要原子性的复杂业务逻辑的核心工具。Redis 保证 Lua 脚本的原子性,但脚本执行期间会阻塞整个 Redis 实例(单线程)。
在没有 Lua 脚本之前,要实现"先查后改"的原子操作只能依赖 Redis 事务(MULTI/EXEC)或 WATCH 机制。但事务中的条件判断无法在服务端完成,WATCH 又容易产生乐观锁冲突。Lua 脚本将这些逻辑直接搬到 Redis 服务端,一次网络往返完成所有操作,既保证了原子性又减少了网络延迟。
Lua 脚本执行模型
客户端 Redis 服务端
| |
|--- EVAL script key1 key2 arg1 --->|
| | (原子执行)
| | 1. 读取 key1
| | 2. 判断条件
| | 3. 写入 key2
| | 4. 返回结果
|<---------- result ----------------|
| |
|--- 其他客户端命令 --->| (阻塞等待)
| |
|<--- (脚本执行完后才处理) ---|脚本执行期间,Redis 不会处理其他客户端的命令(单线程模型)。因此脚本必须尽量短小,执行时间控制在 5ms 以内。
特点
EVAL vs EVALSHA
EVAL 执行流程:
客户端 → 发送完整脚本 → Redis 计算SHA1 → 缓存脚本 → 执行 → 返回结果
EVALSHA 执行流程:
客户端 → 发送 SHA1 指纹 → Redis 查找缓存 → 执行 → 返回结果
↓
未命中则报错 NOSCRIPT生产环境中推荐先尝试 EVALSHA,失败后降级到 EVAL,大多数 Redis 客户端库(如 StackExchange.Redis、Jedis)会自动处理这个逻辑。
实现
示例 1:分布式锁的原子获取与释放
-- 获取分布式锁
-- KEYS[1] = 锁的 Key
-- ARGV[1] = 锁的值(唯一标识)
-- ARGV[2] = 过期时间(毫秒)
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'PX', ARGV[2]) then
return 1 -- 获取成功
else
return 0 -- 获取失败
end
-- 安全释放锁(Lua 保证原子性)
-- 只有锁的值匹配时才删除,防止误删其他客户端的锁
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0 -- 锁不属于当前客户端
end-- 改进版:带续约的分布式锁
-- KEYS[1] = 锁的 Key
-- ARGV[1] = 锁的值(唯一标识)
-- ARGV[2] = 过期时间(毫秒)
local lockKey = KEYS[1]
local lockValue = ARGV[1]
local expireMs = tonumber(ARGV[2])
-- 先检查是否已持有锁(可重入)
local currentValue = redis.call('GET', lockKey)
if currentValue == lockValue then
-- 已持有锁,续约
redis.call('PEXPIRE', lockKey, expireMs)
return 1 -- 获取成功(重入)
end
if currentValue then
return 0 -- 锁被其他客户端持有
end
-- 尝试获取锁
if redis.call('SET', lockKey, lockValue, 'NX', 'PX', expireMs) then
return 1
else
return 0
end示例 2:滑动窗口限流器
-- 滑动窗口限流
-- KEYS[1] = 限流 Key
-- ARGV[1] = 窗口大小(毫秒)
-- ARGV[2] = 窗口内最大请求数
-- ARGV[3] = 当前时间戳(毫秒)
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_start = now - window
-- 移除窗口外的旧记录
redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)
-- 获取当前窗口内的请求数
local current = redis.call('ZCARD', key)
if current < limit then
-- 未超限,添加当前请求
redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
redis.call('PEXPIRE', key, window)
return limit - current - 1 -- 返回剩余配额
else
return 0 -- 已超限
end-- 令牌桶限流器(Token Bucket)
-- KEYS[1] = 限流 Key
-- ARGV[1] = 桶容量(最大令牌数)
-- ARGV[2] = 令牌填充速率(每秒)
-- ARGV[3] = 请求令牌数
-- ARGV[4] = 当前时间戳(毫秒)
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local tokens = tonumber(redis.call('HGET', key, 'tokens'))
local last_refill = tonumber(redis.call('HGET', key, 'last_refill'))
-- 首次初始化
if tokens == nil then
tokens = capacity
last_refill = now
end
-- 计算应填充的令牌数
local elapsed = (now - last_refill) / 1000
local refill = math.floor(elapsed * rate)
tokens = math.min(capacity, tokens + refill)
last_refill = now
-- 判断是否有足够令牌
if tokens >= requested then
tokens = tokens - requested
redis.call('HSET', key, 'tokens', tokens)
redis.call('HSET', key, 'last_refill', last_refill)
return 1 -- 允许
else
redis.call('HSET', key, 'tokens', tokens)
redis.call('HSET', key, 'last_refill', last_refill)
return 0 -- 拒绝
end示例 3:库存扣减(防超卖)
-- 库存原子扣减
-- KEYS[1] = 库存 Key
-- ARGV[1] = 扣减数量
local stock = tonumber(redis.call('GET', KEYS[1]))
if stock == nil then
return -1 -- 库存 Key 不存在
end
local deduct = tonumber(ARGV[1])
if stock >= deduct then
redis.call('DECRBY', KEYS[1], deduct)
return stock - deduct -- 返回剩余库存
else
return -2 -- 库存不足
end-- 改进版:带订单记录的库存扣减(幂等)
-- KEYS[1] = 库存 Key
-- KEYS[2] = 订单记录 Key
-- ARGV[1] = 扣减数量
-- ARGV[2] = 订单号
-- ARGV[3] = 用户ID
local stockKey = KEYS[1]
local orderKey = KEYS[2]
local deduct = tonumber(ARGV[1])
local orderNo = ARGV[2]
local userId = ARGV[3]
-- 检查是否已扣减(幂等)
if redis.call('HEXISTS', orderKey, orderNo) == 1 then
return 0 -- 已处理,返回成功(幂等)
end
-- 检查库存
local stock = tonumber(redis.call('GET', stockKey))
if stock == nil then
return -1 -- 库存不存在
end
if stock < deduct then
return -2 -- 库存不足
end
-- 扣减库存
redis.call('DECRBY', stockKey, deduct)
-- 记录订单
redis.call('HSET', orderKey, orderNo, userId .. ':' .. deduct)
return stock - deduct -- 返回剩余库存示例 4:延迟队列
-- 延迟队列:添加任务
-- KEYS[1] = 待执行队列(Sorted Set)
-- ARGV[1] = 任务执行时间戳(毫秒)
-- ARGV[2] = 任务ID
-- ARGV[3] = 任务数据
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
redis.call('HSET', 'delay_queue:tasks', ARGV[2], ARGV[3])
return 1-- 延迟队列:取出到期任务
-- KEYS[1] = 待执行队列(Sorted Set)
-- KEYS[2] = 处理中队列(List)
-- ARGV[1] = 当前时间戳(毫秒)
local ready_tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 10)
if #ready_tasks == 0 then
return {}
end
-- 移到处理中队列
for i, task_id in ipairs(ready_tasks) do
redis.call('LPUSH', KEYS[2], task_id)
redis.call('ZREM', KEYS[1], task_id)
end
return ready_tasks示例 5:C# 中调用 Redis Lua 脚本
// 使用 StackExchange.Redis 调用 Lua 脚本
using StackExchange.Redis;
var connection = ConnectionMultiplexer.Connect("localhost:6379");
var db = connection.GetDatabase();
// 库存扣减脚本
const string stockScript = @"
local stock = tonumber(redis.call('GET', KEYS[1]))
if stock == nil then return -1 end
local deduct = tonumber(ARGV[1])
if stock >= deduct then
redis.call('DECRBY', KEYS[1], deduct)
return stock - deduct
else
return -2
end
";
// 准备参数
var prepared = LuaScript.Prepare(stockScript);
// 执行脚本
var result = db.ScriptEvaluate(prepared,
new { KEYS = new RedisKey[] { "stock:product:1001" },
ARGV = new RedisValue[] { 1 } });
int remaining = (int)result;
Console.WriteLine($"剩余库存: {remaining}");
// -1 = Key 不存在, -2 = 库存不足, >=0 = 剩余数量// 使用 EVALSHA 手动管理脚本缓存
using StackExchange.Redis;
var connection = ConnectionMultiplexer.Connect("localhost:6379");
var db = connection.GetDatabase();
var server = connection.GetServer("localhost:6379");
string script = @"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
";
// 1. 加载脚本获取 SHA1
byte[] scriptBytes = System.Text.Encoding.UTF8.GetBytes(script);
string sha1 = server.ScriptLoad(scriptBytes);
Console.WriteLine($"SHA1: {sha1}");
// 2. 使用 EVALSHA 执行(减少网络传输)
var result = db.ScriptEvaluate(sha1,
new RedisKey[] { "lock:order:1001" },
new RedisValue[] { "unique-token-abc123" });// 使用 LoadedLuaScript 缓存并批量执行
using StackExchange.Redis;
var connection = ConnectionMultiplexer.Connect("localhost:6379");
var db = connection.GetDatabase();
const string rateLimitScript = @"
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, '-inf', now - window)
local current = redis.call('ZCARD', key)
if current < limit then
redis.call('ZADD', key, now, now .. '-' .. math.random(1000000))
redis.call('PEXPIRE', key, window)
return limit - current - 1
else
return 0
end
";
// Prepared 方式:自动管理 EVAL/EVALSHA 降级
var prepared = LuaScript.Prepare(rateLimitScript);
// 批量检查多个用户的限流
var tasks = new List<Task<int>>();
for (int userId = 1; userId <= 100; userId++)
{
var userIdCopy = userId;
tasks.Add(Task.Run(() =>
{
var result = db.ScriptEvaluate(prepared,
new { KEYS = new RedisKey[] { $"ratelimit:user:{userIdCopy}" },
ARGV = new RedisValue[] { 60000, 100, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } });
return (int)result;
}));
}
await Task.WhenAll(tasks);
int allowedCount = tasks.Count(t => t.Result > 0);
Console.WriteLine($"允许通过的请求: {allowedCount}/100");优点
缺点
总结
Redis Lua 脚本是实现原子操作的核心工具,特别适合分布式锁、限流器、库存扣减等场景。核心原则是脚本尽量短小(执行时间不超过 5ms),操作的 Key 使用 {hashtag} 保证在同一 Slot,使用 EVALSHA 减少带宽消耗。
关键知识点
- Lua 脚本中的
redis.call()出错会中断脚本,redis.pcall()出错会返回错误对象 KEYS和ARGV数组下标从 1 开始(Lua 特性)redis.log(redis.LOG_WARNING, 'message')可以写日志到 Redis 日志- Redis 7.0 引入 Redis Function,支持持久化脚本到 RDB/AOF
- 脚本中可以使用
redis.replicate_commands()(Redis 5.0+ 默认开启)控制复制行为 TYPE命令在 Lua 中返回的是字符串类型("string"、"hash"、"list" 等)
redis.call() vs redis.pcall()
-- redis.call():命令执行失败时,脚本终止并返回错误
-- redis.pcall():命令执行失败时,返回错误对象,脚本继续执行
-- 使用 redis.pcall() 做优雅降级
local result = redis.pcall('GET', KEYS[1])
if type(result) == 'table' and result.err then
-- GET 命令失败,可能是 Key 类型不对
return { error = result.err }
end
-- 正常处理 result项目落地视角
- 所有 Lua 脚本纳入版本控制,文件命名规范:
redis_scripts/限流器.lua - 使用 EVALSHA 代替 EVAL,客户端库通常自动处理 SCRIPT LOAD 缓存
- 设置
lua-time-limit(默认 5000ms),超时后 Redis 进入阻塞状态但仍可执行 SCRIPT KILL - Redis Cluster 中使用
{hashtag}保证多 Key 在同一 Slot:{user}:1001:lock
脚本版本管理策略
# 脚本目录结构
redis_scripts/
├── v1/
│ ├── rate_limiter.lua
│ ├── stock_deduct.lua
│ └── distributed_lock.lua
├── v2/
│ ├── rate_limiter.lua # 改进版
│ └── stock_deduct.lua # 增加幂等支持
├── deploy.sh # 部署脚本
└── README.md
# deploy.sh: 自动加载所有脚本到 Redis
#!/bin/bash
REDIS_HOST="localhost"
REDIS_PORT="6379"
SCRIPT_DIR="./v2"
for script in "$SCRIPT_DIR"/*.lua; do
name=$(basename "$script" .lua)
sha1=$(redis-cli -h "$REDIS_HOST" -p "$REDIS_PORT" --no-auth-warning \
SCRIPT LOAD "$(cat "$script")")
echo "$name: $sha1"
done常见误区
- 认为 Lua 脚本 = 分布式事务:Lua 只保证单节点原子性,跨节点不支持
- Lua 脚本中写死 Key 名而不是通过 KEYS 参数传入:导致 Redis Cluster 无法正确路由
- 忽略脚本执行时间的监控:长脚本会阻塞整个 Redis 实例
- 在脚本中使用
math.random不设种子:Redis 会自动设种子以保证可重放性 - 在 Lua 脚本中使用不存在的 Redis 命令:会直接报错终止脚本
Redis Cluster 中的 Key 路由
-- 错误:不同 Hash Slot 的 Key
-- KEYS[1] = "user:1001:profile"
-- KEYS[2] = "user:1002:profile"
-- 这两个 Key 很可能不在同一个 Slot,脚本执行会报错
-- 正确:使用 {hashtag} 保证在同一 Slot
-- KEYS[1] = "{user}:1001:profile"
-- KEYS[2] = "{user}:1002:profile"
-- Hash Tag 内的内容用于计算 Slot,所以都在同一 Slot
-- 正确:单个 Key 操作
-- KEYS[1] = "stock:product:1001"
-- 单 Key 不存在路由问题进阶路线
- 学习 Redis 7.0 Function 特性:支持 Lua 和 Redis 7.0 新增的脚本引擎
- 研究 Redis 7.0 的 Function Library 管理
- 深入 Redis Cluster 的 Hash Slot 路由机制
- 学习 Redis 模块(Module)开发,实现更复杂的服务端逻辑
- 了解 Redis 脚本的调试工具(redis-cli --ldb)
适用场景
- 分布式锁的获取与释放需要原子判断
- 限流器、库存扣减等"先查后改"的原子操作
- 多个命令需要打包执行减少网络往返
- 延迟队列、消息去重等需要原子保证的场景
落地建议
- 所有脚本存储在独立文件中,通过 CI/CD 预加载到 Redis
- 监控
slowlog捕获执行超时的 Lua 脚本 - 使用
{hashtag}确保 Cluster 模式下多 Key 操作正确 - 脚本执行时间控制在 5ms 以内,复杂逻辑拆分成多次调用
排错清单
SLOWLOG GET 20检查是否有长时间执行的脚本SCRIPT EXISTS sha1确认脚本是否已缓存- 检查脚本中的 Key 是否使用了
{hashtag}(Cluster 模式) - 检查
redis.log输出是否记录到 Redis 日志文件
复盘问题
- 是否有 Lua 脚本执行时间超过 5ms?是否可以优化?
- 脚本版本管理策略是什么?上线流程是否包含脚本预加载?
- Redis Cluster 模式下是否所有多 Key 操作都使用了 hashtag?
- 哪些"先查后改"操作还在用 WATCH 而不是 Lua 脚本?
