线程同步原语深入
大约 13 分钟约 3796 字
线程同步原语深入
简介
多线程编程中,同步原语用于协调线程间的执行顺序和资源共享。C# 提供了从轻量级(Interlocked、Volatile)到重量级(Monitor、Mutex、Semaphore)的多层次同步机制。理解各同步原语的原理和适用场景是编写正确并发程序的关键。
特点
原子操作与内存模型
Interlocked 类
// Interlocked — 硬件级原子操作(CAS: Compare-And-Swap)
// 所有操作都是原子的,不需要锁
// 原子递增/递减
int counter = 0;
Interlocked.Increment(ref counter); // counter++ (原子)
Interlocked.Decrement(ref counter); // counter-- (原子)
Interlocked.Add(ref counter, 10); // counter += 10 (原子)
// 原子交换
int old = Interlocked.Exchange(ref counter, 100); // 设为100,返回旧值
// 条件交换(CAS)
int expected = 100;
int newValue = 200;
// 如果 counter == expected,设为 newValue
int result = Interlocked.CompareExchange(ref counter, newValue, expected);
Console.WriteLine(result); // 100(旧值)
// 使用 CAS 实现自旋锁
public class SimpleSpinLock
{
private int _locked = 0;
public void Enter()
{
while (Interlocked.CompareExchange(ref _locked, 1, 0) != 0)
{
// 自旋等待(可以加 Thread.SpinWait 优化)
Thread.SpinWait(10);
}
}
public void Exit()
{
Volatile.Write(ref _locked, 0); // 确保写入对所有线程可见
}
}
// 原子更新模式(在循环中 CAS)
int AddIfPositive(ref int location, int addition)
{
int current, newValue;
do
{
current = Volatile.Read(ref location);
if (current <= 0) return current;
newValue = current + addition;
}
while (Interlocked.CompareExchange(ref location, newValue, current) != current);
return newValue;
}volatile 与内存可见性
// volatile — 保证读写的可见性,不保证原子性
// 1. volatile 读:acquire 语义(之后的读写不会重排到之前)
// 2. volatile 写:release 语义(之前的读写不会重排到之后)
// 没有 volatile 的问题
class BrokenFlag
{
private bool _stop = false; // ❌ 没有 volatile
public void Worker()
{
while (!_stop) // 可能永远看不到 _stop 的变化!
{ // JIT 可能优化为:if (!_stop) while (true) { }
DoWork();
}
}
public void Stop() => _stop = true;
}
// ✅ 使用 volatile
class CorrectFlag
{
private volatile bool _stop = false;
public void Worker()
{
while (!_stop) // volatile 确保每次从内存读取
{
DoWork();
}
}
public void Stop() => _stop = true;
}
// Volatile.Read / Volatile.Write(方法级控制)
class FineGrainedVolatile
{
private int _value;
public int ReadValue() => Volatile.Read(ref _value); // acquire
public void WriteValue(int v) => Volatile.Write(ref _value, v); // release
}
// 注意:volatile 不能用于所有类型
// ❌ volatile 不能用于:double, long (32位), decimal, 自定义 struct
// ✅ volatile 可以用于:int, bool, float, byte, reference typesMonitor 与 lock
lock 语句深入
// lock 语句是 Monitor.Enter/Exit 的语法糖
object _lockObj = new();
lock (_lockObj)
{
// 临界区
}
// 等价于:
bool lockTaken = false;
try
{
Monitor.Enter(_lockObj, ref lockTaken);
// 临界区
}
finally
{
if (lockTaken) Monitor.Exit(_lockObj);
}
// Monitor 高级功能
// TryEnter — 尝试获取锁(超时)
if (Monitor.TryEnter(_lockObj, TimeSpan.FromSeconds(5)))
{
try { /* 获得锁 */ }
finally { Monitor.Exit(_lockObj); }
}
else
{
// 获取锁超时
}
// Wait/Pulse — 等待/通知模式
class ProducerConsumer<T>
{
private readonly Queue<T> _queue = new();
private readonly int _maxSize;
private readonly object _lock = new();
public void Produce(T item)
{
lock (_lock)
{
while (_queue.Count >= _maxSize)
Monitor.Wait(_lock); // 释放锁并等待
_queue.Enqueue(item);
Monitor.Pulse(_lock); // 通知一个等待的线程
}
}
public T Consume()
{
lock (_lock)
{
while (_queue.Count == 0)
Monitor.Wait(_lock);
T item = _queue.Dequeue();
Monitor.Pulse(_lock);
return item;
}
}
}
// 注意事项:
// 1. lock 对象不要是 this、typeof(X)、string
// 2. 不要在 lock 内调用外部代码(可能死锁)
// 3. lock 不支持跨进程ReaderWriterLock
读写锁
// ReaderWriterLockSlim — 多读单锁
// 适合读多写少的场景
class ThreadSafeCache<TKey, TValue> where TKey : notnull
{
private readonly Dictionary<TKey, TValue> _cache = new();
private readonly ReaderWriterLockSlim _rwLock = new();
public TValue? Get(TKey key)
{
_rwLock.EnterReadLock();
try
{
_cache.TryGetValue(key, out var value);
return value;
}
finally
{
_rwLock.ExitReadLock();
}
}
public void Set(TKey key, TValue value)
{
_rwLock.EnterWriteLock();
try
{
_cache[key] = value;
}
finally
{
_rwLock.ExitWriteLock();
}
}
// 升级锁(读锁 → 写锁)
public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
{
_rwLock.EnterUpgradeableReadLock();
try
{
if (_cache.TryGetValue(key, out var value))
return value;
_rwLock.EnterWriteLock();
try
{
// Double-check(可能在等写锁时其他线程已添加)
if (_cache.TryGetValue(key, out value))
return value;
value = factory(key);
_cache[key] = value;
return value;
}
finally
{
_rwLock.ExitWriteLock();
}
}
finally
{
_rwLock.ExitUpgradeableReadLock();
}
}
}信号机制
EventWaitHandle 家族
// AutoResetEvent — 一次通知一个等待者
var are = new AutoResetEvent(false); // 初始无信号
// 等待线程
ThreadPool.QueueUserWorkItem(_ =>
{
Console.WriteLine("等待中...");
are.WaitOne(); // 阻塞直到收到信号
Console.WriteLine("收到信号!");
});
Thread.Sleep(1000);
are.Set(); // 释放一个等待者
// ManualResetEvent — 通知所有等待者
var mre = new ManualResetEvent(false);
// mre.Set() 后所有等待线程都通过
// mre.Reset() 重置为无信号
// CountdownEvent — 等待 N 个操作完成
var countdown = new CountdownEvent(5);
for (int i = 0; i < 5; i++)
{
ThreadPool.QueueUserWorkItem(_ =>
{
DoWork();
countdown.Signal(); // 完成一个
});
}
countdown.Wait(); // 等待全部完成
Console.WriteLine("全部完成");
// Barrier — 多线程汇合点
var barrier = new Barrier(3, b =>
{
Console.WriteLine($"第 {b.CurrentPhaseNumber} 阶段完成");
});
for (int i = 0; i < 3; i++)
{
var thread = new Thread(id =>
{
for (int phase = 0; phase < 3; phase++)
{
DoPhaseWork(phase);
barrier.SignalAndWait(); // 等待所有线程到达
}
});
thread.Start(i);
}SemaphoreSlim 异步等待
// SemaphoreSlim — 支持异步的信号量
// 限制并发访问数量
class RateLimiter
{
private readonly SemaphoreSlim _semaphore;
public RateLimiter(int maxConcurrency) =>
_semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
{
await _semaphore.WaitAsync();
try
{
return await operation();
}
finally
{
_semaphore.Release();
}
}
public async Task<bool> TryExecuteAsync(Func<Task> operation, TimeSpan timeout)
{
if (!await _semaphore.WaitAsync(timeout))
return false;
try
{
await operation();
return true;
}
finally
{
_semaphore.Release();
}
}
}
// 使用
var limiter = new RateLimiter(10); // 最多 10 并发
var tasks = urls.Select(url => limiter.ExecuteAsync(() => FetchAsync(url)));
await Task.WhenAll(tasks);Mutex 与跨进程同步
Mutex — 跨进程互斥
// Mutex — 操作系统级互斥锁,可跨进程
// 常见用途:单实例应用程序
class SingleInstanceApp
{
private static Mutex? _mutex;
public static bool TryAcquire(string appName)
{
_mutex = new Mutex(true, $"Global\\{appName}", out bool createdNew);
if (!createdNew)
{
// 已有实例在运行
return false;
}
return true;
}
public static void Release()
{
_mutex?.ReleaseMutex();
_mutex?.Dispose();
}
}
// 使用示例
if (!SingleInstanceApp.TryAcquire("MyApp"))
{
Console.WriteLine("应用程序已在运行");
return;
}
// Mutex 带超时的获取
using var mutex = new Mutex(false, "Global\\MyAppLock");
try
{
if (!mutex.WaitOne(TimeSpan.FromSeconds(10)))
{
Console.WriteLine("获取锁超时");
return;
}
// 临界区操作
}
finally
{
mutex.ReleaseMutex();
}
// 注意:Mutex 是操作系统对象,开销比 Monitor 大很多
// AbandonedMutexException — 持有 Mutex 的进程崩溃时触发Semaphore — 跨进程信号量
// Semaphore — 跨进程的计数信号量
// 场景:限制多进程对共享资源的并发访问
using var semaphore = new Semaphore(3, 3, "Global\\DbConnectionPool");
for (int i = 0; i < 10; i++)
{
ThreadPool.QueueUserWorkItem(_ =>
{
semaphore.WaitOne();
try
{
Console.WriteLine($"{Environment.ProcessId} 正在访问资源...");
Thread.Sleep(1000);
}
finally
{
semaphore.Release();
}
});
}
// 命名信号量 — 跨进程共享
// 使用 Global\\ 前缀在终端服务会话间共享
// 不使用前缀则仅在当前会话内共享高级并发模式
双检锁(Double-Checked Locking)
// 双检锁 — 延迟初始化的经典模式
public class LazySingleton
{
private static volatile LazySingleton? _instance;
private static readonly object _lock = new();
public static LazySingleton Instance
{
get
{
if (_instance is null) // 第一次检查(无锁)
{
lock (_lock)
{
if (_instance is null) // 第二次检查(有锁)
{
_instance = new LazySingleton();
}
}
}
return _instance;
}
}
private LazySingleton() { }
}
// 推荐:直接使用 Lazy<T>(内部已实现双检锁)
public class LazySingleton2
{
private static readonly Lazy<LazySingleton2> _instance =
new(() => new LazySingleton2(), LazyThreadSafetyMode.ExecutionAndPublication);
public static LazySingleton2 Instance => _instance.Value;
private LazySingleton2() { }
}
// LazyThreadSafetyMode 说明:
// ExecutionAndPublication — 完整加锁(默认,最安全)
// PublicationOnly — 多线程可能同时创建,但只有一个被发布
// None — 无线程安全(单线程场景)异步锁模式(AsyncLock)
//lock 语句不支持 await,需要自定义异步锁
public sealed class AsyncLock
{
private readonly SemaphoreSlim _semaphore = new(1, 1);
public async Task<IDisposable> LockAsync()
{
await _semaphore.WaitAsync();
return new Releaser(_semaphore);
}
public Task<IDisposable> LockAsync(TimeSpan timeout)
{
return LockAsyncCore(timeout);
}
private async Task<IDisposable> LockAsyncCore(TimeSpan timeout)
{
if (await _semaphore.WaitAsync(timeout))
return new Releaser(_semaphore);
throw new TimeoutException("获取异步锁超时");
}
private sealed class Releaser : IDisposable
{
private readonly SemaphoreSlim _sem;
public Releaser(SemaphoreSlim sem) => _sem = sem;
public void Dispose() => _sem.Release();
}
}
// 使用
class AsyncCache<TKey, TValue> where TKey : notnull
{
private readonly Dictionary<TKey, TValue> _cache = new();
private readonly AsyncLock _lock = new();
public async Task<TValue> GetOrAddAsync(TKey key, Func<Task<TValue>> factory)
{
// 先快速检查(无锁读)
lock (_cache)
{
if (_cache.TryGetValue(key, out var value))
return value;
}
// 异步锁保护写操作
using (await _lock.LockAsync())
{
// Double-check
if (_cache.TryGetValue(key, out var value))
return value;
value = await factory();
_cache[key] = value;
return value;
}
}
}线程安全集合与生产者消费者模式
// ConcurrentDictionary — 并发安全字典
var dict = new ConcurrentDictionary<string, int>();
// 原子添加或更新
dict.AddOrUpdate("key", 1, (_, old) => old + 1);
// 原子获取或添加
var value = dict.GetOrAdd("key", _ => ExpensiveComputation());
// 使用 Channel(推荐替代 BlockingCollection)
// System.Threading.Channels — 高性能异步生产者消费者
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = true,
Capacity = 100
});
// 生产者
async Task ProduceAsync()
{
for (int i = 0; i < 1000; i++)
{
await channel.Writer.WriteAsync($"item-{i}");
}
channel.Writer.Complete(); // 标记完成
}
// 消费者
async Task ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
ProcessItem(item);
}
}
// 多消费者模式
async Task ConsumeWithCancellationAsync(CancellationToken ct)
{
while (await channel.Reader.WaitToReadAsync(ct))
{
if (channel.Reader.TryRead(out var item))
{
await ProcessItemAsync(item);
}
}
}SpinLock 与自旋优化
// SpinLock — 适用于极短临界区的用户态锁
// 不进行内核切换,适合持有时间 < 几十纳秒的场景
struct SpinLockExample
{
private SpinLock _lock = new();
public void Increment(ref int counter)
{
bool lockTaken = false;
try
{
_lock.Enter(ref lockTaken);
counter++;
}
finally
{
if (lockTaken) _lock.Exit();
}
}
}
// SpinWait — 自旋等待策略
// 前几次自旋(CPU密集),之后让出时间片(yield)
void SpinWaitExample()
{
var sw = new SpinWait();
while (!IsReady())
{
sw.SpinOnce(); // 自适应等待
// SpinOnce 内部策略:
// 前 10 次:Thread.SpinWait (CPU 自旋)
// 11-20 次:Thread.Yield (让出时间片)
// 之后:Thread.Sleep(1) (短暂休眠)
}
}
// 注意:SpinLock 是结构体,不要装箱(boxing 会破坏锁语义)
// 正确做法:使用 readonly 字段或静态字段死锁分析与预防
死锁检测
// 常见死锁模式:锁顺序不一致
class DeadlockExample
{
private readonly object _lockA = new();
private readonly object _lockB = new();
// 线程1:先锁 A 再锁 B
public void Method1()
{
lock (_lockA)
{
Thread.Sleep(100);
lock (_lockB) // ❌ 可能死锁
{
DoWork();
}
}
}
// 线程2:先锁 B 再锁 A
public void Method2()
{
lock (_lockB)
{
Thread.Sleep(100);
lock (_lockA) // ❌ 可能死锁
{
DoWork();
}
}
}
}
// 解决方案1:统一锁顺序
class FixedLockOrder
{
private readonly object _lockA = new();
private readonly object _lockB = new();
public void Method1()
{
lock (_lockA) lock (_lockB) { DoWork(); }
}
public void Method2()
{
lock (_lockA) lock (_lockB) { DoWork(); } // 相同顺序
}
}
// 解决方案2:使用 Monitor.TryEnter 超时
class TimeoutLock
{
private readonly object _lockA = new();
private readonly object _lockB = new();
public bool MethodWithTimeout()
{
if (!Monitor.TryEnter(_lockA, TimeSpan.FromSeconds(1)))
return false;
try
{
if (!Monitor.TryEnter(_lockB, TimeSpan.FromSeconds(1)))
return false;
try
{
DoWork();
return true;
}
finally { Monitor.Exit(_lockB); }
}
finally { Monitor.Exit(_lockA); }
}
}
// 解决方案3:lock leveling(锁层级)
// 给每个锁分配层级,只能从高层级锁获取低层级锁deadlock 调试工具
// dotnet-dump — .NET 死锁分析
// 1. 捕获 dump
dotnet-dump collect -p <pid>
// 2. 分析死锁
dotnet-dump analyze <dump-file>
> clrstack -all // 查看所有线程调用栈
> syncblk // 查看锁持有情况
> dumpheap -stat // 查看堆统计
// dotnet-trace — 运行时追踪
dotnet-trace collect -p <pid> --profile cpu-sampling
// 在 Visual Studio 中:
// Debug > Windows > Threads — 查看线程状态
// 并行监视窗口 — 查看任务和锁状态性能对比与基准测试
同步原语性能基准
// BenchmarkDotNet 性能对比示例
[MemoryDiagnoser]
public class LockBenchmark
{
private int _counter;
private readonly object _lock = new();
private SpinLock _spinLock = new();
private readonly ReaderWriterLockSlim _rwLock = new();
private SemaphoreSlim _semaphore = new(1, 1);
[Benchmark(Baseline = true)]
public void InterlockedIncrement()
{
for (int i = 0; i < 1_000_000; i++)
Interlocked.Increment(ref _counter);
}
[Benchmark]
public void LockIncrement()
{
for (int i = 0; i < 1_000_000; i++)
lock (_lock) { _counter++; }
}
[Benchmark]
public void SpinLockIncrement()
{
for (int i = 0; i < 1_000_000; i++)
{
bool taken = false;
try { _spinLock.Enter(ref taken); _counter++; }
finally { if (taken) _spinLock.Exit(); }
}
}
}
// 典型结果(单线程递增 100 万次):
// Interlocked: ~3ms (最快,无锁)
// SpinLock: ~8ms (极短临界区接近 lock)
// lock/Monitor: ~12ms (通用锁)
// ReaderWriterLockSlim: ~18ms (读操作可并行)
// SemaphoreSlim: ~25ms (异步友好但有额外开销)
// Mutex: ~80ms+ (跨进程,内核切换开销大)锁的选择指南
同步原语对比
| 原语 | 开销 | 跨进程 | 异步支持 | 适用场景 |
|--------------------|------|--------|----------|----------------------|
| Interlocked | 最低 | 否 | 不需要 | 简单计数器、标志 |
| volatile | 最低 | 否 | 不需要 | 标志位、简单状态 |
| lock/Monitor | 低 | 否 | 否 | 通用临界区保护 |
| SpinLock | 低 | 否 | 否 | 极短临界区 |
| ReaderWriterLockSlim | 中 | 否 | 否 | 读多写少 |
| SemaphoreSlim | 中 | 否 | 是 | 异步限流 |
| Mutex | 高 | 是 | 否 | 跨进程互斥 |
| Semaphore | 高 | 是 | 否 | 跨进程限流 |
| AutoResetEvent | 中 | 是 | 否 | 线程通知 |
| CountdownEvent | 中 | 否 | 否 | 等待 N 个操作 |
| Barrier | 中 | 否 | 否 | 多线程阶段同步 |优点
缺点
总结
同步原语分层:Interlocked(CAS 原子操作)最轻量,适合简单计数器和标志。volatile 保证可见性但不保证原子性。lock/Monitor 是通用锁,内置 Wait/Pulse 实现等待通知。ReaderWriterLockSlim 适合读多写少。SemaphoreSlim 是唯一支持异步等待的信号量。AutoResetEvent/ManualResetEvent 用于线程通知。CountdownEvent 用于等待 N 个任务完成。Barrier 用于多线程阶段同步。选择原则:优先轻量级、避免嵌套锁、异步代码用异步锁。
关键知识点
- 先明确这个主题影响的是语法层、运行时层,还是性能与可维护性层。
- 学习时要同时关注语言表面写法和编译器、JIT、GC 等底层行为。
- 真正有价值的是知道“为什么这样写”和“在什么边界下不能这样写”。
项目落地视角
- 把示例改成最小可运行样例,并观察编译输出、运行结果和异常行为。
- 如果它会进入团队代码规范,最好同步补充命名约定、禁用场景和替代方案。
- 涉及性能结论时,优先用 Benchmark 或实际热点链路验证,而不是凭感觉判断。
常见误区
- 只记语法糖,不知道底层成本。
- 把适用于小样例的写法直接搬到高并发或大对象场景里。
- 忽略框架版本、语言版本和运行时差异,导致结论失真。
进阶路线
- 继续向源码、IL、JIT 行为和 BCL 实现层深入。
- 把知识点和代码评审、性能诊断、面试复盘结合起来。
- 把同类主题做横向对比,例如值类型与引用类型、迭代器与 async 状态机、反射与 Source Generator。
适用场景
- 当你准备把《线程同步原语深入》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在需要理解语言特性、运行时行为或 API 边界时阅读。
- 当代码开始出现性能瓶颈、可维护性问题或语义歧义时,这类主题会直接影响实现质量。
落地建议
- 先写最小可运行样例,再把结论迁移到真实业务代码。
- 同时记录这个特性的收益、限制和替代方案,避免为了“高级”而使用。
- 涉及内存、并发或序列化时,最好配合调试器或基准测试验证。
排错清单
- 先确认问题属于编译期、运行期还是语义误用。
- 检查是否存在隐式转换、装箱拆箱、闭包捕获或上下文切换等隐藏成本。
- 查看异常栈、日志和最小复现代码,优先排除使用姿势问题。
复盘问题
- 如果把《线程同步原语深入》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《线程同步原语深入》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《线程同步原语深入》最大的收益和代价分别是什么?
