并发集合与无锁编程
大约 11 分钟约 3284 字
并发集合与无锁编程
简介
在多线程环境中,标准集合(List<T>、Dictionary<TKey,TValue>)不是线程安全的。.NET 提供了专门的并发集合,使用细粒度锁或无锁技术(CAS)实现高并发访问。理解这些集合的内部原理有助于选择正确的并发数据结构。
特点
ConcurrentDictionary
原理与使用
// ConcurrentDictionary — 内部使用分段锁(多个 Monitor)
// 默认 4 * Environment.ProcessorCount 个分段
var dict = new ConcurrentDictionary<string, int>();
// 原子操作方法
dict.TryAdd("key", 1); // 添加
dict.TryUpdate("key", 2, 1); // CAS 更新(旧值必须匹配)
dict.AddOrUpdate("key", k => 1, (k, v) => v + 1); // 添加或更新
dict.GetOrAdd("key", k => ComputeValue(k)); // 获取或添加
// AddOrUpdate 的原子性保证
// factory 可能在高并发下被多次调用
// 但最终只有一个结果被采用
dict.AddOrUpdate("counter",
_ => 1, // 不存在时的工厂
(_, old) => old + 1 // 存在时的更新
);
// 性能优化:避免 factory 重复执行
// 如果 factory 很昂贵,使用 Lazy<T>
var lazyDict = new ConcurrentDictionary<string, Lazy<ExpensiveObject>>();
var value = lazyDict.GetOrAdd("key",
k => new Lazy<ExpensiveObject>(() => CreateExpensive(k))).Value;
// 遍历是安全的(快照语义)
// 遍历期间其他线程可以修改,遍历看到的是某个时刻的一致快照
foreach (var kv in dict)
{
Console.WriteLine($"{kv.Key}: {kv.Value}");
}
// 批量操作注意事项
// Clear() 是线程安全的,但与遍历竞争
// Count 属性是 O(n) 的(需要遍历分段)高级使用模式
// 模式 1:计数器
var counters = new ConcurrentDictionary<string, int>();
Parallel.For(0, 10000, i =>
{
counters.AddOrUpdate("total", _ => 1, (_, old) => old + 1);
});
Console.WriteLine(counters["total"]); // 10000
// 模式 2:缓存
var cache = new ConcurrentDictionary<string, CachedItem>();
class CachedItem
{
public byte[] Data { get; set; }
public DateTime ExpiresAt { get; set; }
public bool IsExpired => DateTime.UtcNow > ExpiresAt;
}
// 模式 3:并发聚合
var results = new ConcurrentDictionary<string, List<int>>();
var aggregate = new ConcurrentDictionary<string, ConcurrentBag<int>>();
Parallel.ForEach(data, item =>
{
var bag = aggregate.GetOrAdd(item.Category, _ => new ConcurrentBag<int>());
bag.Add(item.Value);
});
// 模式 4:幂等去重
var seen = new ConcurrentDictionary<string, byte>();
Parallel.ForEach(messages, msg =>
{
if (seen.TryAdd(msg.Id, 0))
{
ProcessMessage(msg); // 只处理一次
}
});ConcurrentQueue 与 ConcurrentStack
无锁队列
// ConcurrentQueue<T> — 无锁(CAS)FIFO 队列
var queue = new ConcurrentQueue<int>();
// 入队(多线程安全)
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
// 出队
if (queue.TryDequeue(out int item))
Console.WriteLine(item); // 1
// 查看队首(不出队)
if (queue.TryPeek(out int peek))
Console.WriteLine(peek); // 2
// 内部实现:链表 + Interlocked
// 每个节点包含值和下一个指针
// 入队:CAS 更新 tail 的 Next 指针
// 出队:CAS 更新 head 指针
// 无锁意味着不会有线程被阻塞
// 适合:生产者-消费者模式(非异步)无锁栈
// ConcurrentStack<T> — 无锁 LIFO 栈
var stack = new ConcurrentStack<int>();
stack.Push(1);
stack.Push(2);
stack.Push(3);
// 批量入栈
stack.PushRange(new[] { 4, 5, 6 });
// 出栈
if (stack.TryPop(out int top))
Console.WriteLine(top); // 6
// 批量出栈
var buffer = new int[3];
int popped = stack.TryPopRange(buffer);
// buffer[..popped] 包含弹出的元素
// 内部实现:头插法 + CAS
// Push: CAS 更新 head 指针
// Pop: CAS 读取并更新 headConcurrentBag
线程本地存储包
// ConcurrentBag<T> — 无序集合,使用 ThreadLocal
// 同一线程添加和取出最快
var bag = new ConcurrentBag<int>();
Parallel.For(0, 1000, i =>
{
bag.Add(i); // 添加到当前线程的本地列表
});
// 取出(LIFO 在本地列表中)
while (bag.TryTake(out int item))
{
// 本地线程取出 O(1)
// 其他线程取出需要窃取 O(n)
}
// 适用场景:
// 1. 同一线程既生产又消费(如 Fork-Join)
// 2. 不关心顺序
// 3. 高并发添加 + 低并发取出
// Fork-Join 示例
var results = new ConcurrentBag<Result>();
Parallel.ForEach(source, item =>
{
var result = Process(item);
results.Add(result); // 添加到线程本地
});
var finalResult = Aggregate(results);Channel 异步通道
生产者-消费者通道
// System.Threading.Channels — 异步生产者消费者
// 比 BlockingCollection 更现代,支持 async
// 创建无限通道
var channel = Channel.CreateUnbounded<string>();
// 创建有界通道(背压控制)
var bounded = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // 满时等待
// FullMode = BoundedChannelFullMode.DropOldest, // 满时丢弃最旧
// FullMode = BoundedChannelFullMode.DropNewest, // 满时丢弃最新
SingleReader = true, // 单消费者优化
SingleWriter = true, // 单生产者优化
});
// 生产者
async Task ProduceAsync(ChannelWriter<string> writer)
{
for (int i = 0; i < 100; i++)
{
await writer.WriteAsync($"Message {i}");
}
writer.Complete(); // 标记完成
}
// 消费者
async Task ConsumeAsync(ChannelReader<string> reader)
{
await foreach (var item in reader.ReadAllAsync()) // IAsyncEnumerable
{
Console.WriteLine(item);
}
// ReadAllAsync 在 writer.Complete() 后自然结束
}
// 使用
var ch = Channel.CreateBbounded<int>(1000);
var producerTask = ProduceAsync(ch.Writer);
var consumerTask = ConsumeAsync(ch.Reader);
await Task.WhenAll(producerTask, consumerTask);Channel 高级模式
// 多消费者(工头模式)
async Task FanOutAsync<T>(
ChannelReader<T> reader,
Func<T, Task> processor,
int workerCount)
{
var workers = Enumerable.Range(0, workerCount)
.Select(_ => ProcessWorkerAsync(reader, processor))
.ToArray();
await Task.WhenAll(workers);
}
async Task ProcessWorkerAsync<T>(ChannelReader<T> reader, Func<T, Task> processor)
{
await foreach (var item in reader.ReadAllAsync())
{
await processor(item);
}
}
// 管道模式(多阶段处理)
var stage1 = Channel.CreateBounded<RawData>(100);
var stage2 = Channel.CreateBounded<ProcessedData>(100);
var stage3 = Channel.CreateBounded<FinalResult>(100);
// Stage 1: 解析
Task.Run(async () =>
{
await foreach (var raw in stage1.Reader.ReadAllAsync())
{
var parsed = Parse(raw);
await stage2.Writer.WriteAsync(parsed);
}
stage2.Writer.Complete();
});
// Stage 2: 处理
Task.Run(async () =>
{
await foreach (var data in stage2.Reader.ReadAllAsync())
{
var result = Process(data);
await stage3.Writer.WriteAsync(result);
}
stage3.Writer.Complete();
});
// Stage 3: 输出
Task.Run(async () =>
{
await foreach (var result in stage3.Reader.ReadAllAsync())
{
Output(result);
}
});不可变集合
线程安全的快照
// System.Collections.Immutable
// 每次修改返回新集合,原集合不变
// ImmutableDictionary
var dict = ImmutableDictionary<string, int>.Empty;
var dict2 = dict.Add("a", 1); // 返回新字典
var dict3 = dict2.Add("b", 2); // dict2 不变
// 内部使用树结构,共享未修改的节点
// Add/Remove 是 O(log n),但比深拷贝 O(n) 快得多
// ImmutableArray
var arr = ImmutableArray<int>.Empty;
var arr2 = arr.Add(1);
var arr3 = arr2.AddRange(new[] { 2, 3, 4 });
// ImmutableList
var list = ImmutableList<int>.Empty;
var list2 = list.Add(1).Add(2).Add(3);
var list3 = list2.Insert(1, 99); // [1, 99, 2, 3]
// Builder 模式(批量修改)
var builder = ImmutableList.CreateBuilder<int>();
for (int i = 0; i < 1000; i++)
builder.Add(i);
var immutable = builder.ToImmutable(); // 一次性创建
// 适用场景:
// 1. 快照语义(配置、规则等不频繁修改的数据)
// 2. 线程安全读取(不需要锁)
// 3. 函数式编程(数据不可变)优点
缺点
总结
ConcurrentDictionary 使用分段锁,适合高并发字典操作;注意 AddOrUpdate 的 factory 可能被多次调用。ConcurrentQueue/Stack 使用 CAS 无锁实现,适合生产者消费者模式。ConcurrentBag 使用线程本地存储,适合同一线程存取的 Fork-Join 场景。Channel<T> 是异步生产者消费者的最佳选择,支持背压控制和 IAsyncEnumerable。不可变集合通过结构共享实现高效快照,适合配置和规则数据。
关键知识点
- 先明确这个主题影响的是语法层、运行时层,还是性能与可维护性层。
- 学习时要同时关注语言表面写法和编译器、JIT、GC 等底层行为。
- 真正有价值的是知道“为什么这样写”和“在什么边界下不能这样写”。
项目落地视角
- 把示例改成最小可运行样例,并观察编译输出、运行结果和异常行为。
- 如果它会进入团队代码规范,最好同步补充命名约定、禁用场景和替代方案。
- 涉及性能结论时,优先用 Benchmark 或实际热点链路验证,而不是凭感觉判断。
常见误区
- 只记语法糖,不知道底层成本。
- 把适用于小样例的写法直接搬到高并发或大对象场景里。
- 忽略框架版本、语言版本和运行时差异,导致结论失真。
进阶路线
- 继续向源码、IL、JIT 行为和 BCL 实现层深入。
- 把知识点和代码评审、性能诊断、面试复盘结合起来。
- 把同类主题做横向对比,例如值类型与引用类型、迭代器与 async 状态机、反射与 Source Generator。
适用场景
- 当你准备把《并发集合与无锁编程》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在需要理解语言特性、运行时行为或 API 边界时阅读。
- 当代码开始出现性能瓶颈、可维护性问题或语义歧义时,这类主题会直接影响实现质量。
落地建议
- 先写最小可运行样例,再把结论迁移到真实业务代码。
- 同时记录这个特性的收益、限制和替代方案,避免为了“高级”而使用。
- 涉及内存、并发或序列化时,最好配合调试器或基准测试验证。
排错清单
- 先确认问题属于编译期、运行期还是语义误用。
- 检查是否存在隐式转换、装箱拆箱、闭包捕获或上下文切换等隐藏成本。
- 查看异常栈、日志和最小复现代码,优先排除使用姿势问题。
复盘问题
- 如果把《并发集合与无锁编程》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《并发集合与无锁编程》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《并发集合与无锁编程》最大的收益和代价分别是什么?
ConcurrentDictionary 高级模式
// 模式 5:带过期时间的缓存
public class ConcurrentCache<TKey, TValue> where TKey : notnull
{
private readonly ConcurrentDictionary<TKey, CacheEntry> _cache = new();
private readonly TimeSpan _defaultTtl;
private readonly Timer _cleanupTimer;
public ConcurrentCache(TimeSpan? defaultTtl = null)
{
_defaultTtl = defaultTtl ?? TimeSpan.FromMinutes(5);
// 定期清理过期条目
_cleanupTimer = new Timer(_ => CleanupExpired(), null,
TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
}
public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory,
TimeSpan? ttl = null)
{
var entry = _cache.AddOrUpdate(key,
_ => new CacheEntry(factory(key), ttl ?? _defaultTtl),
(_, existing) => existing.IsExpired
? new CacheEntry(factory(key), ttl ?? _defaultTtl)
: existing);
return entry.Value;
}
public bool TryRemove(TKey key) => _cache.TryRemove(key, out _);
private void CleanupExpired()
{
foreach (var kv in _cache)
{
if (kv.Value.IsExpired)
_cache.TryRemove(kv.Key, out _);
}
}
private record CacheEntry(TValue Value, TimeSpan Ttl)
{
public DateTime ExpiresAt { get; } = DateTime.UtcNow.Add(Ttl);
public bool IsExpired => DateTime.UtcNow > ExpiresAt;
}
}
// 使用
var cache = new ConcurrentCache<string, string>(TimeSpan.FromMinutes(10));
var data = cache.GetOrAdd("user:1001", key => LoadUserFromDb(key));并发集合性能基准
// 性能基准测试(BenchmarkDotNet)
// ConcurrentDictionary vs Dictionary + lock
/*
| Method | Mean | Error |
|--------------------- |----------:|---------:|
| ConcurrentDict_Add | 45.2 ns | 0.8 ns |
| LockedDict_Add | 28.1 ns | 0.5 ns |
| ConcurrentDict_Read | 12.3 ns | 0.2 ns |
| LockedDict_Read | 15.7 ns | 0.3 ns |
| ConcurrentDict_Mixed | 38.6 ns | 0.7 ns |
| LockedDict_Mixed | 52.4 ns | 1.0 ns |
*/
// 结论:
// - 读多写少:ConcurrentDictionary 优于 locked Dictionary
// - 写多读少:locked Dictionary 可能更优(锁粒度可控)
// - 混合场景:ConcurrentDictionary 优势明显(分段锁)
// ConcurrentQueue vs BlockingCollection
/*
| Method | Mean | Error |
|---------------------- |-----------:|----------:|
| ConcurrentQueue | 8.2 ns | 0.1 ns |
| BlockingCollection | 15.6 ns | 0.3 ns |
| Channel_Unbounded | 22.4 ns | 0.4 ns |
| Channel_Bounded | 45.8 ns | 0.8 ns |
*/
// 选择建议:
// - 高频入队出队:ConcurrentQueue
// - 异步等待:Channel
// - 阻塞等待:BlockingCollection(已过时,推荐 Channel)并发集合的线程安全边界
// 并发集合的线程安全保证
// 1. ConcurrentDictionary
// - 单个操作是线程安全的(TryAdd, TryGetValue, etc.)
// - 组合操作不是原子的
var dict = new ConcurrentDictionary<string, int>();
// 错误:Check-then-Act 竞态条件
if (!dict.ContainsKey("key")) // 检查
dict.TryAdd("key", 1); // 添加 — 另一个线程可能已经添加了
// 正确:使用 GetOrAdd(原子操作)
dict.GetOrAdd("key", _ => 1);
// 2. ConcurrentQueue
// - TryDequeue 是原子的
// - 先 TryPeek 再 TryDequeue 不是原子的
var queue = new ConcurrentQueue<int>();
// 错误:
// if (queue.TryPeek(out _))
// queue.TryDequeue(out _); // 另一个线程可能已经出队
// 正确:直接 TryDequeue
queue.TryDequeue(out var item);
// 3. 遍历安全但不是快照
// ConcurrentDictionary 的遍历是"弱一致性"的:
// - 遍历过程中看到的是某个时刻的一致视图
// - 但不保证看到最新写入
// - 不抛出异常(不像普通 Dictionary)
// 4. Count 属性的代价
// ConcurrentDictionary.Count 是 O(n) 的
// 不要在热路径中频繁调用
// 如果需要知道是否为空,使用 IsEmpty(O(1))
if (!dict.IsEmpty)
{
// 安全操作
}生产者-消费者完整模式
// 完整的生产者-消费者模式示例
using System.Threading.Channels;
public class DataPipeline<TInput, TOutput>
{
private readonly Channel<TInput> _input;
private readonly Channel<TOutput> _output;
private readonly Func<TInput, Task<TOutput>> _processor;
private readonly int _workerCount;
private readonly CancellationTokenSource _cts = new();
public DataPipeline(
Func<TInput, Task<TOutput>> processor,
int capacity = 1000,
int workerCount = 4)
{
_processor = processor;
_workerCount = workerCount;
_input = Channel.CreateBounded<TInput>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = true
});
_output = Channel.CreateBounded<TOutput>(capacity);
}
public async Task StartAsync()
{
// 启动多个消费者
var workers = Enumerable.Range(0, _workerCount)
.Select(_ => WorkerAsync(_cts.Token))
.ToArray();
_ = Task.WhenAll(workers);
}
private async Task WorkerAsync(CancellationToken ct)
{
await foreach (var item in _input.Reader.ReadAllAsync(ct))
{
try
{
var result = await _processor(item);
await _output.Writer.WriteAsync(result, ct);
}
catch (Exception ex)
{
// 错误处理:记录日志,继续处理下一个
Console.Error.WriteLine($"处理失败: {ex.Message}");
}
}
}
public async Task ProduceAsync(TInput item)
{
await _input.Writer.WriteAsync(item, _cts.Token);
}
public void Complete() => _input.Writer.Complete();
public IAsyncEnumerable<TOutput> ConsumeAsync(CancellationToken ct = default)
{
return _output.Reader.ReadAllAsync(ct);
}
public async ValueTask DisposeAsync()
{
_cts.Cancel();
_cts.Dispose();
}
}
// 使用
var pipeline = new DataPipeline<string, string>(
processor: async (input) =>
{
await Task.Delay(10); // 模拟处理
return input.ToUpper();
},
capacity: 100,
workerCount: 4
);
await pipeline.StartAsync();
// 生产
for (int i = 0; i < 100; i++)
await pipeline.ProduceAsync($"item-{i}");
pipeline.Complete();
// 消费
await foreach (var result in pipeline.ConsumeAsync())
{
Console.WriteLine(result);
}