Task 并行库与 Dataflow
大约 11 分钟约 3300 字
Task 并行库与 Dataflow
简介
TPL(Task Parallel Library)是 .NET 的并行编程框架,提供 Task、Parallel、Dataflow 等组件。掌握并行编程可以充分利用多核 CPU,加速数据处理、图像处理、批量计算等 CPU 密集型任务。
并行编程的核心挑战不是"如何并行",而是"何时并行"。错误的并行化可能导致性能下降、死锁、竞态条件等问题。理解线程池、任务调度、同步原语的工作原理,是写出正确高效的并行代码的前提。
特点
并行编程决策树
// ==========================================
// 何时使用哪种并行方案?
// ==========================================
// CPU 密集型(计算、图像处理、加密)
// 数据量小 → Parallel.For / Parallel.ForEach
// 数据量大 → Parallel.For + Partitioner
// 需要返回值 → Parallel LINQ (PLINQ)
// I/O 密集型(网络请求、文件读写、数据库)
// 少量并发 → async/await + Task.WhenAll
// 大量并发 → SemaphoreSlim 限流 + Task.WhenAll
// 管道处理 → Channel<T> 或 TPL Dataflow
// 混合型(部分计算 + 部分 I/O)
// Dataflow 管道
// Parallel + async/await 组合
// 注意:不要在 I/O 密集型场景使用 Parallel.For
// Parallel.For 使用线程池线程,会阻塞等待 I/O
// I/O 密集型应该用 async/await 释放线程Parallel 类
并行循环
/// <summary>
/// Parallel.For / Parallel.ForEach
/// </summary>
// 并行 For
Parallel.For(0, 1000, i =>
{
ProcessData(i);
});
// 并行 ForEach(带选项)
var options = new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = cts.Token
};
Parallel.ForEach(items, options, item =>
{
Transform(item);
});
// 带分区器(提升小任务性能)
Parallel.ForEach(Partitioner.Create(0, 1000000), range =>
{
for (int i = range.Item1; i < range.Item2; i++)
{
Compute(i);
}
});
// 并行 Invoke
Parallel.Invoke(
() => LoadUsers(),
() => LoadOrders(),
() => LoadProducts()
);Parallel 的高级用法
// ==========================================
// ParallelLoopResult — 检查并行循环状态
// ==========================================
var result = Parallel.For(0, 1000, (i, state) =>
{
if (i == 500)
{
state.Break(); // 停止当前分区,其他分区继续
// state.Stop(); // 停止所有分区
}
ProcessData(i);
});
Console.WriteLine($"是否完成: {result.IsCompleted}");
Console.WriteLine($"最低中断索引: {result.LowestBreakIteration}");
// ==========================================
// 带本地状态的并行循环
// ==========================================
// 每个线程有自己的本地变量,最后合并
var total = 0;
Parallel.ForEach(items,
// 本地初始化
() => 0,
// 循环体(threadLocal 是当前线程的本地值)
(item, state, threadLocal, subtotal) =>
{
return subtotal + item.Price;
},
// 合并各线程的本地结果
(subtotal) =>
{
Interlocked.Add(ref total, subtotal);
}
);
Console.WriteLine($"总价: {total}");
// ==========================================
// Parallel.For 的性能考量
// ==========================================
// Parallel.For 有调度开销(约 50-100 微秒/调用)
// 如果循环体执行时间 < 调度开销,并行反而更慢
// 反面示例 — 太小的任务不适合并行
Parallel.For(0, 100, i =>
{
array[i] = i * 2; // 每次只做一次乘法,调度开销远大于计算
});
// 正面示例 — 足够大的任务才值得并行
Parallel.For(0, 1000000, i =>
{
array[i] = ExpensiveComputation(i); // 每次计算耗时较长
});
// 经验法则:循环体执行时间 > 1ms 才考虑并行Task 并行模式
Task.Run 与 Task.WhenAll
// ==========================================
// Task.Run — 将 CPU 密集型工作放入线程池
// ==========================================
var result = await Task.Run(() =>
{
// CPU 密集型计算在线程池线程上执行
return HeavyComputation();
});
// ==========================================
// Task.WhenAll — 等待多个任务完成
// ==========================================
var tasks = urls.Select(async url =>
{
using var client = new HttpClient();
return await client.GetStringAsync(url);
});
var results = await Task.WhenAll(tasks);
// ==========================================
// Task.WhenAny — 等待任意一个完成
// ==========================================
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var downloadTask = DownloadFileAsync(url, cts.Token);
var timeoutTask = Task.Delay(5000);
var completedTask = await Task.WhenAny(downloadTask, timeoutTask);
if (completedTask == timeoutTask)
{
Console.WriteLine("下载超时");
cts.Cancel();
}
else
{
var content = await downloadTask;
Console.WriteLine($"下载完成: {content.Length} 字节");
}
// ==========================================
// 并行执行限流
// ==========================================
public static async Task<TResult[]> ParallelForEachAsync<TSource, TResult>(
IEnumerable<TSource> source,
Func<TSource, CancellationToken, Task<TResult>> func,
int maxConcurrency)
{
using var semaphore = new SemaphoreSlim(maxConcurrency);
var tasks = source.Select(async item =>
{
await semaphore.WaitAsync();
try
{
return await func(item, CancellationToken.None);
}
finally
{
semaphore.Release();
}
});
return await Task.WhenAll(tasks);
}
// 使用
var results = await ParallelForEachAsync(
orders,
async (order, ct) => await ProcessOrderAsync(order),
maxConcurrency: 10);Dataflow 数据流
生产者-消费者管道
/// <summary>
/// TPL Dataflow — 管道式并行处理
/// </summary>
using System.Threading.Tasks.Dataflow;
// 构建 ETL 管道
var downloadBlock = new TransformManyBlock<string, string>(async url =>
{
using var client = new HttpClient();
var html = await client.GetStringAsync(url);
return ExtractLinks(html);
});
var parseBlock = new TransformBlock<string, Article>(link =>
{
return ParseArticle(link);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 100
});
var saveBlock = new ActionBlock<Article>(async article =>
{
await SaveToDatabase(article);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 2
});
// 连接管道
downloadBlock.LinkTo(parseBlock, new DataflowLinkOptions { PropagateCompletion = true });
parseBlock.LinkTo(saveBlock, new DataflowLinkOptions { PropagateCompletion = true });
// 投入数据
await downloadBlock.SendAsync("https://example.com");
downloadBlock.Complete();
await saveBlock.Completion; // 等待整个管道完成批处理块
/// <summary>
/// BatchBlock — 将数据分批处理
/// </summary>
var batchBlock = new BatchBlock<int>(batchSize: 100);
var processBlock = new ActionBlock<int[]>(batch =>
{
BulkInsert(batch); // 批量插入数据库
});
batchBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });
foreach (var item in largeDataset)
{
batchBlock.Post(item);
}
batchBlock.Complete();
await processBlock.Completion;Dataflow 高级模式
// ==========================================
// 条件路由 — BroadcastBlock + 条件链接
// ==========================================
var broadcast = new BroadcastBlock<Order>(order => order);
// 普通订单走正常流程
var normalProcessor = new ActionBlock<Order>(ProcessNormalOrder);
broadcast.LinkTo(normalProcessor, order => order.Type == OrderType.Normal);
// 紧急订单走加急流程
var urgentProcessor = new ActionBlock<Order>(ProcessUrgentOrder);
broadcast.LinkTo(urgentProcessor, order => order.Type == OrderType.Urgent);
// 所有订单写入日志
var logger = new ActionBlock<Order>(LogOrder);
broadcast.LinkTo(logger); // 无条件的链接
// ==========================================
// JoinBlock — 等待多个输入
// ==========================================
var joinBlock = new JoinBlock<Order, Payment>(new GroupingDataflowBlockOptions
{
Greedy = false // 非贪婪模式,等待两边都有数据
});
joinBlock.LinkTo(new ActionBlock<Tuple<Order, Payment>>(tuple =>
{
var (order, payment) = tuple;
CompleteOrder(order, payment);
}));
// 订单和支付都到齐后才处理
orderSource.LinkTo(joinBlock.Target1);
paymentSource.LinkTo(joinBlock.Target2);
// ==========================================
// BatchedJoinBlock — 批量收集成功和失败
// ==========================================
var batchedJoin = new BatchedJoinBlock<Result, Error>(batchSize: 100);
batchedJoin.LinkTo(new ActionBlock<Tuple<IList<Result>, IList<Error>>>(batch =>
{
var (successes, failures) = batch;
Console.WriteLine($"成功: {successes.Count}, 失败: {failures.Count}");
// 批量写入成功结果
// 批量记录失败日志
}));线程安全集合
并发集合
/// <summary>
/// System.Collections.Concurrent
/// </summary>
// ConcurrentDictionary — 线程安全字典
var cache = new ConcurrentDictionary<string, User>();
cache.TryAdd("user:1", new User { Name = "张三" });
cache.AddOrUpdate("user:1",
key => new User { Name = "新用户" },
(key, old) => old with { Name = "更新用户" });
// ConcurrentQueue — 线程安全队列
var taskQueue = new ConcurrentQueue<string>();
taskQueue.Enqueue("任务1");
if (taskQueue.TryDequeue(out var task))
Process(task);
// ConcurrentBag — 线程安全无序集合
var results = new ConcurrentBag<int>();
Parallel.For(0, 1000, i =>
{
results.Add(Compute(i));
});
// BlockingCollection — 阻塞集合(生产者-消费者)
var collection = new BlockingCollection<string>(boundedCapacity: 10);
// 生产者
Task.Run(() =>
{
for (int i = 0; i < 100; i++)
collection.Add($"item_{i}");
collection.CompleteAdding();
});
// 消费者
Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
Process(item);
});ConcurrentDictionary 高级用法
// ==========================================
// GetOrAdd 的陷阱
// ==========================================
// 工厂方法可能被多次调用!
var count = cache.GetOrAdd("key", k => ExpensiveCompute(k));
// 如果两个线程同时调用 GetOrAdd,ExpensiveCompute 可能被并行执行两次
// 安全方式 — 使用 Lazy<T>
cache.GetOrAdd("key", k => new Lazy<string>(() => ExpensiveCompute(k))).Value;
// ==========================================
// 并发安全的缓存实现
// ==========================================
public class ConcurrentCache<TKey, TValue> where TKey : notnull
{
private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _cache = new();
private readonly Func<TKey, Task<TValue>> _factory;
public ConcurrentCache(Func<TKey, Task<TValue>> factory) => _factory = factory;
public Task<TValue> GetOrAdd(TKey key)
{
return _cache.GetOrAdd(key, k => new Lazy<Task<TValue>>(() => _factory(k))).Value;
}
public bool TryRemove(TKey key) => _cache.TryRemove(key, out _);
public int Count => _cache.Count;
}
// ==========================================
// AddOrUpdate 的正确用法
// ==========================================
// AddOrUpdate 的 updateFactory 参数也可能被多次调用
// 安全的方式:使用委托参数
// 不安全
dict.AddOrUpdate("key", 1, (k, old) => old + 1);
// 安全(如果值类型不可变,这个写法是安全的)
dict.AddOrUpdate("key",
addValueFactory: _ => ComputeNewValue(),
updateValueFactory: (_, old) => ComputeNewValue(old));锁与同步
并发控制
/// <summary>
/// 线程同步机制
/// </summary>
// SemaphoreSlim — 异步锁
var semaphore = new SemaphoreSlim(1, 1);
public async Task SafeUpdateAsync()
{
await semaphore.WaitAsync();
try
{
await UpdateSharedState();
}
finally
{
semaphore.Release();
}
}
// ReaderWriterLockSlim — 读写锁
var rwLock = new ReaderWriterLockSlim();
public T Read()
{
rwLock.EnterReadLock();
try { return _data; }
finally { rwLock.ExitReadLock(); }
}
public void Write(T value)
{
rwLock.EnterWriteLock();
try { _data = value; }
finally { rwLock.ExitWriteLock(); }
}
// Interlocked — 原子操作
private int _counter = 0;
Interlocked.Increment(ref _counter);
Interlocked.Add(ref _total, amount);
var current = Interlocked.CompareExchange(ref _state, newState, expectedState);
// ==========================================
// 无锁编程 — 简单场景
// ==========================================
// 使用 volatile 关键字
private volatile bool _shouldStop;
public void Stop() => _shouldStop = true;
// 使用 Interlocked 做简单的计数
private int _activeCount;
public void Increment() => Interlocked.Increment(ref _activeCount);
public void Decrement() => Interlocked.Decrement(ref _activeCount);
public int ActiveCount => Interlocked.CompareExchange(ref _activeCount, 0, 0);死锁预防
// ==========================================
// 死锁的常见原因和预防
// ==========================================
// 原因 1: 锁的顺序不一致
// 线程A: lock(A) → lock(B)
// 线程B: lock(B) → lock(A) → 死锁!
// 预防: 始终按固定顺序获取锁
public class DeadlockSafe
{
private readonly object _lockA = new();
private readonly object _lockB = new();
// 始终先 A 后 B
public void Method1()
{
lock (_lockA)
{
lock (_lockB) { /* ... */ }
}
}
// 始终先 A 后 B(即使只需要 B 也要注意顺序)
public void Method2()
{
lock (_lockA)
{
lock (_lockB) { /* ... */ }
}
}
}
// 原因 2: 异步代码中使用 lock
// lock 不可用于异步代码,但 SemaphoreSlim 也有死锁风险
// 如果 SemaphoreSlim 的等待和释放不在同一个方法中,可能死锁
// 预防: 使用 using 模式确保释放
// using (await _lock.LockAsync()) { ... }
// 原因 3: 嵌套过深
// 预防: 尽量减少锁的粒度和嵌套层级
// ==========================================
// 超时获取锁 — 防止无限等待
// ==========================================
var rwLock = new ReaderWriterLockSlim();
try
{
rwLock.EnterWriteLock(TimeSpan.FromSeconds(5));
// 获取写锁成功
}
catch (LockRecursionException)
{
Console.WriteLine("锁递归");
}
catch (LockNotRecursionException)
{
Console.WriteLine("不支持递归");
}优点
缺点
总结
并行编程核心:CPU 密集用 Parallel.For/ForEach,I/O 密集用 async/await + Task.WhenAll,流式处理用 Dataflow 管道。共享状态用 ConcurrentDictionary/Queue,限流用 SemaphoreSlim,批量处理用 BatchBlock。关键是避免过度并行和共享状态竞争。
核心原则:
- 先测量再并行 — 确认瓶颈是 CPU 后再优化
- 任务粒度适中 — 每个任务至少 1ms 执行时间
- 避免共享状态 — 优先使用不可变数据或线程安全集合
- 统一取消机制 — 全链路传播 CancellationToken
- 异常必须处理 — Task.WhenAll 的异常是 AggregateException
关键知识点
- 先明确这个主题影响的是语法层、运行时层,还是性能与可维护性层。
- 学习时要同时关注语言表面写法和编译器、JIT、GC 等底层行为。
- 真正有价值的是知道"为什么这样写"和"在什么边界下不能这样写"。
项目落地视角
- 把示例改成最小可运行样例,并观察编译输出、运行结果和异常行为。
- 如果它会进入团队代码规范,最好同步补充命名约定、禁用场景和替代方案。
- 涉及性能结论时,优先用 Benchmark 或实际热点链路验证,而不是凭感觉判断。
常见误区
- 只记语法糖,不知道底层成本。
- 把适用于小样例的写法直接搬到高并发或大对象场景里。
- 忽略框架版本、语言版本和运行时差异,导致结论失真。
- 在 I/O 密集型场景使用 Parallel.For(应该用 async/await)。
- 过度并行导致线程池饥饿和上下文切换风暴。
- 锁的顺序不一致导致死锁。
- 忘记处理 Task.WhenAll 中的 AggregateException。
- 并行循环中访问非线程安全的集合。
进阶路线
- 继续向源码、IL、JIT 行为和 BCL 实现层深入。
- 把知识点和代码评审、性能诊断、面试复盘结合起来。
- 把同类主题做横向对比,例如值类型与引用类型、迭代器与 async 状态机、反射与 Source Generator。
- 学习
Channel<T>作为 Dataflow 的现代替代方案。 - 了解 .NET 线程池的工作原理和配置。
适用场景
- 当你准备把《Task 并行库与 Dataflow》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在需要理解语言特性、运行时行为或 API 边界时阅读。
- 当代码开始出现性能瓶颈、可维护性问题或语义歧义时,这类主题会直接影响实现质量。
落地建议
- 先写最小可运行样例,再把结论迁移到真实业务代码。
- 同时记录这个特性的收益、限制和替代方案,避免为了"高级"而使用。
- 涉及内存、并发或序列化时,最好配合调试器或基准测试验证。
排错清单
- 先确认问题属于编译期、运行期还是语义误用。
- 检查是否存在隐式转换、装箱拆箱、闭包捕获或上下文切换等隐藏成本。
- 查看异常栈、日志和最小复现代码,优先排除使用姿势问题。
- 使用 Concurrency Visualizer 分析线程竞争和阻塞。
复盘问题
- 如果把《Task 并行库与 Dataflow》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Task 并行库与 Dataflow》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Task 并行库与 Dataflow》最大的收益和代价分别是什么?
