流式响应与 Server-Sent Events
大约 10 分钟约 2875 字
流式响应与 Server-Sent Events
简介
流式响应允许服务器逐步发送数据,而不是等待全部数据准备好后一次性发送。ASP.NET Core 支持多种流式技术:IAsyncEnumerable、Server-Sent Events(SSE)、StreamWriter 和 PipeWriter,适用于实时数据推送、大文件下载和进度报告等场景。
特点
IAsyncEnumerable 响应
异步流式返回
// IAsyncEnumerable — 逐项生成并发送响应
// 适用于:数据库游标、实时数据、日志流
// Minimal API 直接返回 IAsyncEnumerable
app.MapGet("/api/numbers", async (CancellationToken ct) =>
{
// 每秒生成一个数字,直到取消
return GetNumbersAsync(ct);
});
async IAsyncEnumerable<int> GetNumbersAsync([EnumeratorCancellation] CancellationToken ct)
{
for (int i = 0; i < 100; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(1000, ct);
yield return i;
}
}
// JSON 数组流式响应
app.MapGet("/api/users/stream", async (AppDbContext db, CancellationToken ct) =>
{
// EF Core 的 AsAsyncEnumerable 逐行读取
return db.Users
.AsNoTracking()
.Where(u => u.Active)
.OrderBy(u => u.Name)
.AsAsyncEnumerable();
});
// 带条件的流式响应
app.MapGet("/api/notifications", (INotificationService notifications, CancellationToken ct) =>
{
return notifications.SubscribeAsync(ct);
});
// INotificationService 实现
public class NotificationService : INotificationService
{
private readonly Channel<Notification> _channel = Channel.CreateBounded<Notification>(100);
public async IAsyncEnumerable<Notification> SubscribeAsync(
[EnumeratorCancellation] CancellationToken ct)
{
await foreach (var notification in _channel.Reader.ReadAllAsync(ct))
{
yield return notification;
}
}
public async Task PublishAsync(Notification notification)
{
await _channel.Writer.WriteAsync(notification);
}
}Server-Sent Events(SSE)
SSE 推送实现
// SSE 是单向推送协议(服务器 → 客户端)
// 基于 HTTP 长连接,Content-Type: text/event-stream
// 比 WebSocket 简单,不需要协议升级
app.MapGet("/sse/notifications", async (HttpContext context, CancellationToken ct) =>
{
context.Response.ContentType = "text/event-stream";
context.Response.Headers.Append("Cache-Control", "no-cache");
context.Response.Headers.Append("Connection", "keep-alive");
// SSE 格式:
// field: value\n\n
// 例如:
// data: Hello World\n\n
// event: message\n
// data: {"text":"Hello"}\n\n
var notificationService = context.RequestServices.GetRequiredService<INotificationService>();
await foreach (var notification in notificationService.SubscribeAsync(ct))
{
var json = JsonSerializer.Serialize(notification);
// SSE 事件格式
await context.Response.WriteAsync($"event: notification\n", ct);
await context.Response.WriteAsync($"data: {json}\n\n", ct);
await context.Response.Body.FlushAsync(ct);
}
});
// 封装 SSE 辅助方法
public static class SseExtensions
{
public static async Task SendSseEventAsync(this HttpResponse response,
string eventType, object data, CancellationToken ct)
{
var json = JsonSerializer.Serialize(data);
await response.WriteAsync($"event: {eventType}\n", ct);
await response.WriteAsync($"data: {json}\n\n", ct);
await response.Body.FlushAsync(ct);
}
public static async Task SendSseCommentAsync(this HttpResponse response,
string comment, CancellationToken ct)
{
await response.WriteAsync($": {comment}\n\n", ct);
await response.Body.FlushAsync(ct);
}
}
// 使用
app.MapGet("/sse/prices", async (HttpContext context, CancellationToken ct) =>
{
context.Response.ContentType = "text/event-stream";
// 发送心跳注释
_ = Task.Run(async () =>
{
while (!ct.IsCancellationRequested)
{
await Task.Delay(15000, ct);
await context.Response.SendSseCommentAsync("heartbeat", ct);
}
}, ct);
while (!ct.IsCancellationRequested)
{
var price = GetLatestPrice();
await context.Response.SendSseEventAsync("price", price, ct);
await Task.Delay(1000, ct);
}
});SSE 客户端(JavaScript)
// JavaScript EventSource API
const eventSource = new EventSource('/sse/notifications');
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('通知:', data);
updateUI(data);
});
eventSource.addEventListener('price', (event) => {
const price = JSON.parse(event.data);
updatePriceDisplay(price);
});
eventSource.onerror = (event) => {
console.log('SSE 连接错误,自动重连...');
// EventSource 会自动重连
};
// 关闭连接
// eventSource.close();流式下载
大文件流式传输
// 流式下载(不加载整个文件到内存)
app.MapGet("/download/{fileId}", async (string fileId, HttpContext context, CancellationToken ct) =>
{
var filePath = GetFilePath(fileId);
if (!File.Exists(filePath))
return Results.NotFound();
var fileInfo = new FileInfo(filePath);
context.Response.ContentType = "application/octet-stream";
context.Response.Headers.Append("Content-Disposition",
$"attachment; filename=\"{fileInfo.Name}\"");
context.Response.ContentLength = fileInfo.Length;
// 支持断点续传(Range 请求)
var rangeHeader = context.Request.Headers.Range;
if (string.IsNullOrEmpty(rangeHeader))
{
// 完整传输
await using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
FileShare.Read, bufferSize: 81920, useAsync: true);
await fileStream.CopyToAsync(context.Response.Body, ct);
}
else
{
// Range 传输
var range = ParseRange(rangeHeader!, fileInfo.Length);
context.Response.StatusCode = 206; // Partial Content
context.Response.Headers.Append("Content-Range",
$"bytes {range.Start}-{range.End}/{fileInfo.Length}");
context.Response.ContentLength = range.Length;
await using var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read,
FileShare.Read, bufferSize: 81920, useAsync: true);
fileStream.Position = range.Start;
await fileStream.CopyToAsync(context.Response.Body, ct);
}
return Results.Empty;
});
// 流式生成 CSV(不占用内存)
app.MapGet("/export/users", async (HttpContext context, AppDbContext db, CancellationToken ct) =>
{
context.Response.ContentType = "text/csv";
context.Response.Headers.Append("Content-Disposition", "attachment; filename=users.csv");
await context.Response.WriteAsync("Id,Name,Email,CreatedAt\n", ct);
await foreach (var user in db.Users.AsNoTracking().AsAsyncEnumerable().WithCancellation(ct))
{
var line = $"{user.Id},\"{user.Name}\",\"{user.Email}\",{user.CreatedAt:yyyy-MM-dd}\n";
await context.Response.WriteAsync(line, ct);
}
});进度报告
实时任务进度
Pipelines 高性能 IO
System.IO.Pipelines
// Pipelines 提供高性能的内存管理,避免频繁分配
// 适用于:自定义协议解析、大文件处理、实时数据流
/// <summary>
/// 使用 Pipelines 处理大文件上传
/// </summary>
public class FileUploadPipeline
{
public async Task<long> ProcessUploadedFileAsync(
Stream inputStream,
string outputPath,
CancellationToken ct)
{
await using var outputStream = new FileStream(
outputPath, FileMode.Create, FileAccess.Write, FileShare.None,
bufferSize: 65536, useAsync: true);
var pipe = new Pipe();
var writingTask = FillPipeAsync(inputStream, pipe.Writer, ct);
var readingTask = ReadPipeAsync(pipe.Reader, outputStream, ct);
await Task.WhenAll(writingTask, readingTask);
return outputStream.Length;
}
private async Task FillPipeAsync(
Stream source, PipeWriter writer, CancellationToken ct)
{
const int bufferSize = 81920;
while (true)
{
var memory = writer.GetMemory(bufferSize);
try
{
var bytesRead = await source.ReadAsync(memory, ct);
if (bytesRead == 0) break;
writer.Advance(bytesRead);
}
catch
{
break;
}
var result = await writer.FlushAsync(ct);
if (result.IsCompleted) break;
}
await writer.CompleteAsync();
}
private async Task ReadPipeAsync(
PipeReader reader, Stream destination, CancellationToken ct)
{
while (true)
{
var result = await reader.ReadAsync(ct);
var buffer = result.Buffer;
if (buffer.IsSingleSegment)
{
await destination.WriteAsync(buffer.First, ct);
}
else
{
foreach (var segment in buffer)
{
await destination.WriteAsync(segment, ct);
}
}
reader.AdvanceTo(buffer.End);
if (result.IsCompleted) break;
}
await reader.CompleteAsync();
}
}SSE 与反向代理兼容
Nginx/CDN 配置
// SSE 在反向代理环境下的注意事项:
// 1. Nginx 默认会缓冲响应,需要关闭
// 2. 负载均衡可能因超时断开长连接
// 3. CDN 通常不支持 SSE
// Nginx 配置:
// location /sse/ {
// proxy_pass http://backend;
// proxy_http_version 1.1;
// proxy_set_header Connection '';
// proxy_buffering off; # 关键:关闭缓冲
// proxy_cache off; # 关闭缓存
// proxy_read_timeout 86400s; # 超时 24 小时
// chunked_transfer_encoding on;
// }
// 客户端断线重连
app.MapGet("/sse/events", async (HttpContext context, CancellationToken ct) =>
{
context.Response.ContentType = "text/event-stream";
context.Response.Headers.Append("X-Accel-Buffering", "no"); // Nginx 关闭缓冲
// Last-Event-ID 处理(断线重连后从上次位置继续)
var lastEventId = context.Request.Headers["Last-Event-ID"].FirstOrDefault();
if (!string.IsNullOrEmpty(lastEventId))
{
var lastId = long.Parse(lastEventId);
// 从 lastId 之后继续发送
_logger.LogInformation("客户端重连,从事件 {LastEventId} 继续", lastId);
}
long eventId = 0;
if (!string.IsNullOrEmpty(lastEventId))
eventId = long.Parse(lastEventId);
while (!ct.IsCancellationRequested)
{
eventId++;
var data = JsonSerializer.Serialize(new { Id = eventId, Time = DateTime.UtcNow });
await context.Response.WriteAsync($"id: {eventId}\n", ct);
await context.Response.WriteAsync($"data: {data}\n\n", ct);
await context.Response.Body.FlushAsync(ct);
await Task.Delay(1000, ct);
}
});WebSocket 作为流式方案
WebSocket 双向流
// WebSocket 适合双向实时通信
app.Map("/ws", (WebSocket webSocket, HttpContext context) =>
{
return HandleWebSocketAsync(webSocket, context);
});
async Task HandleWebSocketAsync(WebSocket webSocket, HttpContext context)
{
var buffer = new byte[1024 * 4];
var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (!receiveResult.CloseStatus.HasValue)
{
// 接收消息
var message = Encoding.UTF8.GetString(buffer, 0, receiveResult.Count);
_logger.LogInformation("收到 WebSocket 消息: {Message}", message);
// 发送响应
var response = Encoding.UTF8.GetBytes($"Echo: {message}");
await webSocket.SendAsync(
new ArraySegment<byte>(response),
WebSocketMessageType.Text,
endOfMessage: true,
CancellationToken.None);
receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
await webSocket.CloseAsync(
receiveResult.CloseStatus.Value,
receiveResult.CloseStatusDescription,
CancellationToken.None);
}
// SSE vs WebSocket 对比:
// SSE: 单向(服务器→客户端),自动重连,HTTP 协议,简单
// WebSocket: 双向,需要手动重连,需要协议升级,功能更强
// 选择建议:如果只需要服务器推送数据,用 SSE;如果需要双向通信,用 WebSocket 或 SignalR流式响应的内存管理
避免内存泄漏
/// <summary>
/// 流式响应的常见陷阱与最佳实践
/// </summary>
// 1. 始终接受 CancellationToken
app.MapGet("/api/stream", async (CancellationToken ct) =>
{
// ✅ 正确:接受 CancellationToken
return GetStreamAsync(ct);
});
// 2. 使用 Channel 而不是 BlockingCollection
app.MapGet("/api/events", async (CancellationToken ct) =>
{
var channel = Channel.CreateBounded<EventData>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropOldest, // 背压处理
SingleReader = true,
SingleWriter = false
});
// 生产者(后台任务)
_ = Task.Run(async () =>
{
try
{
while (!ct.IsCancellationRequested)
{
var data = await ProduceEventAsync();
if (!await channel.Writer.WaitToWriteAsync(ct))
break;
await channel.Writer.WriteAsync(data, ct);
}
}
finally
{
channel.Writer.Complete();
}
}, ct);
// 消费者(HTTP 响应流)
var stream = channel.Reader.ReadAllAsync(ct);
return stream;
});
// 3. 大数据集分页流式导出
app.MapGet("/api/export/large-dataset", async (HttpContext context, AppDbContext db, CancellationToken ct) =>
{
context.Response.ContentType = "text/csv";
context.Response.Headers.Append("Content-Disposition", "attachment; filename=export.csv");
// 使用 AsAsyncEnumerable 逐行读取,避免全量加载到内存
await context.Response.WriteAsync("Id,Name,Email,Status\n", ct);
int count = 0;
await foreach (var item in db.Users
.AsNoTracking()
.OrderBy(u => u.Id)
.AsAsyncEnumerable()
.WithCancellation(ct))
{
var line = $"{item.Id},\"{item.Name}\",\"{item.Email}\",\"{item.Status}\"\n";
await context.Response.WriteAsync(line, ct);
// 定期刷新缓冲区
if (++count % 100 == 0)
{
await context.Response.Body.FlushAsync(ct);
}
}
});
// 4. 设置超时防止资源泄漏
app.MapGet("/api/stream/timeout", async (HttpContext context) =>
{
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
context.RequestAborted.Register(() => cts.Cancel());
try
{
await foreach (var item in GetStreamAsync(cts.Token))
{
await context.Response.WriteAsync(JsonSerializer.Serialize(item) + "\n", cts.Token);
await context.Response.Body.FlushAsync(cts.Token);
}
}
catch (OperationCanceledException)
{
// 客户端断开连接,正常结束
}
});// 使用 SSE 推送任务进度
app.MapGet("/tasks/{taskId}/progress", async (string taskId, HttpContext context, CancellationToken ct) =>
{
context.Response.ContentType = "text/event-stream";
context.Response.Headers.Append("Cache-Control", "no-cache");
var taskService = context.RequestServices.GetRequiredService<ITaskService>();
var progress = taskService.GetProgress(taskId);
while (!ct.IsCancellationRequested)
{
var status = progress.GetStatus();
await context.Response.SendSseEventAsync("progress", new
{
TaskId = taskId,
status.PercentComplete,
status.CurrentStep,
status.TotalSteps,
status.Message,
status.IsCompleted
}, ct);
if (status.IsCompleted) break;
await Task.Delay(500, ct);
}
});
// 长时间运行任务(后台)
app.MapPost("/tasks/import", async (ITaskService taskService) =>
{
var taskId = taskService.StartImportTask();
return Results.Accepted($"/tasks/{taskId}/progress", new { TaskId = taskId });
});
public class ImportTaskService : ITaskService
{
private readonly ConcurrentDictionary<string, TaskProgress> _progress = new();
public string StartImportTask()
{
var taskId = Guid.NewGuid().ToString();
_progress[taskId] = new TaskProgress();
_ = Task.Run(async () =>
{
var progress = _progress[taskId];
for (int i = 0; i < 100; i++)
{
await Task.Delay(100);
progress.Update(i + 1, 100, $"处理第 {i + 1} 条");
}
progress.Complete();
});
return taskId;
}
}优点
缺点
总结
IAsyncEnumerable<T> 是 ASP.NET Core 流式响应的核心,框架自动逐项序列化并发送。SSE 基于 HTTP 长连接,使用 text/event-stream 格式,支持事件类型和数据字段。大文件流式下载使用 FileStream.CopyToAsync 分块传输,支持 Range 请求实现断点续传。进度报告通过 SSE 推送任务执行状态。SSE 比 WebSocket 简单(不需要协议升级),但只支持单向推送。心跳注释保持连接活跃,防止中间代理超时。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《流式响应与 Server-Sent Events》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《流式响应与 Server-Sent Events》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《流式响应与 Server-Sent Events》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《流式响应与 Server-Sent Events》最大的收益和代价分别是什么?
