Redis 高级应用
大约 13 分钟约 3991 字
Redis 高级应用
简介
Redis 是高性能内存键值数据库。进阶应用涵盖持久化策略、主从复制、哨兵模式、集群分片、Lua 脚本、Stream 消息队列等。掌握这些高级特性是构建高可用缓存和消息系统的关键。
特点
持久化
RDB vs AOF
# RDB 快照 — 定时保存全量数据
# redis.conf
save 900 1 # 900秒内有1次修改则保存
save 300 10 # 300秒内有10次修改
save 60 10000 # 60秒内有10000次修改
rdbcompression yes
rdbfilename dump.rdb
dir /var/lib/redis
# 手动触发
redis-cli BGSAVE # 后台保存
redis-cli SAVE # 同步保存(阻塞)
# AOF 追加日志 — 记录每次写操作
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec # 每秒刷盘(推荐)
# appendfsync always # 每次写入刷盘(安全但慢)
# appendfsync no # 由 OS 决定(快但可能丢数据)
# AOF 重写(压缩)
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb持久化策略选择
| 策略 | 数据安全 | 性能 | 适用场景 |
|---|---|---|---|
| RDB | 可能丢失分钟级数据 | 高 | 备份、灾备 |
| AOF(everysec) | 最多丢1秒 | 中 | 生产推荐 |
| RDB+AOF | 最高 | 中 | 关键数据 |
| 无持久化 | 不安全 | 最高 | 纯缓存 |
高级数据结构
HyperLogLog — 基数统计
# 统计 UV(独立访客)
PFADD uv:2026-04-11 user1 user2 user3 user4
PFADD uv:2026-04-11 user2 user5 user6 # 重复自动去重
PFCOUNT uv:2026-04-11 # 返回 6(独立用户数)
# 合并多天数据
PFMERGE uv:2026-04 uv:2026-04-10 uv:2026-04-11
PFCOUNT uv:2026-04
# 内存占用固定 12KB,无论多少元素Bitmap — 位操作
# 用户签到
SETBIT sign:2026-04:user1 0 1 # 第1天签到
SETBIT sign:2026-04:user1 1 1 # 第2天签到
SETBIT sign:2026-04:user1 2 0 # 第3天未签
BITCOUNT sign:2026-04:user1 # 统计签到天数:2
# 查询某天是否签到
GETBIT sign:2026-04:user1 0 # 1(已签)
# 连续签到天数(位运算)
BITPOS sign:2026-04:user1 0 # 第一个0的位置Stream — 消息队列
# 添加消息
XADD orders * user_id 1001 amount 99.9 status created
# 返回消息 ID:1744000000000-0
XADD orders * user_id 1002 amount 199.0 status created
# 读取消息
XREAD COUNT 10 BLOCK 5000 STREAMS orders $
# 阻塞读取新消息,超时 5 秒
# 创建消费者组
XGROUP CREATE orders order-group $ MKSTREAM
# 消费者读取
XREADGROUP GROUP order-group consumer1 COUNT 1 STREAMS orders >
# 确认消息
XACK orders order-group 1744000000000-0
# 查看消费者组信息
XINFO GROUPS orders
# 查看待处理消息
XPENDING orders order-groupGEO — 地理位置
# 添加位置
GEOADD locations 113.934 22.529 "深圳北站"
GEOADD locations 114.057 22.543 "深圳福田站"
GEOADD locations 113.884 22.556 "深圳西站"
# 计算距离
GEODIST locations "深圳北站" "深圳福田站" km
# 返回:约 13 公里
# 搜索附近
GEOSEARCH locations FROMMEMBER "深圳北站" BYRADIUS 20 km WITHDIST WITHCOORD
# 搜索 20km 内的地点,返回距离和坐标.NET 集成进阶
分布式锁
/// <summary>
/// Redis 分布式锁
/// </summary>
public class RedisDistributedLock
{
private readonly ConnectionMultiplexer _redis;
private readonly string _lockPrefix = "lock:";
public RedisDistributedLock(ConnectionMultiplexer redis)
{
_redis = redis;
}
// 加锁
public async Task<string?> AcquireAsync(string resource, TimeSpan expiry, TimeSpan wait, CancellationToken ct = default)
{
var db = _redis.GetDatabase();
var lockId = Guid.NewGuid().ToString();
var lockKey = $"{_lockPrefix}{resource}";
var start = DateTime.UtcNow;
while (DateTime.UtcNow - start < wait)
{
var acquired = await db.StringSetAsync(lockKey, lockId, expiry, When.NotExists);
if (acquired) return lockId;
await Task.Delay(50, ct);
}
return null; // 获取锁失败
}
// 解锁(Lua 脚本保证原子性)
public async Task<bool> ReleaseAsync(string resource, string lockId)
{
var db = _redis.GetDatabase();
var lockKey = $"{_lockPrefix}{resource}";
var script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
var result = await db.ScriptEvaluateAsync(script,
new RedisKey[] { lockKey },
new RedisValue[] { lockId });
return (int)result == 1;
}
}缓存策略
/// <summary>
/// Redis 缓存策略
/// </summary>
public class CacheService
{
private readonly IDatabase _redis;
public CacheService(IConnectionMultiplexer redis)
{
_redis = redis.GetDatabase();
}
// 缓存穿透 — 布隆过滤器 + 空值缓存
public async Task<T?> GetWithBloomFilterAsync<T>(string key, Func<Task<T?>> factory, TimeSpan? expiry = null)
{
// 1. 查缓存
var value = await _redis.StringGetAsync(key);
if (value.HasValue)
{
if (value == "NULL") return default; // 空值缓存
return JsonSerializer.Deserialize<T>(value!);
}
// 2. 查数据库
var result = await factory();
if (result != null)
{
await _redis.StringSetAsync(key, JsonSerializer.Serialize(result), expiry ?? TimeSpan.FromMinutes(30));
}
else
{
// 缓存空值,防止穿透
await _redis.StringSetAsync(key, "NULL", TimeSpan.FromMinutes(5));
}
return result;
}
// 缓存雪崩 — 随机过期时间
public async Task SetWithRandomExpiryAsync<T>(string key, T value, TimeSpan baseExpiry)
{
var random = new Random();
var jitter = TimeSpan.FromSeconds(random.Next(0, 300)); // 0-300秒随机
await _redis.StringSetAsync(key, JsonSerializer.Serialize(value), baseExpiry + jitter);
}
}哨兵模式
哨兵配置
# sentinel.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1// .NET 连接哨兵
var config = new ConfigurationOptions
{
ServiceName = "mymaster",
AbortOnConnectFail = false
};
config.EndPoints.Add("sentinel1:26379");
config.EndPoints.Add("sentinel2:26379");
config.EndPoints.Add("sentinel3:26379");
var redis = ConnectionMultiplexer.Connect(config);Lua 脚本进阶
原子操作与限流
# 令牌桶限流(Lua 脚本)
# 限流规则:每分钟最多 100 次请求
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('INCR', key))
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0 -- 超过限制
else
return current -- 返回当前计数
end
# 使用方式
EVAL "local key=KEYS[1] local limit=tonumber(ARGV[1]) local window=tonumber(ARGV[2]) local current=tonumber(redis.call('INCR',key)) if current==1 then redis.call('EXPIRE',key,window) end if current>limit then return 0 else return current end" 1 rate_limit:user:1001 100 60// .NET 中使用 Lua 脚本实现滑动窗口限流
public class RedisRateLimiter
{
private readonly IDatabase _redis;
private const string SlidingWindowScript = @"
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
redis.call('ZADD', key, now, now .. ':' .. math.random(1000000))
redis.call('EXPIRE', key, window / 1000 + 1)
return count + 1
else
return 0
end";
public async Task<bool> IsAllowedAsync(string userId, int limit, int windowMs)
{
var key = $"rate_limit:{userId}";
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var result = await _redis.ScriptEvaluateAsync(
SlidingWindowScript,
new RedisKey[] { key },
new RedisValue[] { windowMs, limit, now });
return (int)result > 0;
}
}库存扣减(原子操作)
// Lua 脚本实现原子库存扣减
public class InventoryService
{
private readonly IDatabase _redis;
private const string DeductScript = @"
local stock_key = KEYS[1]
local order_key = KEYS[2]
local quantity = tonumber(ARGV[1])
local order_id = ARGV[2]
local stock = tonumber(redis.call('GET', stock_key))
if stock == nil or stock < quantity then
return -1 -- 库存不足
end
redis.call('DECRBY', stock_key, quantity)
redis.call('HSET', order_key, order_id, quantity)
return stock - quantity";
public async Task<int> DeductStockAsync(string productId, int quantity, string orderId)
{
var stockKey = $"stock:{productId}";
var orderKey = $"orders:{productId}";
var result = await _redis.ScriptEvaluateAsync(
DeductScript,
new RedisKey[] { stockKey, orderKey },
new RedisValue[] { quantity, orderId });
return (int)result;
}
}内存优化策略
内存分析与管理
# 查看内存使用情况
INFO memory
# used_memory: 当前使用内存
# used_memory_rss: 操作系统分配的内存
# used_memory_peak: 内存使用峰值
# mem_fragmentation_ratio: 碎片率
# 查看大 Key
redis-cli --bigkeys
# 扫描分析 Key 空间
SCAN 0 MATCH user:* COUNT 1000
# 查看某个 Key 的内存占用
MEMORY USAGE user:1001
# 查看所有 Key 的内存占用排名
redis-cli memory usage $(redis-cli keys '*' | head -100 | tr '\n' ' ')
# 设置内存上限
CONFIG SET maxmemory 4gb
CONFIG SET maxmemory-policy allkeys-lru
# 淘汰策略:
# noeviction — 不淘汰,写入报错(默认)
# allkeys-lru — 所有 Key 中淘汰最近最少使用
# allkeys-lfu — 所有 Key 中淘汰使用频率最低
# allkeys-random — 随机淘汰
# volatile-lru — 过期 Key 中淘汰最近最少使用
# volatile-lfu — 过期 Key 中淘汰使用频率最低
# volatile-ttl — 过期 Key 中淘汰 TTL 最短的数据结构优化
# 1. 使用 Hash 优化对象存储
# 差:多个 Key 存储一个对象的多个字段
SET user:1001:name "张三"
SET user:1001:age 25
SET user:1001:email "zhangsan@example.com"
# 好:一个 Hash 存储整个对象
HSET user:1001 name "张三" age 25 email "zhangsan@example.com"
HGETALL user:1001
# 节省 Key 的内存开销(Hash 在字段数 < 512 时使用 ziplist 编码)
# 2. 使用 Ziplist 编码优化小集合
# 当元素数量和大小都较小时,Redis 自动使用压缩列表
CONFIG SET hash-max-ziplist-entries 512
CONFIG SET hash-max-ziplist-value 64
CONFIG SET list-max-ziplist-size -2
CONFIG SET set-max-intset-entries 512
CONFIG SET zset-max-ziplist-entries 128
# 3. 位图压缩存储
# 100 万用户签到数据
# String 方式:每条 100 字节 = 100MB
# Bitmap 方式:100 万位 = 125KB
# 4. 序列化优化
# 使用 MessagePack 或 Protobuf 替代 JSON
# 减少序列化后的体积,提升网络传输效率缓存常见问题深度解析
缓存穿透解决方案
# 方案一:布隆过滤器
BF.ADD user_filter user1 user2 user3
BF.EXISTS user_filter user4
# 返回 0:确定不存在,直接拦截
# 返回 1:可能存在,继续查缓存和 DB
# 方案二:缓存空值
SET user:notfound:999 "NULL" EX 300
# 5 分钟过期,防止恶意请求反复穿透
# 方案三:请求参数校验
# 在应用层拦截明显非法的请求
# 如:ID 为负数、格式不正确等
# 方案四:热点 Key 永不过期
# 对于热点数据,设置逻辑过期而非物理过期
HSET cache:product:1001 data '{...}' expire_at 1744999999
# 后台线程定时刷新缓存雪崩解决方案
// 方案一:随机过期时间
public async Task SetCacheAsync<T>(string key, T value, TimeSpan baseExpiry)
{
var jitter = Random.Shared.Next(0, 300); // 0-300 秒随机
var actualExpiry = baseExpiry.Add(TimeSpan.FromSeconds(jitter));
await _redis.StringSetAsync(key, JsonSerializer.Serialize(value), actualExpiry);
}
// 方案二:多级缓存(本地缓存 + Redis)
public class MultiLevelCacheService
{
private readonly IDatabase _redis;
private readonly IMemoryCache _localCache;
public async Task<T?> GetAsync<T>(string key, Func<Task<T?>> factory)
{
// L1:本地缓存(毫秒级)
if (_localCache.TryGetValue(key, out T? localValue))
return localValue;
// L2:Redis 缓存
var redisValue = await _redis.StringGetAsync(key);
if (redisValue.HasValue)
{
var result = JsonSerializer.Deserialize<T>(redisValue!);
_localCache.Set(key, result, TimeSpan.FromSeconds(30));
return result;
}
// L3:数据库
var data = await factory();
if (data != null)
{
await _redis.StringSetAsync(key, JsonSerializer.Serialize(data),
TimeSpan.FromMinutes(30));
_localCache.Set(key, data, TimeSpan.FromSeconds(30));
}
return data;
}
}
// 方案三:缓存预热
// 系统启动时或定时任务加载热点数据到缓存
public async Task WarmUpCacheAsync(IEnumerable<string> productIds)
{
var pipeline = _redis.CreateBatch();
foreach (var id in productIds)
{
var key = $"product:{id}";
var product = await _productRepository.GetAsync(id);
if (product != null)
{
_ = pipeline.StringSetAsync(key,
JsonSerializer.Serialize(product),
TimeSpan.FromMinutes(30));
}
}
pipeline.Execute();
}缓存击穿解决方案
// 热点 Key 过期时大量请求同时穿透到数据库
// 方案一:互斥锁(只允许一个线程重建缓存)
public async Task<T?> GetWithMutexAsync<T>(string key, Func<Task<T?>> factory, TimeSpan expiry)
{
var value = await _redis.StringGetAsync(key);
if (value.HasValue)
return JsonSerializer.Deserialize<T>(value!);
// 获取互斥锁
var lockKey = $"mutex:{key}";
var lockValue = Guid.NewGuid().ToString();
var acquired = await _redis.StringSetAsync(lockKey, lockValue, TimeSpan.FromSeconds(10), When.NotExists);
if (acquired)
{
try
{
// 双重检查
value = await _redis.StringGetAsync(key);
if (value.HasValue)
return JsonSerializer.Deserialize<T>(value!);
var data = await factory();
if (data != null)
await _redis.StringSetAsync(key, JsonSerializer.Serialize(data), expiry);
return data;
}
finally
{
// Lua 脚本释放锁(防止误删)
await _redis.ScriptEvaluateAsync(@"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else return 0 end",
new RedisKey[] { lockKey },
new RedisValue[] { lockValue });
}
}
else
{
// 获取锁失败,等待后重试
await Task.Delay(100);
return await GetWithMutexAsync(key, factory, expiry);
}
}
// 方案二:逻辑过期(永不过期 + 异步刷新)
public class LogicalExpiryCache<T>
{
private readonly IDatabase _redis;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();
public async Task<T?> GetAsync(string key, Func<Task<T?>> factory, TimeSpan logicalExpiry)
{
var cacheData = await _redis.StringGetAsync(key);
if (cacheData.HasValue)
{
var wrapper = JsonSerializer.Deserialize<CacheWrapper<T>>(cacheData!);
if (wrapper != null)
{
if (wrapper.ExpireAt > DateTimeOffset.UtcNow.ToUnixTimeSeconds())
return wrapper.Data;
// 逻辑过期,异步刷新
_ = RefreshCacheAsync(key, factory, logicalExpiry);
return wrapper.Data; // 返回旧数据
}
}
// 缓存不存在,同步加载
var data = await factory();
if (data != null)
{
var wrapper = new CacheWrapper<T>
{
Data = data,
ExpireAt = DateTimeOffset.UtcNow.Add(logicalExpiry).ToUnixTimeSeconds()
};
await _redis.StringSetAsync(key, JsonSerializer.Serialize(wrapper));
}
return data;
}
private async Task RefreshCacheAsync(string key, Func<Task<T?>> factory, TimeSpan logicalExpiry)
{
var semaphore = _locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
if (await semaphore.WaitAsync(0))
{
try
{
var data = await factory();
if (data != null)
{
var wrapper = new CacheWrapper<T>
{
Data = data,
ExpireAt = DateTimeOffset.UtcNow.Add(logicalExpiry).ToUnixTimeSeconds()
};
await _redis.StringSetAsync(key, JsonSerializer.Serialize(wrapper));
}
}
finally
{
semaphore.Release();
_locks.TryRemove(key, out _);
}
}
}
}
private class CacheWrapper<T>
{
public T? Data { get; set; }
public long ExpireAt { get; set; }
}Pipeline 与事务
Pipeline 批量操作
// Pipeline 减少网络往返次数
public async Task<List<string>> BatchGetAsync(IEnumerable<string> keys)
{
var db = _redis.GetDatabase();
var keyList = keys.ToList();
var values = new List<string>();
// 使用 Pipeline 批量获取
var batch = db.CreateBatch();
var tasks = keyList.Select(key => batch.StringGetAsync(key)).ToList();
batch.Execute();
foreach (var task in tasks)
{
var result = await task;
values.Add(result.HasValue ? result! : string.Empty);
}
return values;
}
// 性能对比:
// 逐条 GET 1000 个 Key:约 1000 次网络 RTT
// Pipeline 1000 个 Key:约 1 次网络 RTT
// 注意:Pipeline 中命令不能依赖前一个命令的结果Redis 事务
# MULTI/EXEC 事务
MULTI
SET account:A 100
SET account:B 100
EXEC
# WATCH 乐观锁
WATCH account:A
balance = GET account:A
if balance >= 50
MULTI
DECRBY account:A 50
INCRBY account:B 50
EXEC
else
UNWATCH
end
# 事务特点:
# 1. 事务中的命令按顺序执行,不会被其他客户端打断
# 2. EXEC 之前入队的命令不会执行(只是排队)
# 3. 如果 WATCH 的 Key 被修改,整个事务取消
# 4. 事务不支持回滚,某个命令出错不影响其他命令执行
# 5. 不适合需要回滚的场景,复杂逻辑用 Lua 脚本Redis Module 扩展
常用模块
# RedisSearch — 全文搜索
# 替代 Elasticsearch 的轻量方案
FT.CREATE products_idx ON JSON PREFIX 1 product: SCHEMA $.name TEXT $.category TAG $.price NUMERIC
FT.SEARCH products_idx "@category:手机 @price:[1000 5000]"
FT.AGGREGATE products_idx "@category:手机" GROUPBY 1 @category REDUCE COUNT 0 AS count
# RedisJSON — JSON 数据类型
JSON.SET product:1001 $ '{"name":"iPhone 15","price":7999,"tags":["手机","Apple"]}'
JSON.GET product:1001 $.name
JSON.SET product:1001 $.price 7499
JSON.ARRAPPEND product:1001 $.tags '"促销"'
# RedisBloom — 布隆过滤器
BF.ADD user_filter user123
BF.EXISTS user_filter user123
BF.RESERVE spam_filter 0.001 1000000 # 误判率 0.1%,容量 100 万
# RedisTimeSeries — 时序数据
TS.CREATE temperature:room1 RETENTION 86400000 LABELS room 1
TS.ADD temperature:room1 * 23.5
TS.RANGE temperature:room1 - AGGREGATION avg 60000 # 按分钟聚合优点
缺点
总结
Redis 进阶的核心:持久化策略保证数据安全,哨兵/集群保证高可用,Lua 脚本保证原子性,Stream 提供消息队列能力。生产环境推荐 AOF + 哨兵模式。缓存策略注意穿透、雪崩和击穿三大问题。
关键知识点
- 数据库主题一定要同时看数据模型、读写模式和执行代价。
- 很多性能问题不是 SQL 语法问题,而是索引、统计信息、事务和数据分布问题。
- 高可用、备份、迁移和治理与查询优化同样重要。
- 缓存与开关类主题都在处理“配置/数据与运行时行为之间的解耦”。
项目落地视角
- 所有优化前后都保留执行计划、样本 SQL 和关键指标对比。
- 上线前准备回滚脚本、备份点和校验方案。
- 把连接池、锁等待、慢查询和容量增长纳入日常巡检。
- 明确 Key 设计、过期策略、回源逻辑和降级方案。
常见误区
- 脱离真实数据分布讨论索引或分片。
- 只看单条 SQL,不看整条业务链路的事务和锁。
- 把测试环境结论直接等同于生产环境结论。
- 只加缓存,不设计失效与一致性策略。
进阶路线
- 继续向执行计划、存储引擎、复制机制和数据治理层深入。
- 把主题与 ORM、缓存、消息队列和归档策略联动起来思考。
- 沉淀成数据库设计规范、SQL 审核规则和变更流程。
- 继续补齐多级缓存、缓存预热、分布式缓存治理和旗标管理平台。
适用场景
- 当你准备把《Redis 高级应用》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合数据建模、查询优化、事务控制、高可用和迁移治理。
- 当系统开始遇到慢查询、锁冲突、热点数据或容量增长时,这类主题价值最高。
落地建议
- 先分析真实查询模式、数据量级和写入特征,再决定索引或分片策略。
- 所有优化结论都结合执行计划、样本数据和监控指标验证。
- 高风险操作前准备备份、回滚脚本与校验 SQL。
排错清单
- 先确认瓶颈在 CPU、I/O、锁等待、网络还是 SQL 本身。
- 检查执行计划是否走错索引、是否发生排序或全表扫描。
- 排查长事务、隐式类型转换、统计信息过期和参数嗅探。
复盘问题
- 如果把《Redis 高级应用》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Redis 高级应用》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Redis 高级应用》最大的收益和代价分别是什么?
