并发编程设计模式
大约 14 分钟约 4055 字
并发编程设计模式
简介
并发编程是现代软件开发中的核心挑战之一。多线程环境下,多个执行流共享内存、争抢资源,如果不加以正确的设计和同步,就会产生数据竞争、死锁、活锁等问题。并发编程设计模式提供了经过验证的解决方案,帮助开发者构建正确、高效、可维护的并发系统。
从传统的锁模式到现代的 async/await 模式,从共享内存模型到消息传递模型(Actor),不同的并发模式适用于不同的场景。理解这些模式的原理、适用场景和陷阱,是编写高质量并发代码的关键。
特点
- 线程安全:保证共享资源在多线程环境下的正确访问
- 性能优化:减少锁争用,最大化并行度
- 可组合性:并发原语可以组合使用构建复杂逻辑
- 可测试性:良好的并发模式应便于测试和验证
- 死锁预防:通过设计模式避免常见的死锁场景
实现
Producer-Consumer 模式
生产者-消费者模式是最经典的并发模式,解耦了数据的生产和消费。
// 使用 BlockingCollection 实现
public class ProducerConsumer<T>
{
private readonly BlockingCollection<T> _queue;
private readonly List<Task> _consumerTasks;
private readonly CancellationTokenSource _cts;
private readonly int _consumerCount;
public ProducerConsumer(int boundedCapacity = 100, int consumerCount = 4)
{
_queue = new BlockingCollection<T>(boundedCapacity);
_cts = new CancellationTokenSource();
_consumerCount = consumerCount;
_consumerTasks = new List<Task>();
}
public void Start(Action<T> consumeAction)
{
for (int i = 0; i < _consumerCount; i++)
{
var task = Task.Run(() =>
{
foreach (var item in _queue.GetConsumingEnumerable(_cts.Token))
{
try
{
consumeAction(item);
}
catch (Exception ex)
{
Console.WriteLine($"消费异常: {ex.Message}");
}
}
}, _cts.Token);
_consumerTasks.Add(task);
}
}
public void Produce(T item)
{
_queue.Add(item);
}
public async Task StopAsync()
{
_queue.CompleteAdding();
await Task.WhenAll(_consumerTasks);
_queue.Dispose();
}
}
// 使用示例
var pc = new ProducerConsumer<Order>(boundedCapacity: 1000, consumerCount: 4);
pc.Start(order => ProcessOrder(order));
foreach (var order in incomingOrders)
{
pc.Produce(order);
}
await pc.StopAsync();Reader-Writer Lock 模式
读写锁允许多个读取者同时访问,但写入者需要独占访问。
// 使用 ReaderWriterLockSlim
public class ThreadSafeCache<TKey, TValue>
{
private readonly Dictionary<TKey, TValue> _cache = new();
private readonly ReaderWriterLockSlim _lock = new();
public TValue Get(TKey key)
{
_lock.EnterReadLock();
try
{
return _cache.TryGetValue(key, out var value) ? value : default;
}
finally
{
_lock.ExitReadLock();
}
}
public void Set(TKey key, TValue value)
{
_lock.EnterWriteLock();
try
{
_cache[key] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
// 使用可升级的读锁:先读,必要时升级为写锁
public TValue GetOrAdd(TKey key, Func<TKey, TValue> factory)
{
_lock.EnterUpgradeableReadLock();
try
{
if (_cache.TryGetValue(key, out var value))
{
return value;
}
_lock.EnterWriteLock();
try
{
// Double-check:其他线程可能在我们等待写锁时已写入
if (_cache.TryGetValue(key, out value))
{
return value;
}
value = factory(key);
_cache[key] = value;
return value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}Thread Pool 模式
// 线程池管理器
public class CustomThreadPool
{
private readonly BlockingCollection<Action> _workQueue;
private readonly Thread[] _workers;
private readonly CancellationTokenSource _cts;
private int _activeCount;
public int ActiveCount => _activeCount;
public int PendingCount => _workQueue.Count;
public CustomThreadPool(int threadCount, int queueCapacity = 1000)
{
_workQueue = new BlockingCollection<Action>(queueCapacity);
_cts = new CancellationTokenSource();
_workers = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
_workers[i] = new Thread(WorkerLoop)
{
IsBackground = true,
Name = $"Worker-{i}"
};
_workers[i].Start();
}
}
public bool Enqueue(Action work, TimeSpan timeout = default)
{
if (timeout == default)
{
_workQueue.Add(work);
return true;
}
return _workQueue.TryAdd(work, timeout);
}
public Task<TResult> Enqueue<TResult>(Func<TResult> work)
{
var tcs = new TaskCompletionSource<TResult>();
_workQueue.Add(() =>
{
try
{
var result = work();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
private void WorkerLoop()
{
foreach (var work in _workQueue.GetConsumingEnumerable(_cts.Token))
{
Interlocked.Increment(ref _activeCount);
try
{
work();
}
catch (Exception ex)
{
Console.WriteLine($"工作线程异常: {ex.Message}");
}
finally
{
Interlocked.Decrement(ref _activeCount);
}
}
}
public void Shutdown(bool waitForCompletion = true)
{
_workQueue.CompleteAdding();
if (waitForCompletion)
{
foreach (var worker in _workers)
{
worker.Join();
}
}
_workQueue.Dispose();
}
}Active Object 模式
Active Object 模式将方法调用与方法执行解耦,每个对象拥有自己的线程和消息队列。
// Active Object 模式
public abstract class ActiveObject : IDisposable
{
private readonly BlockingCollection<Action> _messageQueue = new();
private readonly Task _processingTask;
private readonly CancellationTokenSource _cts = new();
protected ActiveObject()
{
_processingTask = Task.Run(ProcessMessages);
}
// 将调用调度到对象自己的线程
protected Task<T> Enqueue<T>(Func<T> action)
{
var tcs = new TaskCompletionSource<T>();
_messageQueue.Add(() =>
{
try
{
tcs.SetResult(action());
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
protected Task Enqueue(Action action)
{
return Enqueue(() => { action(); return true; });
}
private void ProcessMessages()
{
foreach (var action in _messageQueue.GetConsumingEnumerable(_cts.Token))
{
action();
}
}
public void Dispose()
{
_messageQueue.CompleteAdding();
_processingTask.Wait(TimeSpan.FromSeconds(5));
_cts.Dispose();
_messageQueue.Dispose();
}
}
// 使用示例:线程安全的日志记录器
public class AsyncLogger : ActiveObject
{
private readonly StreamWriter _writer;
public AsyncLogger(string filePath)
{
_writer = new StreamWriter(filePath, true) { AutoFlush = true };
}
public Task LogAsync(string message)
{
return Enqueue(() =>
{
_writer.WriteLine($"[{DateTime.UtcNow:O}] {message}");
});
}
public Task<List<string>> ReadRecentAsync(int count)
{
return Enqueue(() =>
{
// 所有操作在同一线程执行,无需锁
var lines = new List<string>();
// ... 读取逻辑
return lines;
});
}
}Monitor 模式
// Monitor 模式:使用 lock 关键字实现
public class BankAccount
{
private readonly object _lock = new();
private decimal _balance;
public void Deposit(decimal amount)
{
lock (_lock)
{
_balance += amount;
}
}
public bool Withdraw(decimal amount)
{
lock (_lock)
{
if (_balance < amount) return false;
_balance -= amount;
return true;
}
}
public decimal GetBalance()
{
lock (_lock)
{
return _balance;
}
}
// 转账:同时锁两个账户(避免死锁)
public static bool Transfer(BankAccount from, BankAccount to, decimal amount)
{
// 按照账户 ID 排序获取锁,避免死锁
var accounts = new[] { from, to }.OrderBy(a => a.GetHashCode()).ToArray();
lock (accounts[0])
{
lock (accounts[1])
{
if (from._balance < amount) return false;
from._balance -= amount;
to._balance += amount;
return true;
}
}
}
}
// 使用 Monitor 类实现等待/通知
public class BoundedBuffer<T>
{
private readonly Queue<T> _queue = new();
private readonly object _lock = new();
private readonly int _capacity;
public BoundedBuffer(int capacity)
{
_capacity = capacity;
}
public void Put(T item)
{
lock (_lock)
{
while (_queue.Count >= _capacity)
{
Monitor.Wait(_lock); // 缓冲区满,等待
}
_queue.Enqueue(item);
Monitor.PulseAll(_lock); // 通知消费者
}
}
public T Take()
{
lock (_lock)
{
while (_queue.Count == 0)
{
Monitor.Wait(_lock); // 缓冲区空,等待
}
var item = _queue.Dequeue();
Monitor.PulseAll(_lock); // 通知生产者
return item;
}
}
}Double-Checked Locking 模式
// 线程安全的单例模式
public sealed class Singleton
{
private static volatile Singleton _instance;
private static readonly object _lock = new();
public static Singleton Instance
{
get
{
// 第一次检查(无锁)
if (_instance == null)
{
lock (_lock)
{
// 第二次检查(有锁)
if (_instance == null)
{
_instance = new Singleton();
}
}
}
return _instance;
}
}
private Singleton() { }
}
// .NET 推荐方式:使用 Lazy<T>
public sealed class SingletonWithLazy
{
private static readonly Lazy<SingletonWithLazy> _instance =
new(() => new SingletonWithLazy());
public static SingletonWithLazy Instance => _instance.Value;
private SingletonWithLazy() { }
}
// 带初始化参数的单例
public class ConfigurationManager
{
private static readonly Lazy<ConfigurationManager> _instance =
new(() => new ConfigurationManager(), LazyThreadSafetyMode.ExecutionAndPublication);
private readonly Dictionary<string, string> _settings;
public static ConfigurationManager Instance => _instance.Value;
private ConfigurationManager()
{
_settings = LoadSettings();
}
private Dictionary<string, string> LoadSettings()
{
// 加载配置
return new Dictionary<string, string>();
}
public string GetSetting(string key)
{
return _settings.TryGetValue(key, out var value) ? value : null;
}
}Thread-Local Storage 模式
// ThreadLocal<T> 存储线程专属数据
public class ThreadLocalRandom
{
private static readonly ThreadLocal<Random> _random =
new(() => new Random(
Interlocked.Increment(ref _seed)));
private static int _seed = Environment.TickCount;
public static Random Current => _random.Value;
}
// 使用示例:并行计算中使用线程安全的随机数
public class ParallelStatistics
{
public double[] ComputeMonteCarlo(int iterations)
{
var partialResults = new double[Environment.ProcessorCount];
Parallel.For(0, Environment.ProcessorCount, threadId =>
{
var random = ThreadLocalRandom.Current;
double sum = 0;
int perThread = iterations / Environment.ProcessorCount;
for (int i = 0; i < perThread; i++)
{
double x = random.NextDouble();
double y = random.NextDouble();
if (x * x + y * y <= 1.0)
{
sum += 1.0;
}
}
partialResults[threadId] = sum / perThread * 4;
});
return partialResults;
}
}
// AsyncLocal<T>:异步流中的上下文传递
public class AsyncContext
{
private static readonly AsyncLocal<string> _correlationId = new();
private static readonly AsyncLocal<string> _userId = new();
public static string CorrelationId
{
get => _correlationId.Value;
set => _correlationId.Value = value;
}
public static string UserId
{
get => _userId.Value;
set => _userId.Value = value;
}
}
// 在 Web 请求管道中使用
public class ContextMiddleware
{
public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
AsyncContext.CorrelationId = Guid.NewGuid().ToString();
AsyncContext.UserId = context.User?.Identity?.Name;
await next(context);
}
}Async/Await 模式
// 异步编程最佳实践
public class AsyncService
{
// 并行执行多个异步操作
public async Task<List<Data>> FetchAllDataAsync(List<string> sources)
{
var tasks = sources.Select(source => FetchFromSourceAsync(source));
var results = await Task.WhenAll(tasks);
return results.ToList();
}
// 带超时的异步操作
public async Task<T> WithTimeout<T>(Task<T> task, TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
try
{
return await task.WaitAsync(cts.Token);
}
catch (OperationCanceledException)
{
throw new TimeoutException($"操作在 {timeout.TotalSeconds} 秒后超时");
}
}
// 带取消的异步操作
public async Task ProcessWithCancellationAsync(
Func<CancellationToken, Task> operation,
CancellationToken externalToken)
{
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(externalToken);
linkedCts.CancelAfter(TimeSpan.FromSeconds(30));
await operation(linkedCts.Token);
}
// 限制并发度的并行执行
public async Task<List<TResult>> ParallelWithLimitAsync<TSource, TResult>(
IEnumerable<TSource> sources,
Func<TSource, Task<TResult>> selector,
int maxConcurrency)
{
var semaphore = new SemaphoreSlim(maxConcurrency);
var tasks = sources.Select(async source =>
{
await semaphore.WaitAsync();
try
{
return await selector(source);
}
finally
{
semaphore.Release();
}
});
return (await Task.WhenAll(tasks)).ToList();
}
private async Task<Data> FetchFromSourceAsync(string source)
{
// 模拟异步操作
await Task.Delay(100);
return new Data { Source = source };
}
}
// 异步锁
public class AsyncLock
{
private readonly SemaphoreSlim _semaphore = new(1, 1);
public async Task<IDisposable> LockAsync()
{
await _semaphore.WaitAsync();
return new Releaser(_semaphore);
}
private class Releaser : IDisposable
{
private readonly SemaphoreSlim _semaphore;
public Releaser(SemaphoreSlim semaphore)
{
_semaphore = semaphore;
}
public void Dispose()
{
_semaphore.Release();
}
}
}
// 使用异步锁
public class AsyncConcurrentDictionary<TKey, TValue>
{
private readonly Dictionary<TKey, TValue> _dict = new();
private readonly AsyncLock _lock = new();
public async Task<TValue> GetOrAddAsync(TKey key, Func<TKey, Task<TValue>> factory)
{
using (await _lock.LockAsync())
{
if (_dict.TryGetValue(key, out var value))
{
return value;
}
value = await factory(key);
_dict[key] = value;
return value;
}
}
}Actor 模式
// 简单的 Actor 实现
public abstract class Actor
{
private readonly Channel<object> _mailbox;
private readonly CancellationTokenSource _cts;
protected Actor(int mailboxCapacity = 1000)
{
_mailbox = Channel.CreateBounded<object>(new BoundedChannelOptions(mailboxCapacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false
});
_cts = new CancellationTokenSource();
// 启动消息处理循环
_ = ProcessMessagesAsync();
}
protected void Send(object message)
{
_mailbox.Writer.TryWrite(message);
}
private async Task ProcessMessagesAsync()
{
await foreach (var message in _mailbox.Reader.ReadAllAsync(_cts.Token))
{
try
{
await HandleMessageAsync(message);
}
catch (Exception ex)
{
OnError(ex);
}
}
}
protected abstract Task HandleMessageAsync(object message);
protected virtual void OnError(Exception ex)
{
Console.WriteLine($"Actor 错误: {ex.Message}");
}
public void Stop()
{
_mailbox.Writer.Complete();
_cts.Cancel();
}
}
// 具体 Actor:订单处理器
public class OrderActor : Actor
{
private readonly List<Order> _pendingOrders = new();
public record CreateOrder(Order Order);
public record ProcessOrder(int OrderId);
public record GetOrderStats(ReplyChannel<OrderStats> Reply);
public void CreateNewOrder(Order order) => Send(new CreateOrder(order));
public void ProcessExistingOrder(int orderId) => Send(new ProcessOrder(orderId));
public Task<OrderStats> GetStatsAsync()
{
var reply = new ReplyChannel<OrderStats>();
Send(new GetOrderStats(reply));
return reply.GetResultAsync();
}
protected override Task HandleMessageAsync(object message)
{
switch (message)
{
case CreateOrder cmd:
_pendingOrders.Add(cmd.Order);
break;
case ProcessOrder cmd:
var order = _pendingOrders.FirstOrDefault(o => o.Id == cmd.OrderId);
order?.MarkAsProcessed();
break;
case GetOrderStats cmd:
cmd.Reply.SetResult(new OrderStats
{
Total = _pendingOrders.Count,
Pending = _pendingOrders.Count(o => !o.IsProcessed)
});
break;
}
return Task.CompletedTask;
}
}
// 回复通道
public class ReplyChannel<T>
{
private readonly TaskCompletionSource<T> _tcs = new();
public void SetResult(T result) => _tcs.SetResult(result);
public Task<T> GetResultAsync() => _tcs.Task;
}Channel 模式
// 使用 System.Threading.Channels
public class Pipeline<TInput, TOutput>
{
private readonly Channel<TInput> _inputChannel;
private readonly Channel<TOutput> _outputChannel;
private readonly List<Func<TInput, TOutput>> _stages;
private readonly int _workerCount;
public Pipeline(int workerCount = 4, int capacity = 1000)
{
_workerCount = workerCount;
_stages = new List<Func<TInput, TOutput>>();
_inputChannel = Channel.CreateBounded<TInput>(capacity);
_outputChannel = Channel.CreateBounded<TOutput>(capacity);
}
public Pipeline<TInput, TOutput> AddStage(Func<TInput, TOutput> stage)
{
_stages.Add(stage);
return this;
}
public async Task StartAsync(CancellationToken ct = default)
{
var workers = Enumerable.Range(0, _workerCount)
.Select(_ => Task.Run(async () =>
{
await foreach (var input in _inputChannel.Reader.ReadAllAsync(ct))
{
var current = input;
foreach (var stage in _stages)
{
current = stage(current);
}
await _outputChannel.Writer.WriteAsync(current, ct);
}
}, ct))
.ToArray();
_ = Task.Run(async () =>
{
await Task.WhenAll(workers);
_outputChannel.Writer.Complete();
}, ct);
}
public ChannelReader<TInput> Input => _inputChannel.Reader;
public ChannelWriter<TInput> InputWriter => _inputChannel.Writer;
public ChannelReader<TOutput> Output => _outputChannel.Reader;
}
// 多阶段数据处理管道
public class DataProcessingPipeline
{
public async Task ProcessAsync(CancellationToken ct = default)
{
// 阶段 1: 读取
var readChannel = Channel.CreateBounded<RawData>(1000);
// 阶段 2: 解析
var parseChannel = Channel.CreateBounded<ParsedData>(1000);
// 阶段 3: 转换
var transformChannel = Channel.CreateBounded<TransformedData>(1000);
// 阶段 4: 存储
var storeChannel = Channel.CreateBounded<StoredData>(1000);
// 启动各阶段消费者
var tasks = new List<Task>();
// 解析阶段
tasks.Add(Task.Run(async () =>
{
await foreach (var raw in readChannel.Reader.ReadAllAsync(ct))
{
var parsed = Parse(raw);
await parseChannel.Writer.WriteAsync(parsed, ct);
}
parseChannel.Writer.Complete();
}, ct));
// 转换阶段
tasks.Add(Task.Run(async () =>
{
await foreach (var parsed in parseChannel.Reader.ReadAllAsync(ct))
{
var transformed = Transform(parsed);
await transformChannel.Writer.WriteAsync(transformed, ct);
}
transformChannel.Writer.Complete();
}, ct));
// 存储阶段
tasks.Add(Task.Run(async () =>
{
await foreach (var data in transformChannel.Reader.ReadAllAsync(ct))
{
await StoreAsync(data);
}
}, ct));
// 生产数据
foreach (var rawData in ReadAllSources())
{
await readChannel.Writer.WriteAsync(rawData, ct);
}
readChannel.Writer.Complete();
await Task.WhenAll(tasks);
}
private ParsedData Parse(RawData raw) => throw new NotImplementedException();
private TransformedData Transform(ParsedData parsed) => throw new NotImplementedException();
private Task StoreAsync(TransformedData data) => throw new NotImplementedException();
private IEnumerable<RawData> ReadAllSources() => throw new NotImplementedException();
}并发集合选择
// 并发集合对比和选择指南
public class ConcurrentCollectionsDemo
{
// ConcurrentDictionary:高并发读写
private readonly ConcurrentDictionary<string, object> _cache = new();
public void AddToCache(string key, Func<object> factory)
{
// 线程安全的 GetOrAdd
_cache.GetOrAdd(key, _ => factory());
}
public bool UpdateCache(string key, Func<string, object, object> updateFactory)
{
// 线程安全的 AddOrUpdate
_cache.AddOrUpdate(key,
_ => new object(),
(k, old) => updateFactory(k, old));
return true;
}
// ConcurrentQueue:FIFO 生产者-消费者
private readonly ConcurrentQueue<Task> _taskQueue = new();
public void EnqueueTask(Task task) => _taskQueue.Enqueue(task);
public bool TryDequeue(out Task task) => _taskQueue.TryDequeue(out task);
// ConcurrentBag:无序并发集合(适合工作窃取)
private readonly ConcurrentBag<double> _results = new();
public void AddResult(double result) => _results.Add(result);
// ConcurrentStack:LIFO 并发集合
private readonly ConcurrentStack<Buffer> _bufferPool = new();
public Buffer RentBuffer()
{
if (_bufferPool.TryPop(out var buffer)) return buffer;
return new Buffer(4096);
}
public void ReturnBuffer(Buffer buffer) => _bufferPool.Push(buffer);
}
public record RawData;
public record ParsedData;
public record TransformedData;
public record StoredData;
public record OrderStats { public int Total { get; init; } public int Pending { get; init; } }
public class Buffer { public Buffer(int size) { } }死锁预防
// 死锁预防策略
public class DeadlockPrevention
{
// 策略 1:锁排序 —— 按固定顺序获取锁
private readonly object _lockA = new();
private readonly object _lockB = new();
public void SafeOperation()
{
// 始终先锁 A 再锁 B
lock (_lockA)
{
lock (_lockB)
{
// 安全操作
}
}
}
// 策略 2:使用 Monitor.TryEnter 避免无限等待
public bool TrySafeOperation(TimeSpan timeout)
{
if (!Monitor.TryEnter(_lockA, timeout)) return false;
try
{
if (!Monitor.TryEnter(_lockB, timeout))
{
Monitor.Exit(_lockA);
return false;
}
try
{
// 安全操作
return true;
}
finally
{
Monitor.Exit(_lockB);
}
}
finally
{
Monitor.Exit(_lockA);
}
}
// 策略 3:锁超时检测器
public static async Task WithDeadlockDetection(
Action action,
TimeSpan warningThreshold,
string lockName)
{
var task = Task.Run(action);
var delay = Task.Delay(warningThreshold);
if (delay == await Task.WhenAny(task, delay))
{
Console.WriteLine($"警告: 锁 {lockName} 持有时间超过 {warningThreshold.TotalSeconds}s");
await task; // 继续等待
}
}
}
// 死锁检测工具
public class LockTracker
{
private readonly ConcurrentDictionary<int, LockInfo> _activeLocks = new();
private readonly ThreadLocal<Stack<int>> _heldLocks = new(() => new Stack<int>());
public void Acquire(object lockObj)
{
var lockId = lockObj.GetHashCode();
var threadId = Thread.CurrentThread.ManagedThreadId;
_heldLocks.Value.Push(lockId);
_activeLocks[lockId] = new LockInfo
{
LockId = lockId,
ThreadId = threadId,
AcquiredAt = DateTime.UtcNow,
HeldLocks = _heldLocks.Value.ToList()
};
// 检查是否存在循环等待
DetectPotentialDeadlock(lockId, threadId);
}
public void Release(object lockObj)
{
var lockId = lockObj.GetHashCode();
_activeLocks.TryRemove(lockId, out _);
_heldLocks.Value.Pop();
}
private void DetectPotentialDeadlock(int lockId, int threadId)
{
// 简化:检查是否有其他线程持有当前线程需要的锁
foreach (var kvp in _activeLocks)
{
if (kvp.Value.ThreadId != threadId &&
kvp.Value.HeldLocks.Contains(lockId))
{
Console.WriteLine(
$"潜在死锁检测: 线程 {threadId} 等待锁 {lockId}," +
$"但线程 {kvp.Value.ThreadId} 持有该锁");
}
}
}
}
public record LockInfo
{
public int LockId { get; init; }
public int ThreadId { get; init; }
public DateTime AcquiredAt { get; init; }
public List<int> HeldLocks { get; init; }
}优点
- 正确性保证:经过验证的模式减少并发 Bug
- 性能提升:合理利用多核处理器能力
- 可维护性:模式化的代码更容易理解和维护
- 可测试性:良好的抽象支持单元测试
缺点
- 复杂性增加:并发代码比顺序代码更难理解和调试
- 难以测试:并发 Bug 往往难以复现
- 性能开销:同步原语本身有性能开销
- 过度并发:不合理的并发反而降低性能
性能注意事项
- 锁粒度:锁的范围尽可能小,但保证正确性
- 避免 false sharing:多线程频繁修改相邻内存位置
- 线程池饱和:避免在 ThreadPool 上执行长时间阻塞操作
- async/await 开销:状态机分配和上下文切换有成本
- Channel vs BlockingCollection:Channel 基于 async,性能更好
- 锁争用:高争用场景考虑无锁数据结构或分区
总结
并发编程设计模式提供了从底层同步原语到高层抽象的完整工具箱。Monitor/lock 适合简单的互斥场景,Reader-Writer Lock 适合读多写少场景,Channel 适合数据流管道,Actor 模式适合高并发消息处理。async/await 是 .NET 异步编程的基础,理解其底层原理对于编写正确的并发代码至关重要。
关键知识点
- 线程安全级别:不可变 > ThreadLocal > 同步访问 > 无保护
- 锁排序:多个锁始终按同一顺序获取
- async/wait 不等于多线程:async 可能完全不创建新线程
- CancellationToken:异步操作应始终支持取消
- ConfigureAwait(false):库代码中避免捕获同步上下文
- Channel 是推荐的生产者-消费者实现:优于 BlockingCollection
常见误区
- lock(this) 或 lock(typeof(MyClass)):应使用私有只读对象
- async void:仅用于事件处理器,其他场景使用 async Task
- .Result 或 .Wait():可能导致死锁,应使用 await
- 忽略 CancellationToken:长时间操作无法取消
- 过度使用 lock:单线程场景不需要锁
- 混用异步和同步:不要在异步代码中调用同步阻塞操作
进阶路线
- 无锁编程:Interlocked、CAS 操作、Memory Barrier
- 数据并行:Parallel.For/ForEach、PLINQ
- 响应式编程:System.Reactive (Rx)
- Orleans Actor:分布式 Actor 框架
- 并行算法:并行排序、并行归约、MapReduce
- GPU 计算:CUDA、OpenCL、ILGPU
适用场景
- 服务器应用:处理大量并发请求
- 桌面应用:后台任务不阻塞 UI 线程
- 数据处理:并行处理大数据集
- 实时系统:生产者-消费者管道
- 游戏开发:Actor 模型管理游戏实体
- 网络编程:异步 I/O 操作
落地建议
- 优先使用 async/await:现代 .NET 的首选并发模型
- 优先使用 Channel:生产者-消费者场景的首选
- 避免自己实现锁:使用框架提供的并发原语
- 压力测试:并发代码必须经过压力测试
- 静态分析:使用 Roslyn Analyzers 检测并发问题
- 日志关联:使用 AsyncLocal 传递 CorrelationId
排错清单
复盘问题
- 你的应用中有哪些共享状态需要同步保护?
- 有没有可能产生死锁的场景?如何预防?
- 异步代码是否一致地使用了 async/await?有没有 sync-over-async?
- 生产环境中的并发瓶颈在哪里?
- 并发代码的测试覆盖率如何?有没有压力测试?
- 有没有考虑过使用 Actor 模型简化并发逻辑?
