Server-Sent Events 推送
大约 14 分钟约 4084 字
Server-Sent Events 推送
简介
Server-Sent Events(SSE)是一种基于 HTTP/1.1 的服务端单向推送技术,通过 text/event-stream 内容类型在持久化 HTTP 连接上向客户端持续发送事件。与 WebSocket 的全双工通信不同,SSE 是单向的(服务端到客户端),但实现更简单、兼容性更好、自动重连更可靠。在 .NET 8 中,ASP.NET Core 原生支持 SSE 端点,结合 Channel<T> 和 IAsyncEnumerable 可以构建高性能的事件推送系统。深入理解 SSE 协议格式、连接管理、重连机制和多客户端广播,有助于在通知推送、实时日志、股票行情、AI 流式输出等场景中做出正确的技术选型。
特点
SSE 协议格式
消息结构
SSE 协议非常简单,基于纯文本格式:
基本消息格式:
data: 消息内容\n\n
带事件类型的消息:
event: order_created\n
data: {"orderId": 123, "total": 99.99}\n\n
带 ID 的消息:
id: 42\n
data: 消息内容\n\n
多条 data 组成一条消息:
data: 第一行\n
data: 第二行\n
data: 第三行\n\n
完整示例(服务端响应):
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
id: 1
event: notification
data: {"message": "Hello World", "timestamp": "2024-01-15T10:00:00Z"}
id: 2
event: alert
data: {"level": "warning", "message": "CPU usage > 80%"}
id: 3
data: heartbeat
注意:每条消息以空行(\n\n)分隔
每个字段以冒号分隔
data 字段可以出现多次(自动用换行拼接)SSE vs WebSocket 对比
SSE WebSocket
────────────────────────────────────────────────────────────────────
协议 HTTP/1.1(长连接) 独立协议(ws://)
方向 单向(服务端→客户端) 全双工(双向)
数据格式 文本 文本 + 二进制
自动重连 浏览器原生支持 需自行实现
断点续传 Last-Event-ID 需自行实现
代理兼容 标准 HTTP,穿透代理 可能被代理阻断
连接数限制 HTTP/1.1: 6 个/域名 无限制
浏览器 API EventSource WebSocket
服务端复杂度 低 中
适用场景 通知、日志、行情、AI 流式 聊天、协作、游戏
选择建议:
- 只需要服务端推送 → SSE(更简单)
- 需要双向通信 → WebSocket
- 需要穿透企业代理 → SSE
- 需要传输二进制数据 → WebSocket
- 客户端是浏览器 → SSE(自动重连)
- 客户端是后端服务 → 都可以ASP.NET Core SSE 实现
基础端点
var app = builder.Build();
// 最简 SSE 端点
app.MapGet("/sse/time", async (HttpContext ctx, CancellationToken ct) =>
{
// 设置 SSE 必需的响应头
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no"; // 禁用 Nginx 缓冲
// 心跳 — 保持连接活跃,检测断线
while (!ct.IsCancellationRequested)
{
var timeMessage = $"data: {{\"time\":\"{DateTime.UtcNow:HH:mm:ss}\"}}\n\n";
await ctx.Response.WriteAsync(timeMessage, ct);
await ctx.Response.Body.FlushAsync(ct); // 必须手动刷新
await Task.Delay(1000, ct);
}
});带事件类型和 ID 的端点
// SSE 端点 — 支持事件类型、消息 ID 和断点续传
app.MapGet("/sse/notifications", async (HttpContext ctx, CancellationToken ct) =>
{
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no";
// 读取客户端的 Last-Event-ID(断点续传)
var lastEventId = ctx.Request.Headers["Last-Event-ID"].FirstOrDefault();
var startId = string.IsNullOrEmpty(lastEventId) ? 0 : int.Parse(lastEventId) + 1;
var eventId = startId;
var random = new Random();
// 模拟不同类型的事件
var eventTypes = new[] { "notification", "alert", "system", "progress" };
while (!ct.IsCancellationRequested)
{
var eventType = eventTypes[random.Next(eventTypes.Length)];
var message = eventType switch
{
"notification" => $"{{\"message\":\"新消息 #{eventId}\",\"type\":\"info\"}}",
"alert" => $"{{\"message\":\"系统告警\",\"level\":\"warning\"}}",
"system" => $"{{\"status\":\"running\",\"uptime\":{eventId * 60}}}",
"progress" => $"{{\"completed\":{Math.Min(eventId * 10, 100)},\"total\":100}}",
_ => "{}"
};
// 格式:id + event + data + 空行
var sseMessage = $"id: {eventId}\nevent: {eventType}\ndata: {message}\n\n";
await ctx.Response.WriteAsync(sseMessage, ct);
await ctx.Response.Body.FlushAsync(ct);
eventId++;
await Task.Delay(3000, ct);
}
});事件广播服务
基于 Channel 的通知服务
// 使用 Channel<T> 实现多消费者模式
// Channel 是线程安全的异步队列,非常适合 SSE 场景
public class SseEvent
{
public long Id { get; set; }
public string Type { get; set; } = "message";
public string Data { get; set; } = "";
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
public class NotificationService
{
// 无界 Channel(生产环境中应设置容量限制)
private readonly Channel<SseEvent> _channel;
private long _eventIdCounter;
public NotificationService()
{
// 有界 Channel:防止慢消费者导致内存膨胀
var options = new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest, // 满时丢弃最旧的消息
SingleReader = false, // 多个消费者
SingleWriter = false // 多个生产者
};
_channel = Channel.CreateBounded<SseEvent>(options);
}
/// <summary>
/// 发布事件(从任何地方调用)
/// </summary>
public async Task PublishAsync(string type, string data)
{
var eventId = Interlocked.Increment(ref _eventIdCounter);
var evt = new SseEvent { Id = eventId, Type = type, Data = data };
// 如果 Channel 满了,TryWrite 会丢弃(或等待)
if (!_channel.Writer.TryWrite(evt))
{
Console.WriteLine($"[SSE] 事件队列已满,丢弃事件 {eventId}");
}
}
/// <summary>
/// 订阅事件流(返回 IAsyncEnumerable)
/// </summary>
public async IAsyncEnumerable<SseEvent> SubscribeAsync(
[EnumeratorCancellation] CancellationToken ct)
{
await foreach (var evt in _channel.Reader.ReadAllAsync(ct))
{
yield return evt;
}
}
}
// 注册服务
builder.Services.AddSingleton<NotificationService>();SSE 端点使用通知服务
app.MapGet("/sse/stream", async (
HttpContext ctx,
NotificationService notifier,
CancellationToken ct) =>
{
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no";
try
{
await foreach (var evt in notifier.SubscribeAsync(ct))
{
var sseMessage = $"id: {evt.Id}\nevent: {evt.Type}\ndata: {evt.Data}\n\n";
await ctx.Response.WriteAsync(sseMessage, ct);
await ctx.Response.Body.FlushAsync(ct);
}
}
catch (OperationCanceledException)
{
// 客户端断开连接,正常退出
Console.WriteLine("[SSE] 客户端断开连接");
}
});
// 从其他端点发布事件
app.MapPost("/api/notifications", async (NotificationService notifier, NotificationRequest req) =>
{
await notifier.PublishAsync(req.Type, JsonSerializer.Serialize(req.Data));
return Results.Ok(new { published = true });
});
public record NotificationRequest(string Type, object Data);连接管理与心跳
心跳保活机制
// SSE 心跳的作用:
// 1. 保持连接活跃(防火墙/代理可能超时断开空闲连接)
// 2. 检测客户端是否在线
// 3. 防止中间代理缓冲响应(某些代理会缓冲不完整的响应)
// 心跳实现方案 1:服务端定时发送心跳
app.MapGet("/sse/heartbeat", async (HttpContext ctx, CancellationToken ct) =>
{
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no";
// 使用注释行作为心跳(浏览器会忽略以 : 开头的行)
var heartbeatTimer = new PeriodicTimer(TimeSpan.FromSeconds(15));
var eventTimer = new PeriodicTimer(TimeSpan.FromSeconds(5));
while (!ct.IsCancellationRequested)
{
// 优先发送事件
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var heartbeatTask = heartbeatTimer.WaitForNextTickAsync(cts.Token);
var eventTask = eventTimer.WaitForNextTickAsync(cts.Token);
var completedTask = await Task.WhenAny(heartbeatTask, eventTask);
if (completedTask == eventTask)
{
// 发送实际事件
var message = $"event: update\ndata: {{\"value\":{DateTime.UtcNow.Ticks % 1000}}}\n\n";
await ctx.Response.WriteAsync(message, ct);
await ctx.Response.Body.FlushAsync(ct);
}
else
{
// 发送心跳(SSE 注释,不会触发 EventSource.onmessage)
await ctx.Response.WriteAsync(": heartbeat\n\n", ct);
await ctx.Response.Body.FlushAsync(ct);
}
}
});
// 心跳实现方案 2:带事件的心跳
// 注意:使用 SSE 注释(以 : 开头)作为心跳不会触发客户端的 onmessage
// 使用 data: 作为心跳会触发客户端的 onmessage客户端连接数监控
// 跟踪 SSE 连接数
public class SseConnectionManager
{
private int _connectionCount;
private readonly ILogger<SseConnectionManager> _logger;
public SseConnectionManager(ILogger<SseConnectionManager> logger)
{
_logger = logger;
}
public int CurrentConnections => _connectionCount;
public void OnConnect()
{
var count = Interlocked.Increment(ref _connectionCount);
_logger.LogInformation("[SSE] 新连接,当前连接数: {Count}", count);
}
public void OnDisconnect()
{
var count = Interlocked.Decrement(ref _connectionCount);
_logger.LogInformation("[SSE] 连接断开,当前连接数: {Count}", count);
}
}
builder.Services.AddSingleton<SseConnectionManager>();
// 端点中使用
app.MapGet("/sse/events", async (
HttpContext ctx,
NotificationService notifier,
SseConnectionManager connectionManager,
CancellationToken ct) =>
{
connectionManager.OnConnect();
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
try
{
await foreach (var evt in notifier.SubscribeAsync(ct))
{
var sseMessage = $"id: {evt.Id}\nevent: {evt.Type}\ndata: {evt.Data}\n\n";
await ctx.Response.WriteAsync(sseMessage, ct);
await ctx.Response.Body.FlushAsync(ct);
}
}
catch (OperationCanceledException) { }
finally
{
connectionManager.OnDisconnect();
}
});
// 监控端点
app.MapGet("/sse/stats", (SseConnectionManager manager) =>
Results.Ok(new { connections = manager.CurrentConnections }));AI 流式输出(ChatGPT 风格)
SSE 实现流式响应
// 模拟 AI 流式输出 — 逐 token 发送
public class AiStreamService
{
public async IAsyncEnumerable<string> StreamCompletionAsync(
string prompt,
[EnumeratorCancellation] CancellationToken ct)
{
// 模拟 AI 逐 token 生成
var response = $"根据您的问题「{prompt}」,以下是我的回答:这是一段模拟的 AI 流式输出文本。每个 token 逐步发送到客户端,实现打字机效果。";
foreach (var token in response)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(30, ct); // 模拟生成延迟
yield return token.ToString();
}
}
}
// SSE 端点 — AI 流式响应
app.MapGet("/sse/ai/stream", async (
HttpContext ctx,
[FromQuery] string prompt,
AiStreamService aiService,
CancellationToken ct) =>
{
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no";
var tokenId = 0;
await foreach (var token in aiService.StreamCompletionAsync(prompt, ct))
{
var data = JsonSerializer.Serialize(new { token, done = false });
var message = $"id: {tokenId++}\nevent: token\ndata: {data}\n\n";
await ctx.Response.WriteAsync(message, ct);
await ctx.Response.Body.FlushAsync(ct);
}
// 发送完成事件
var doneMessage = $"id: {tokenId}\nevent: done\ndata: {{\"done\":true}}\n\n";
await ctx.Response.WriteAsync(doneMessage, ct);
await ctx.Response.Body.FlushAsync(ct);
});
// 实际接入 OpenAI API 的 SSE 流式转发
app.MapPost("/sse/ai/chat", async (
HttpContext ctx,
ChatRequest request,
IOpenAiService openAi,
CancellationToken ct) =>
{
ctx.Response.ContentType = "text/event-stream";
ctx.Response.Headers.CacheControl = "no-cache";
ctx.Response.Headers["X-Accel-Buffering"] = "no";
// 直接转发 OpenAI 的 SSE 流
await foreach (var chunk in openAi.StreamChatAsync(request, ct))
{
var message = $"data: {JsonSerializer.Serialize(chunk)}\n\n";
await ctx.Response.WriteAsync(message, ct);
await ctx.Response.Body.FlushAsync(ct);
}
await ctx.Response.WriteAsync("data: [DONE]\n\n", ct);
await ctx.Response.Body.FlushAsync(ct);
});客户端实现
// 前端 SSE 客户端
const eventSource = new EventSource('/sse/notifications');
// 监听所有消息
eventSource.onmessage = (event) => {
console.log('收到消息:', event.data);
console.log('事件 ID:', event.lastEventId);
};
// 监听特定事件类型
eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('通知:', data.message);
});
eventSource.addEventListener('alert', (event) => {
const data = JSON.parse(event.data);
alert(`告警: ${data.message}`);
});
eventSource.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
updateProgressBar(data.completed, data.total);
});
// AI 流式输出客户端
function streamAiResponse(prompt) {
const source = new EventSource(`/sse/ai/stream?prompt=${encodeURIComponent(prompt)}`);
let fullResponse = '';
source.addEventListener('token', (event) => {
const { token } = JSON.parse(event.data);
fullResponse += token;
document.getElementById('output').innerText = fullResponse;
});
source.addEventListener('done', () => {
source.close();
});
source.onerror = () => {
source.close();
};
}
// 错误处理与重连
eventSource.onerror = (event) => {
if (event.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
} else if (event.readyState === EventSource.CONNECTING) {
console.log('正在重连...');
}
};
// 手动关闭连接
// eventSource.close();
// 断线重连(浏览器自动)
// EventSource 会在连接断开后自动重连
// 重连时会发送 Last-Event-ID 请求头
// 服务端可以读取该头,从上次断开的位置继续发送反向代理配置
Nginx 配置
# SSE 需要禁用缓冲,否则 Nginx 会等到响应完成才转发
# 方式 1:在 location 级别禁用缓冲
location /sse/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_set_header X-Accel-Buffering no; # 禁用 Nginx 缓冲
proxy_buffering off; # 禁用缓冲
proxy_cache off; # 禁用缓存
proxy_read_timeout 86400s; # 超时时间设为 24 小时
chunked_transfer_encoding on; # 启用分块传输
}
# 方式 2:通过 X-Accel-Buffering 响应头控制
# 服务端在 SSE 响应中添加:
# ctx.Response.Headers["X-Accel-Buffering"] = "no";
# Apache 配置
# ProxyPass /sse/ http://backend/sse/
# ProxyPassReverse /sse/ http://backend/sse/
# SetEnv nokeepalive
# ProxyHTTPEngine on性能考量
连接数与资源
// SSE 连接数限制:
// HTTP/1.1: 浏览器限制同一域名 6 个并发连接
// → 其中一些被普通 HTTP 请求占用
// → 实际可用的 SSE 连接可能只有 4-5 个
// HTTP/2: 无限制(多路复用)
// → SSE 在 HTTP/2 上可以建立更多连接
// 服务端资源:
// 每个 SSE 连接占用:
// - 1 个 TCP 连接
// - 1 个线程(异步模式下不占用线程池线程)
// - 少量内存(HttpContext + Response)
// 10,000 个 SSE 连接的估算:
// 内存: ~500MB(每个连接 ~50KB)
// CPU: 极低(大部分时间在等待)
// 线程: ~0(异步模式不占用线程)
// 优化建议:
// 1. 使用 HTTP/2 减少连接数限制
// 2. 设置合理的超时和心跳间隔
// 3. 监控连接数,设置上限
// 4. 使用 Channel 而非阻塞队列SSE 端点的限流
// 限制 SSE 连接数
public class SseRateLimitMiddleware
{
private const int MaxConnections = 1000;
private static int _currentConnections;
private readonly RequestDelegate _next;
public SseRateLimitMiddleware(RequestDelegate next) => _next = next;
public async Task InvokeAsync(HttpContext context)
{
if (context.Request.Path.StartsWithSegments("/sse"))
{
var current = Interlocked.Increment(ref _currentConnections);
if (current > MaxConnections)
{
Interlocked.Decrement(ref _currentConnections);
context.Response.StatusCode = StatusCodes.Status429TooManyRequests;
await context.Response.WriteAsJsonAsync(new
{
error = "SSE 连接数已达上限",
maxConnections = MaxConnections
});
return;
}
try
{
await _next(context);
}
finally
{
Interlocked.Decrement(ref _currentConnections);
}
}
else
{
await _next(context);
}
}
}常见问题
排错清单
// 问题 1:SSE 连接立即断开
// 原因:响应头未设置 text/event-stream
// 解决:ctx.Response.ContentType = "text/event-stream"
// 问题 2:客户端收不到消息
// 原因:没有手动 Flush
// 解决:await ctx.Response.Body.FlushAsync(ct)
// 注意:SSE 依赖手动刷新,中间件缓冲可能导致消息延迟
// 问题 3:Nginx 代理下 SSE 不工作
// 原因:Nginx 默认缓冲响应
// 解决:添加 X-Accel-Buffering: no 响应头或配置 proxy_buffering off
// 问题 4:连接超时断开
// 原因:防火墙/代理/负载均衡器超时
// 解决:添加心跳机制,每 15-30 秒发送心跳
// 问题 5:EventSource 不支持 POST 请求
// 原因:EventSource API 只支持 GET 请求
// 解决:客户端到服务端的消息通过普通 HTTP POST 发送,
// 服务端到客户端的推送通过 SSE
// 问题 6:连接被 Kestrel 超时断开
// 解决:配置 Kestrel 的 KeepAlive 超时
// builder.WebHost.ConfigureKestrel(o => o.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(30));优点
缺点
总结
SSE 基于 HTTP 长连接实现服务端单向推送,协议格式为 text/event-stream,每条消息以空行分隔,支持 id、event、data 等字段。ASP.NET Core 中通过设置 ContentType = "text/event-stream" + Body.FlushAsync() 实现 SSE 端点。Channel<T> 是实现多消费者广播的最佳选择,配合 IAsyncEnumerable 可以优雅地消费事件流。心跳机制(SSE 注释 : heartbeat)保持连接活跃并检测断线。Nginx 反向代理需要设置 X-Accel-Buffering: no 禁用缓冲。适合通知推送、实时日志、AI 流式输出等单向推送场景,需要双向通信时选择 WebSocket。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 框架能力的真正重点是它在请求链路中的位置和对上下游的影响。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 画清执行顺序、入参来源、失败返回和日志记录点。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 知道 API 名称,却不知道它应该放在请求链路的哪个位置。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐协议选型、网关治理、端点可观测性和契约演进策略。
适用场景
- 当你准备把《Server-Sent Events 推送》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《Server-Sent Events 推送》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Server-Sent Events 推送》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Server-Sent Events 推送》最大的收益和代价分别是什么?
