异步流与 IAsyncEnumerable 深入
大约 17 分钟约 5235 字
异步流与 IAsyncEnumerable 深入
简介
IAsyncEnumerable<T> 是 C# 8.0 引入的异步流接口,它结合了 IEnumerable<T> 的迭代器模式和 async/await 的异步编程模型,使得按需异步产生和消费数据序列成为可能。本文将深入探讨异步流的原理、实践模式、与 Channel 的配合使用、背压控制以及在实时数据管道中的应用。
特点
IAsyncEnumerable 基础
异步迭代器方法
using System.Runtime.CompilerServices;
/// <summary>
/// IAsyncEnumerable 基本用法
/// </summary>
public class AsyncEnumerableBasics
{
/// <summary>
/// 最简单的异步迭代器:使用 yield return
/// </summary>
public async IAsyncEnumerable<int> GenerateNumbersAsync(
int count,
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < count; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct); // 模拟异步操作
yield return i;
}
}
/// <summary>
/// 从数据库分页读取的典型模式
/// </summary>
public async IAsyncEnumerable<User> ReadUsersPagedAsync(
int pageSize,
[EnumeratorCancellation] CancellationToken ct = default)
{
int page = 0;
bool hasMore = true;
while (hasMore)
{
ct.ThrowIfCancellationRequested();
var users = await FetchPageAsync(page, pageSize, ct);
if (users.Count == 0)
{
hasMore = false;
continue;
}
foreach (var user in users)
{
ct.ThrowIfCancellationRequested();
yield return user;
}
page++;
hasMore = users.Count == pageSize; // 不足一页说明是最后一页
}
}
/// <summary>
/// 使用 await foreach 消费异步流
/// </summary>
public async Task ConsumeAsync(CancellationToken ct)
{
// 基本消费方式
await foreach (int number in GenerateNumbersAsync(10, ct))
{
Console.WriteLine($"收到: {number}");
}
// 使用 ConfigureAwait 控制上下文
await foreach (int number in GenerateNumbersAsync(10, ct)
.ConfigureAwait(false))
{
// 在非原始同步上下文上执行
Console.WriteLine($"处理: {number}");
}
// 使用 WithCancellation 传入取消令牌
await foreach (var user in ReadUsersPagedAsync(100)
.WithCancellation(ct)
.ConfigureAwait(false))
{
ProcessUser(user);
}
}
private async Task<List<User>> FetchPageAsync(int page, int size, CancellationToken ct)
{
await Task.Delay(50, ct);
return new List<User>();
}
private void ProcessUser(User user) { }
}
public class User
{
public int Id { get; set; }
public string Name { get; set; } = "";
}手动实现 IAsyncEnumerator
using System.Runtime.CompilerServices;
/// <summary>
/// 手动实现 IAsyncEnumerable 和 IAsyncEnumerator
/// 适用于需要精细控制迭代的场景
/// </summary>
public class AsyncRange : IAsyncEnumerable<int>
{
private readonly int _start;
private readonly int _count;
private readonly int _delayMs;
public AsyncRange(int start, int count, int delayMs = 100)
{
_start = start;
_count = count;
_delayMs = delayMs;
}
public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken ct = default)
{
return new AsyncRangeEnumerator(_start, _count, _delayMs, ct);
}
private class AsyncRangeEnumerator : IAsyncEnumerator<int>
{
private readonly int _start;
private readonly int _count;
private readonly int _delayMs;
private readonly CancellationToken _ct;
private int _current;
private int _index = -1;
public AsyncRangeEnumerator(int start, int count, int delayMs, CancellationToken ct)
{
_start = start;
_count = count;
_delayMs = delayMs;
_ct = ct;
_current = start;
}
public int Current => _current;
public async ValueTask<bool> MoveNextAsync()
{
_ct.ThrowIfCancellationRequested();
_index++;
if (_index >= _count)
return false;
if (_delayMs > 0)
await Task.Delay(_delayMs, _ct);
_current = _start + _index;
return true;
}
public ValueTask DisposeAsync()
{
// 清理资源
return ValueTask.CompletedTask;
}
}
}Async LINQ (System.Linq.Async)
常用异步 LINQ 操作符
using System.Linq;
/// <summary>
/// System.Linq.Async 提供的异步 LINQ 操作符
/// 需要安装 NuGet 包: System.Linq.Async
/// </summary>
public class AsyncLinqExamples
{
/// <summary>
/// 过滤与投影
/// </summary>
public async Task FilterAndProjectAsync()
{
var source = GetSensorDataAsync();
// Where: 异步过滤(支持异步谓词)
var highValues = source.WhereAwait(async (x, ct) =>
{
await Task.Delay(1, ct); // 模拟异步判断
return x.Value > 50;
});
// Select: 异步投影
var projections = highValues.SelectAwait(async (x, ct) =>
{
var enriched = await EnrichDataAsync(x, ct);
return new { Original = x, Enriched = enriched };
});
await foreach (var item in projections)
{
Console.WriteLine($"原始: {item.Original.Value}, 增强: {item.Enriched}");
}
}
/// <summary>
/// 聚合操作
/// </summary>
public async Task AggregationAsync()
{
var source = GetSensorDataAsync();
// Count
int count = await source.CountAsync();
Console.WriteLine($"总数: {count}");
// Sum(需要重新获取源)
int sum = await GetSensorDataAsync().SumAsync(x => x.Value);
Console.WriteLine($"总和: {sum}");
// Average
double avg = await GetSensorDataAsync().AverageAsync(x => x.Value);
Console.WriteLine($"平均: {avg:F2}");
// Min/Max
int min = await GetSensorDataAsync().MinAsync(x => x.Value);
int max = await GetSensorDataAsync().MaxAsync(x => x.Value);
}
/// <summary>
/// 排序与分组
/// </summary>
public async Task OrderingAsync()
{
var source = GetSensorDataAsync();
// OrderBy
var sorted = source.OrderBy(x => x.Timestamp);
// GroupBy(收集到分组中)
var grouped = GetSensorDataAsync().GroupBy(x => x.SensorId);
await foreach (var group in grouped)
{
Console.WriteLine($"传感器 {group.Key}: {await group.CountAsync()} 条数据");
}
// Distinct
var distinctSensors = await GetSensorDataAsync()
.Select(x => x.SensorId)
.Distinct()
.ToListAsync();
}
/// <summary>
/// 批处理与窗口操作
/// </summary>
public async Task BatchingAsync()
{
var source = GetSensorDataAsync();
// Chunk: 按固定大小分批
var chunks = source.Chunk(100);
await foreach (var chunk in chunks)
{
Console.WriteLine($"批次大小: {chunk.Length}");
await BulkInsertAsync(chunk);
}
// Take / Skip
var first10 = GetSensorDataAsync().Take(10);
var skip10 = GetSensorDataAsync().Skip(10);
// TakeWhile / SkipWhile
var beforeThreshold = GetSensorDataAsync()
.TakeWhile(x => x.Value < 100);
}
/// <summary>
/// 合并与拼接
/// </summary>
public async Task CombiningAsync()
{
var source1 = GetSensorDataAsync();
var source2 = GetSensorDataAsync();
// Concat: 顺序连接两个异步流
var combined = source1.Concat(source2);
// Zip: 配对合并
var zipped = source1.Zip(source2, (a, b) => new
{
Sensor1 = a.Value,
Sensor2 = b.Value,
Diff = Math.Abs(a.Value - b.Value)
});
await foreach (var pair in zipped)
{
Console.WriteLine($"差值: {pair.Diff}");
}
}
private async IAsyncEnumerable<SensorData> GetSensorDataAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
var rng = new Random(42);
for (int i = 0; i < 1000; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(10, ct);
yield return new SensorData
{
SensorId = rng.Next(1, 10),
Value = rng.Next(0, 200),
Timestamp = DateTime.UtcNow.AddSeconds(i)
};
}
}
private async Task<string> EnrichDataAsync(SensorData data, CancellationToken ct)
{
await Task.Delay(1, ct);
return $"Sensor-{data.SensorId}@{data.Timestamp:HH:mm:ss}";
}
private async Task BulkInsertAsync(SensorData[] chunk) { }
}
public class SensorData
{
public int SensorId { get; set; }
public int Value { get; set; }
public DateTime Timestamp { get; set; }
}Channel 与异步流
Channel 作为生产者-消费者桥梁
using System.Threading.Channels;
/// <summary>
/// 使用 Channel + IAsyncEnumerable 构建生产者-消费者模式
/// </summary>
public class ChannelProducerConsumer
{
/// <summary>
/// 基础示例:Channel 天然支持 IAsyncEnumerable
/// </summary>
public async Task BasicChannelAsync()
{
// 创建有界 Channel(背压支持)
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // 满时等待
SingleReader = true, // 单消费者优化
SingleWriter = true // 单生产者优化
});
// 生产者任务
var producerTask = Task.Run(async () =>
{
for (int i = 0; i < 1000; i++)
{
// WriteAsync 在满时会异步等待(背压)
await channel.Writer.WriteAsync($"消息-{i}");
}
channel.Writer.Complete(); // 标记完成
});
// 消费者:Channel.Reader 实现了 IAsyncEnumerable
await foreach (var message in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费: {message}");
}
await producerTask;
}
/// <summary>
/// 多生产者-单消费者模式
/// </summary>
public async Task MultipleProducersAsync()
{
var channel = Channel.CreateUnbounded<MarketTick>(
new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false // 多生产者
});
// 启动多个生产者
var producers = Enumerable.Range(0, 5).Select(symbolId =>
ProduceMarketDataAsync(channel.Writer, symbolId));
// 消费者:实时处理
var consumerTask = ConsumeMarketDataAsync(channel.Reader);
// 等待所有生产者完成
await Task.WhenAll(producers);
channel.Writer.Complete();
await consumerTask;
}
private async Task ProduceMarketDataAsync(
ChannelWriter<MarketTick> writer, int symbolId)
{
var rng = new Random();
for (int i = 0; i < 100; i++)
{
var tick = new MarketTick
{
SymbolId = symbolId,
Price = 100 + rng.NextDouble() * 10,
Volume = rng.Next(100, 10000),
Timestamp = DateTime.UtcNow
};
// TryWrite 不会阻塞,适合高频场景
if (!writer.TryWrite(tick))
{
await writer.WriteAsync(tick);
}
await Task.Delay(10);
}
}
private async Task ConsumeMarketDataAsync(ChannelReader<MarketTick> reader)
{
await foreach (var tick in reader.ReadAllAsync())
{
// 实时处理行情数据
CalculateMovingAverage(tick);
}
}
private void CalculateMovingAverage(MarketTick tick) { }
}
public class MarketTick
{
public int SymbolId { get; set; }
public double Price { get; set; }
public int Volume { get; set; }
public DateTime Timestamp { get; set; }
}高级 Channel 模式
using System.Threading.Channels;
/// <summary>
/// 高级 Channel 使用模式
/// </summary>
public class AdvancedChannelPatterns
{
/// <summary>
/// 模式1:带超时的消费
/// </summary>
public async Task ConsumeWithTimeoutAsync(
ChannelReader<Job> reader, CancellationToken ct)
{
while (await reader.WaitToReadAsync(ct))
{
// 使用 CancellationTokenSource 实现单条消息超时
using var itemCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
itemCts.CancelAfter(TimeSpan.FromSeconds(30));
if (reader.TryRead(out Job? job))
{
try
{
await ProcessJobAsync(job, itemCts.Token);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
Console.WriteLine($"任务超时: {job.Id}");
}
}
}
}
/// <summary>
/// 模式2:广播(多个消费者各收到一份)
/// </summary>
public async Task BroadcastPatternAsync()
{
var broadcaster = new ChannelBroadcaster<string>();
// 多个订阅者
var tasks = new List<Task>();
for (int i = 0; i < 3; i++)
{
int consumerId = i;
var subscription = broadcaster.Subscribe();
tasks.Add(Task.Run(async () =>
{
await foreach (var msg in subscription.ReadAllAsync())
{
Console.WriteLine($"消费者 {consumerId}: {msg}");
}
}));
}
// 广播消息
for (int i = 0; i < 10; i++)
{
await broadcaster.PublishAsync($"广播-{i}");
}
broadcaster.Complete();
await Task.WhenAll(tasks);
}
/// <summary>
/// 模式3:流水线(多阶段处理)
/// </summary>
public async Task PipelinePatternAsync()
{
// 阶段1 -> 阶段2 -> 阶段3
var stage1Output = Channel.CreateBounded<RawData>(100);
var stage2Output = Channel.CreateBounded<ProcessedData>(100);
var stage3Output = Channel.CreateBounded<FinalResult>(100);
// 阶段1:解析原始数据
var stage1 = Task.Run(async () =>
{
await foreach (var raw in stage1Output.Reader.ReadAllAsync())
{
var processed = ParseRawData(raw);
await stage2Output.Writer.WriteAsync(processed);
}
stage2Output.Writer.Complete();
});
// 阶段2:数据增强
var stage2 = Task.Run(async () =>
{
await foreach (var data in stage2Output.Reader.ReadAllAsync())
{
var enriched = await EnrichDataAsync(data);
await stage3Output.Writer.WriteAsync(enriched);
}
stage3Output.Writer.Complete();
});
// 阶段3:输出最终结果
var stage3 = Task.Run(async () =>
{
await foreach (var result in stage3Output.Reader.ReadAllAsync())
{
Console.WriteLine($"最终结果: {result}");
}
});
// 填充输入
for (int i = 0; i < 100; i++)
{
await stage1Output.Writer.WriteAsync(new RawData { Payload = $"data-{i}" });
}
stage1Output.Writer.Complete();
await Task.WhenAll(stage1, stage2, stage3);
}
private ProcessedData ParseRawData(RawData raw) => new();
private async Task<FinalResult> EnrichDataAsync(ProcessedData data) => new();
private async Task ProcessJobAsync(Job job, CancellationToken ct) { }
}
/// <summary>
/// 广播器实现
/// </summary>
public class ChannelBroadcaster<T>
{
private readonly List<Channel<T>> _channels = new();
private readonly object _lock = new();
public ChannelReader<T> Subscribe()
{
var channel = Channel.CreateUnbounded<T>();
lock (_lock)
{
_channels.Add(channel);
}
return channel.Reader;
}
public async ValueTask PublishAsync(T item)
{
List<Channel<T>> snapshot;
lock (_lock)
{
snapshot = _channels.ToList();
}
foreach (var channel in snapshot)
{
await channel.Writer.WriteAsync(item);
}
}
public void Complete()
{
lock (_lock)
{
foreach (var channel in _channels)
{
channel.Writer.Complete();
}
}
}
}
public class RawData { public string Payload { get; set; } = ""; }
public class ProcessedData { }
public class FinalResult { }
public class Job { public int Id { get; set; } }取消与 ConfigureAwait
取消的最佳实践
using System.Runtime.CompilerServices;
/// <summary>
/// 异步流中的取消机制最佳实践
/// </summary>
public class CancellationBestPractices
{
/// <summary>
/// 使用 [EnumeratorCancellation] 使 WithCancellation 生效
/// </summary>
public async IAsyncEnumerable<string> StreamWithCancellation(
[EnumeratorCancellation] CancellationToken ct = default)
{
var response = await HttpClientShared.GetStringAsync("https://api.example.com/stream");
// 模拟流式响应
for (int i = 0; i < 1000; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct);
yield return $"Line {i}: {response.Substring(0, Math.Min(20, response.Length))}";
}
}
/// <summary>
/// 优雅取消:在取消时返回已收集的数据
/// </summary>
public async IAsyncEnumerable<int> GracefulCancellation(
[EnumeratorCancellation] CancellationToken ct = default)
{
int count = 0;
try
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(100, ct);
count++;
yield return count;
}
}
catch (OperationCanceledException)
{
// 取消时优雅结束,不抛异常
Console.WriteLine($"流在 {count} 条后被取消");
}
}
/// <summary>
/// 超时取消
/// </summary>
public async Task ConsumeWithTimeoutAsync()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
await foreach (var item in StreamWithCancellation()
.WithCancellation(cts.Token))
{
Console.WriteLine(item);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("5 秒超时取消");
}
}
/// <summary>
/// ConfigureAwait 在异步流中的使用
/// </summary>
public async Task ConfigureAwaitUsage()
{
// 在 ASP.NET Core 中通常不需要 ConfigureAwait(false)
// 因为没有 SynchronizationContext
// 在 WPF/WinForms 中,ConfigureAwait(true) 可回到 UI 线程
await foreach (var item in StreamWithCancellation()
.ConfigureAwait(false))
{
// 这里在线程池线程上执行
}
}
private static readonly HttpClient HttpClientShared = new();
}异步迭代器陷阱
常见陷阱与解决方案
/// <summary>
/// 异步迭代器常见陷阱
/// </summary>
public class AsyncIteratorPitfalls
{
/// <summary>
/// 陷阱1:资源泄漏 — 迭代器未完全消费
/// </summary>
public async IAsyncEnumerable<string> ReadFileLinesAsync(
string path,
[EnumeratorCancellation] CancellationToken ct = default)
{
var reader = new StreamReader(path);
try
{
while (!reader.EndOfStream)
{
ct.ThrowIfCancellationRequested();
var line = await reader.ReadLineAsync(ct);
if (line != null) yield return line;
}
}
finally
{
// 关键:finally 确保即使消费者提前退出也会释放资源
await reader.DisposeAsync();
}
}
/// <summary>
/// 陷阱2:异常后无法继续迭代
/// </summary>
public async Task ExceptionHandlingDemo()
{
// 错误做法:异常后 foreach 退出,无法继续
IAsyncEnumerable<int> source = FailingStreamAsync();
await foreach (var item in source)
{
Console.WriteLine(item); // 遇到异常就会终止
}
// 正确做法:在迭代器内部处理异常
await foreach (var item in SafeStreamAsync())
{
Console.WriteLine(item);
}
}
private async IAsyncEnumerable<int> FailingStreamAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 10; i++)
{
if (i == 5) throw new InvalidOperationException("第5个元素出错");
yield return i;
}
}
private async IAsyncEnumerable<int> SafeStreamAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 10; i++)
{
try
{
if (i == 5)
{
// 跳过出错的元素,而不是抛出异常
continue;
}
yield return i;
}
catch (Exception ex)
{
Console.WriteLine($"处理元素 {i} 时出错: {ex.Message}");
}
}
}
/// <summary>
/// 陷阱3:多次枚举导致重复执行
/// </summary>
public async Task MultipleEnumerationDemo()
{
IAsyncEnumerable<int> stream = ExpensiveOperationAsync();
// 错误:每次 await foreach 都会重新执行
int count = await stream.CountAsync(); // 执行一次
int sum = await stream.SumAsync(); // 又执行一次!
// 正确:缓存结果
List<int> cached = await stream.ToListAsync();
int countCached = cached.Count;
int sumCached = cached.Sum();
}
/// <summary>
/// 陷阱4:在 using 声明中使用异步流
/// </summary>
public async Task UsingDeclarationDemo()
{
// IAsyncEnumerable 不实现 IAsyncDisposable
// 如果需要确保清理,使用 await using
await using (var enumerator = GetDataAsync().GetAsyncEnumerator())
{
while (await enumerator.MoveNextAsync())
{
Console.WriteLine(enumerator.Current);
}
} // 自动调用 DisposeAsync
// 或者确保在 finally 中完成清理
}
private async IAsyncEnumerable<int> ExpensiveOperationAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
Console.WriteLine("执行昂贵的操作...");
await Task.Delay(1000, ct);
yield return 1;
yield return 2;
yield return 3;
}
private async IAsyncEnumerable<int> GetDataAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(50, ct);
yield return i;
}
}
}背压与批处理
背压控制策略
using System.Threading.Channels;
/// <summary>
/// 异步流中的背压控制
/// </summary>
public class BackpressureStrategies
{
/// <summary>
/// 策略1:使用 BoundedChannel 实现背压
/// </summary>
public async Task BoundedChannelBackpressureAsync()
{
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(50)
{
FullMode = BoundedChannelFullMode.Wait, // 满时等待消费者
SingleWriter = true,
SingleReader = true
});
// 快速生产者
var producer = Task.Run(async () =>
{
for (int i = 0; i < 1000; i++)
{
// 当 Channel 满时,WriteAsync 会等待
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生产: {i}");
}
channel.Writer.Complete();
});
// 慢速消费者(自动产生背压效果)
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
await Task.Delay(50); // 模拟慢速处理
Console.WriteLine($" 消费: {item}");
}
});
await Task.WhenAll(producer, consumer);
}
/// <summary>
/// 策略2:手动批处理(每 N 条或每 T 时间)
/// </summary>
public async IAsyncEnumerable<IReadOnlyList<T>> BatchBySizeAndTime<T>(
IAsyncEnumerable<T> source,
int batchSize,
TimeSpan maxWaitTime,
[EnumeratorCancellation] CancellationToken ct = default)
{
var batch = new List<T>(batchSize);
using var timerCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
await foreach (var item in source.WithCancellation(ct))
{
batch.Add(item);
if (batch.Count >= batchSize)
{
yield return batch;
batch = new List<T>(batchSize);
}
}
// 返回最后不满一批的数据
if (batch.Count > 0)
{
yield return batch;
}
}
/// <summary>
/// 策略3:滑动窗口
/// </summary>
public async IAsyncEnumerable<IReadOnlyList<double>> SlidingWindowAsync(
IAsyncEnumerable<double> source,
int windowSize,
[EnumeratorCancellation] CancellationToken ct = default)
{
var window = new Queue<double>(windowSize);
await foreach (var value in source.WithCancellation(ct))
{
window.Enqueue(value);
if (window.Count > windowSize)
window.Dequeue();
if (window.Count == windowSize)
{
yield return window.ToList();
}
}
}
/// <summary>
/// 批处理使用示例
/// </summary>
public async Task BatchingUsageAsync(CancellationToken ct)
{
var sensorDataStream = GetSensorStreamAsync();
// 每 100 条或最多等 1 秒
var batches = BatchBySizeAndTime(sensorDataStream, 100, TimeSpan.FromSeconds(1), ct);
await foreach (var batch in batches)
{
await SaveToDatabaseAsync(batch, ct);
Console.WriteLine($"保存批次: {batch.Count} 条");
}
}
private async IAsyncEnumerable<double> GetSensorStreamAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
var rng = new Random();
for (int i = 0; i < 10000; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(10, ct);
yield return rng.NextDouble() * 100;
}
}
private async Task SaveToDatabaseAsync<T>(IReadOnlyList<T> batch, CancellationToken ct) { }
}实时数据管道实战
SSE 到 IAsyncEnumerable 的适配
using System.Runtime.CompilerServices;
using System.Net.Http;
/// <summary>
/// 将 SSE(Server-Sent Events)流转为 IAsyncEnumerable
/// </summary>
public class SseClient : IAsyncDisposable
{
private readonly HttpClient _httpClient;
private CancellationTokenSource? _cts;
public SseClient(HttpClient httpClient)
{
_httpClient = httpClient;
}
/// <summary>
/// 连接 SSE 并返回异步流
/// </summary>
public async IAsyncEnumerable<SseEvent> ConnectAsync(
string url,
[EnumeratorCancellation] CancellationToken ct = default)
{
using var request = new HttpRequestMessage(HttpMethod.Get, url);
request.Headers.Accept.Add(new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("text/event-stream"));
using var response = await _httpClient.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, ct);
response.EnsureSuccessStatusCode();
using var stream = await response.Content.ReadAsStreamAsync(ct);
using var reader = new StreamReader(stream);
string? eventType = null;
var dataBuilder = new StringBuilder();
while (!ct.IsCancellationRequested)
{
var line = await reader.ReadLineAsync(ct);
if (line == null) break;
if (string.IsNullOrEmpty(line))
{
// 空行表示事件结束
if (dataBuilder.Length > 0)
{
yield return new SseEvent
{
EventType = eventType ?? "message",
Data = dataBuilder.ToString()
};
dataBuilder.Clear();
eventType = null;
}
continue;
}
if (line.StartsWith("event:"))
{
eventType = line["event:".Length..].Trim();
}
else if (line.StartsWith("data:"))
{
if (dataBuilder.Length > 0) dataBuilder.AppendLine();
dataBuilder.Append(line["data:".Length..].Trim());
}
// 忽略 id: 和 retry: 字段
}
}
public ValueTask DisposeAsync()
{
_cts?.Cancel();
_cts?.Dispose();
return ValueTask.CompletedTask;
}
}
public class SseEvent
{
public string EventType { get; set; } = "message";
public string Data { get; set; } = "";
}WebSocket 异步流封装
using System.Net.WebSockets;
/// <summary>
/// WebSocket 异步流封装
/// </summary>
public class WebSocketStream : IAsyncDisposable
{
private readonly ClientWebSocket _webSocket = new();
private readonly Uri _uri;
public WebSocketStream(string uri)
{
_uri = new Uri(uri);
}
public async Task ConnectAsync(CancellationToken ct = default)
{
await _webSocket.ConnectAsync(_uri, ct);
}
/// <summary>
/// 将 WebSocket 接收转为 IAsyncEnumerable
/// </summary>
public async IAsyncEnumerable<string> ReceiveMessagesAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
var buffer = new byte[4096];
while (_webSocket.State == WebSocketState.Open && !ct.IsCancellationRequested)
{
var result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
if (result.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", ct);
yield break;
}
if (result.EndOfMessage)
{
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
yield return message;
}
else
{
// 处理大消息:拼接多帧
using var ms = new MemoryStream();
ms.Write(buffer, 0, result.Count);
while (!result.EndOfMessage)
{
result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), ct);
ms.Write(buffer, 0, result.Count);
}
yield return Encoding.UTF8.GetString(ms.ToArray());
}
}
}
/// <summary>
/// 发送消息到 WebSocket
/// </summary>
public async Task SendAsync(string message, CancellationToken ct = default)
{
var bytes = Encoding.UTF8.GetBytes(message);
await _webSocket.SendAsync(new ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, ct);
}
public async ValueTask DisposeAsync()
{
if (_webSocket.State == WebSocketState.Open)
{
try
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Disposing", CancellationToken.None);
}
catch
{
// 忽略关闭时的异常
}
}
_webSocket.Dispose();
}
}IAsyncEnumerable vs IObservable
拉取模型 vs 推送模型对比
using System.Runtime.CompilerServices;
/// <summary>
/// IAsyncEnumerable(拉取式)vs IObservable(推送式)
/// </summary>
public class PullVsPush
{
/// <summary>
/// IAsyncEnumerable: 消费者控制节奏(拉取式)
/// - 优点:自然支持背压,消费者可以决定何时获取下一条
/// - 优点:使用熟悉的 foreach 语法
/// - 缺点:延迟一个完整的 await 周期
/// </summary>
public async Task PullModelDemo()
{
// 消费者控制节奏
await foreach (var item in SlowProducerAsync())
{
// 消费者决定何时请求下一条
await Task.Delay(100); // 处理完才拉下一条
}
}
private async IAsyncEnumerable<int> SlowProducerAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(50, ct);
yield return i; // 只有被请求时才产生下一个
}
}
/// <summary>
/// IObservable: 生产者控制节奏(推送式)
/// - 优点:更高吞吐,无需等待消费者请求
/// - 优点:丰富的 Rx 操作符
/// - 缺点:需要额外的背压机制
/// - 缺点:学习曲线陡峭
/// </summary>
public void PushModelDemo()
{
var observable = new System.Reactive.Subjects.Subject<int>();
observable
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(batch =>
{
Console.WriteLine($"收到 {batch.Count} 条数据");
});
// 推送数据
for (int i = 0; i < 100; i++)
{
observable.OnNext(i); // 不管消费者是否准备好
}
}
/// <summary>
/// 选择指南:
/// - IAsyncEnumerable: 数据库查询、文件读取、HTTP 流式响应
/// - IObservable: UI 事件、实时行情、传感器数据(高频推送)
/// - Channel: 需要缓冲的生产者-消费者场景
/// </summary>
}性能优化技巧
减少分配和开销
using System.Runtime.CompilerServices;
/// <summary>
/// 异步流性能优化技巧
/// </summary>
public class AsyncStreamPerformance
{
/// <summary>
/// 优化1:使用 ValueTask 减少分配
/// </summary>
public async IAsyncEnumerable<int> OptimizedProducer(
[EnumeratorCancellation] CancellationToken ct = default)
{
// IAsyncEnumerable 的 MoveNextAsync 返回 ValueTask<bool>
// 大多数情况下不会分配(同步完成时)
for (int i = 0; i < 100; i++)
{
ct.ThrowIfCancellationRequested();
yield return i; // 同步 yield 不产生堆分配
}
}
/// <summary>
/// 优化2:预分配集合 + 批量 yield
/// </summary>
public async IAsyncEnumerable<IReadOnlyList<T>> ChunkedReadAsync<T>(
IAsyncEnumerable<T> source,
int chunkSize,
[EnumeratorCancellation] CancellationToken ct = default)
{
var chunk = new List<T>(chunkSize);
await foreach (var item in source.WithCancellation(ct))
{
chunk.Add(item);
if (chunk.Count >= chunkSize)
{
yield return chunk;
chunk = new List<T>(chunkSize);
}
}
if (chunk.Count > 0)
yield return chunk;
}
/// <summary>
/// 优化3:避免在热路径中使用 async LINQ
/// </summary>
public async Task AvoidUnnecessaryAsyncLinq()
{
// 慢:多个 LINQ 操作符叠加,每层都有开销
var source = GetDataAsync();
var result = source
.Where(x => x > 0)
.Select(x => x * 2)
.OrderBy(x => x)
.Take(100);
// 快:手动循环 + 简单条件
var fastResult = new List<int>(100);
await foreach (var item in GetDataAsync())
{
if (item > 0)
{
fastResult.Add(item * 2);
if (fastResult.Count >= 100) break;
}
}
fastResult.Sort();
}
/// <summary>
/// 优化4:使用 Struct Enumerator 避免装箱
/// </summary>
public async Task StructEnumeratorDemo()
{
// Channel.Reader.ReadAllAsync() 返回结构体枚举器
// 避免了 IAsyncEnumerator 的接口调度开销
var channel = Channel.CreateBounded<int>(10);
for (int i = 0; i < 10; i++)
await channel.Writer.WriteAsync(i);
channel.Writer.Complete();
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
}
private async IAsyncEnumerable<int> GetDataAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 1000; i++)
{
await Task.Yield();
yield return i;
}
}
}性能注意事项
- MoveNextAsync 开销:每次迭代都有 ValueTask 状态机开销,对极高吞吐场景需考虑批量处理
- async LINQ 链式调用:多个操作符叠加会增加多层状态机,热路径中应手动展开
- CancellationToken 传播:确保通过
[EnumeratorCancellation]正确传递取消令牌 - Channel 的 SingleReader/SingleWriter:设置后可以减少锁竞争,提升吞吐
- ConfigureAwait(false):在库代码中始终使用,避免不必要的上下文切换
- 资源释放:迭代器中的 finally 块确保资源释放,但消费者提前 break 也可能触发
总结
IAsyncEnumerable 是 .NET 异步编程的重要补充,它填补了 IEnumerable(同步阻塞)和 IObservable(推送式复杂)之间的空白。结合 Channel 可以构建强大的生产者-消费者管道,同时保持优雅的 foreach 语法。关键在于理解拉取模型的背压特性,并根据场景选择合适的异步流模式。
关键知识点
- IAsyncEnumerable 使用 yield return + async 组合创建异步迭代器
[EnumeratorCancellation]特性使 WithCancellation 生效- System.Linq.Async 提供了完整的异步 LINQ 操作符集
- Channel 天然支持 IAsyncEnumerable,通过 ReadAllAsync() 获取
- BoundedChannel 实现背压,UnboundedChannel 追求吞吐
- 迭代器方法中的 finally 块确保资源释放
- ConfigureAwait(false) 在库代码中减少上下文切换开销
- 手动实现 IAsyncEnumerator 适用于需要精细控制的场景
常见误区
| 误区 | 正确理解 |
|---|---|
| IAsyncEnumerable 可以替代所有异步集合操作 | 它适用于流式、按需的场景,不适用于需要随机访问的场景 |
| async LINQ 和同步 LINQ 一样高效 | async LINQ 每层操作都有状态机开销 |
| Channel 是线程安全的所以随意用 | 应根据读写模式设置 SingleReader/SingleWriter 优化 |
| yield return 之后的代码总会执行 | 取消或未完整消费时 finally 执行,但 yield 后的代码不一定 |
| IAsyncEnumerable 可以被多次枚举 | 每次枚举都会重新执行迭代器方法 |
| ConfigureAwait 在 ASP.NET Core 中很重要 | ASP.NET Core 没有 SynchronizationContext,影响极小 |
进阶路线
- 入门阶段:掌握 async yield return、await foreach 基本语法
- 进阶阶段:使用 System.Linq.Async 操作符、Channel 生产者-消费者
- 高级阶段:实现自定义 IAsyncEnumerator、流水线模式、背压控制
- 专家阶段:与 System.IO.Pipelines 集成、高性能二进制流解析
- 架构级别:构建事件驱动微服务、实时数据平台
适用场景
- 数据库分页查询结果的流式返回
- HTTP SSE/WebSocket 消息流
- 日志文件的实时尾随(tail -f)
- IoT 传感器数据的流式处理
- 股票/加密货币实时行情推送
- 大文件的逐行/逐块处理
- gRPC 服务端流式 RPC
落地建议
- 对外 API 优先考虑 IAsyncEnumerable 返回值(替代 List + offset 分页)
- Channel 作为服务间缓冲层,设置合理的容量和满策略
- 所有异步迭代器方法都接受 CancellationToken 参数
- 使用 ToListAsync/ToDictionaryAsync 而非多次枚举
- 在库代码中始终使用 ConfigureAwait(false)
排错清单
复盘问题
- 你的项目中哪些数据源适合从 List 返回改为 IAsyncEnumerable 返回?
- 当前系统中是否存在消费者无法跟上生产者的情况?如何引入背压?
- 异步流的错误处理策略应该怎样设计?重试还是跳过?
- Channel 的 SingleReader/SingleWriter 配置是否与实际使用模式匹配?
- 如何监控异步流管道中各阶段的吞吐和延迟?
