ASP.NET Core 实时推送方案
大约 13 分钟约 4013 字
ASP.NET Core 实时推送方案
简介
实时推送是现代 Web 应用的核心能力之一。从即时通讯、协作编辑到实时数据看板、在线竞价,用户期望数据变化时立即呈现在界面上,而无需手动刷新。ASP.NET Core 通过 SignalR 框架提供了强大且易用的实时通信抽象层,底层自动选择 WebSocket、Server-Sent Events(SSE)或 Long Polling 三种传输方式中最合适的一种。
本文将深入讲解 ASP.NET Core 环境下各种实时推送方案的原理、实现、扩展和生产实践。
特点
- 多传输协议自动协商:SignalR 自动选择最佳传输方式,开发者无需关心底层差异
- Hub 抽象模型:通过 Hub 模型统一管理连接、分组、消息推送
- 水平扩展支持:通过 Redis/Service Bus 底板实现多服务器消息广播
- 强类型客户端:支持通过接口定义实现类型安全的客户端代理
- 内置重连机制:客户端断线自动重连,服务端可感知连接状态变化
核心概念与实现
一、SignalR 基础 Hub 开发
// ============ 定义强类型客户端接口 ============
public interface INotificationClient
{
Task ReceiveNotification(NotificationMessage message);
Task ReceiveProgress(ProgressUpdate progress);
Task ReceiveAlert(AlertMessage alert);
}
// ============ Hub 实现 ============
[Authorize]
public class NotificationHub : Hub<INotificationClient>
{
private readonly ILogger<NotificationHub> _logger;
private readonly IConnectionMapping _connectionMapping;
public NotificationHub(ILogger<NotificationHub> logger, IConnectionMapping connectionMapping)
{
_logger = logger;
_connectionMapping = connectionMapping;
}
// 客户端连接时
public override async Task OnConnectedAsync()
{
var userId = Context.UserIdentifier!;
var connectionId = Context.ConnectionId;
// 记录用户-连接映射
_connectionMapping.Add(userId, connectionId);
// 将用户加入其所属的组
var userGroups = await GetUserGroupsAsync(userId);
foreach (var group in userGroups)
{
await Groups.AddToGroupAsync(connectionId, group);
}
_logger.LogInformation("用户 {UserId} 已连接,ConnectionId={ConnId}", userId, connectionId);
await base.OnConnectedAsync();
}
// 客户端断开时
public override async Task OnDisconnectedAsync(Exception? exception)
{
var userId = Context.UserIdentifier!;
var connectionId = Context.ConnectionId;
_connectionMapping.Remove(userId, connectionId);
if (exception != null)
{
_logger.LogWarning(exception, "用户 {UserId} 异常断开", userId);
}
await base.OnDisconnectedAsync(exception);
}
// 客户端可调用的方法(双向通信)
public async Task JoinRoom(string roomId)
{
await Groups.AddToGroupAsync(Context.ConnectionId, $"room:{roomId}");
_logger.LogInformation("用户 {UserId} 加入房间 {RoomId}", Context.UserIdentifier, roomId);
}
public async Task LeaveRoom(string roomId)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, $"room:{roomId}");
}
private async Task<List<string>> GetUserGroupsAsync(string userId)
{
// 从数据库或缓存获取用户所属的组
return new List<string> { $"team:{userId}", "broadcast" };
}
}// ============ 连接映射管理(支持一个用户多个连接) ============
public interface IConnectionMapping
{
void Add(string userId, string connectionId);
void Remove(string userId, string connectionId);
IReadOnlyList<string> GetConnections(string userId);
int Count { get; }
}
public class ConnectionMapping : IConnectionMapping
{
private readonly ConcurrentDictionary<string, HashSet<string>> _connections = new();
private readonly object _lock = new();
public void Add(string userId, string connectionId)
{
lock (_lock)
{
if (!_connections.TryGetValue(userId, out var connections))
{
connections = new HashSet<string>();
_connections[userId] = connections;
}
connections.Add(connectionId);
}
}
public void Remove(string userId, string connectionId)
{
lock (_lock)
{
if (_connections.TryGetValue(userId, out var connections))
{
connections.Remove(connectionId);
if (connections.Count == 0)
_connections.TryRemove(userId, out _);
}
}
}
public IReadOnlyList<string> GetConnections(string userId)
{
if (_connections.TryGetValue(userId, out var connections))
return connections.ToList();
return Array.Empty<string>();
}
public int Count => _connections.Values.Sum(c => c.Count);
}二、SignalR 注册与配置
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// 注册 SignalR
builder.Services.AddSignalR(options =>
{
options.EnableDetailedErrors = true; // 开发环境开启详细错误
options.KeepAliveInterval = TimeSpan.FromSeconds(15); // 心跳间隔
options.ClientTimeoutInterval = TimeSpan.FromSeconds(30); // 客户端超时
options.HandshakeTimeout = TimeSpan.FromSeconds(15); // 握手超时
options.MaximumReceiveMessageSize = 64 * 1024; // 最大消息 64KB
options.StreamBufferCapacity = 10; // 流缓冲区大小
});
// 注册连接映射
builder.Services.AddSingleton<IConnectionMapping, ConnectionMapping>();
var app = builder.Build();
// 映射 Hub 端点
app.MapHub<NotificationHub>("/hubs/notifications", options =>
{
// 允许的传输方式(默认全部启用)
options.Transports = Microsoft.AspNetCore.Http.Connections.HttpTransports.All;
});
app.Run();三、分组推送与定向推送
// ============ 消息推送服务 ============
public class PushService
{
private readonly IHubContext<NotificationHub, INotificationClient> _hubContext;
private readonly IConnectionMapping _connectionMapping;
private readonly ILogger<PushService> _logger;
public PushService(
IHubContext<NotificationHub, INotificationClient> hubContext,
IConnectionMapping connectionMapping,
ILogger<PushService> logger)
{
_hubContext = hubContext;
_connectionMapping = connectionMapping;
_logger = logger;
}
/// <summary>
/// 全体广播
/// </summary>
public async Task BroadcastAsync(NotificationMessage message)
{
_logger.LogInformation("广播消息: {Title}", message.Title);
await _hubContext.Clients.All.ReceiveNotification(message);
}
/// <summary>
/// 推送给指定用户
/// </summary>
public async Task SendToUserAsync(string userId, NotificationMessage message)
{
// SignalR 内置支持按 UserIdentifier 推送
await _hubContext.Clients.User(userId).ReceiveNotification(message);
_logger.LogInformation("推送消息给用户 {UserId}", userId);
}
/// <summary>
/// 推送给指定组
/// </summary>
public async Task SendToGroupAsync(string groupName, NotificationMessage message)
{
await _hubContext.Clients.Group(groupName).ReceiveNotification(message);
_logger.LogInformation("推送消息给组 {GroupName}", groupName);
}
/// <summary>
/// 推送给多个用户
/// </summary>
public async Task SendToUsersAsync(IEnumerable<string> userIds, NotificationMessage message)
{
await _hubContext.Clients.Users(userIds).ReceiveNotification(message);
}
/// <summary>
/// 除指定用户外的组内广播
/// </summary>
public async Task SendToGroupExceptAsync(string groupName, string excludedUserId,
NotificationMessage message)
{
var excludedConnections = _connectionMapping.GetConnections(excludedUserId);
await _hubContext.Clients.GroupExcept(groupName, excludedConnections)
.ReceiveNotification(message);
}
/// <summary>
/// 进度推送(流式)
/// </summary>
public async Task SendProgressAsync(string userId, string taskId, int percentage)
{
await _hubContext.Clients.User(userId).ReceiveProgress(new ProgressUpdate
{
TaskId = taskId,
Percentage = percentage,
Timestamp = DateTime.UtcNow
});
}
}四、WebSocket 水平扩展 — Redis 底板
// ============ 单机部署无需底板,多实例必须使用底板 ============
// 安装 NuGet: Microsoft.AspNetCore.SignalR.StackExchangeRedis
// Program.cs
builder.Services.AddSignalR()
.AddStackExchangeRedis(builder.Configuration.GetConnectionString("Redis")!, options =>
{
options.Configuration.ChannelPrefix = "myapp:signalr"; // Redis Key 前缀
options.Configuration.AbortOnConnectFail = false; // 连接失败时不中止
options.ConnectionFactory = async (writer, _) =>
{
var config = ConfigurationOptions.Parse(
builder.Configuration.GetConnectionString("Redis")!);
config.AbortOnConnectFail = false;
config.ConnectRetry = 3;
config.ReconnectRetryPolicy = new ExponentialRetry(5000);
return await ConnectionMultiplexer.ConnectAsync(config, writer);
};
});// ============ Redis 底板 + 分组优化 ============
// 对于大量分组的场景,可以使用 Redis Streams 替代 Pub/Sub
// 安装 NuGet: Microsoft.AspNetCore.SignalR.RedisStreams
builder.Services.AddSignalR()
.AddRedisStreams(builder.Configuration.GetConnectionString("Redis")!, options =>
{
options.ChannelPrefix = "signalr-streams";
options.MaxBatchSize = 100;
});Redis 底板原理:当服务器 A 上的客户端发送消息时,消息通过 Hub 广播到 Redis Pub/Sub 频道。所有订阅了该频道的服务器(B、C、D...)都会收到消息,然后各自推送给本机上的目标客户端。
五、Server-Sent Events (SSE) 方案
在某些简单场景下,SSE 比 SignalR 更轻量。
// ============ ASP.NET Core 原生 SSE 端点 ============
[ApiController]
[Route("api/events")]
public class SseController : ControllerBase
{
private readonly IEventService _eventService;
public SseController(IEventService eventService)
{
_eventService = eventService;
}
[HttpGet("stream")]
public async Task Stream(CancellationToken ct)
{
Response.ContentType = "text/event-stream";
Response.Headers.CacheControl = "no-cache";
Response.Headers.Connection = "keep-alive";
// 订阅事件源
var eventQueue = _eventService.Subscribe(User.Identity!.Name!);
try
{
await foreach (var evt in eventQueue.WithCancellation(ct))
{
// SSE 格式:data: {json}\n\n
var json = JsonSerializer.Serialize(evt);
var sseMessage = $"event: {evt.Type}\ndata: {json}\nid: {evt.Id}\n\n";
await Response.WriteAsync(sseMessage, ct);
await Response.Body.FlushAsync(ct);
}
}
catch (OperationCanceledException)
{
// 客户端断开连接
}
finally
{
_eventService.Unsubscribe(User.Identity!.Name!);
}
}
}
// ============ SSE 事件模型 ============
public class SseEvent
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public string Type { get; set; } = "message";
public object? Data { get; set; }
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
// ============ 事件服务 ============
public interface IEventService
{
IAsyncEnumerable<SseEvent> Subscribe(string userId);
void Unsubscribe(string userId);
Task PublishAsync(string userId, SseEvent evt);
}
public class EventService : IEventService
{
private readonly ConcurrentDictionary<string, Channel<SseEvent>> _channels = new();
public IAsyncEnumerable<SseEvent> Subscribe(string userId)
{
var channel = Channel.CreateUnbounded<SseEvent>(new UnboundedChannelOptions
{
SingleReader = true,
SingleWriter = false
});
_channels[userId] = channel;
return channel.Reader.ReadAllAsync();
}
public void Unsubscribe(string userId)
{
if (_channels.TryRemove(userId, out var channel))
{
channel.Writer.TryComplete();
}
}
public async Task PublishAsync(string userId, SseEvent evt)
{
if (_channels.TryGetValue(userId, out var channel))
{
await channel.Writer.WriteAsync(evt);
}
}
}六、传输方式对比
| 特性 | WebSocket | Server-Sent Events | Long Polling |
|---|---|---|---|
| 通信方向 | 双向 | 服务端 -> 客户端 | 双向(模拟) |
| 浏览器支持 | IE10+ | IE 不支持 | 全部 |
| 代理/防火墙 | 部分不支持 | 良好 | 良好 |
| 连接开销 | 低(长连接) | 低(长连接) | 高(频繁重建) |
| 最大并发连接 | 受浏览器限制 | HTTP/1.1 下 6 个 | 无额外限制 |
| 二进制数据 | 支持 | 不支持 | 支持 |
| 断线重连 | 需自行实现 | 内置 | N/A |
| 适用场景 | 实时双向通信 | 单向推送、通知 | 兼容性兜底 |
// ============ 强制使用特定传输方式 ============
app.MapHub<NotificationHub>("/hubs/notifications", options =>
{
// 仅允许 WebSocket
options.Transports = HttpTransportType.WebSockets;
// WebSocket 配置
options.WebSocketOptions = webSocketOptions =>
{
webSocketOptions.KeepAliveInterval = TimeSpan.FromSeconds(10);
};
});七、SignalR 认证与授权
// ============ JWT 认证配置 ============
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidateLifetime = true,
ValidateIssuerSigningKey = true,
ValidIssuer = builder.Configuration["Jwt:Issuer"],
ValidAudience = builder.Configuration["Jwt:Audience"],
IssuerSigningKey = new SymmetricSecurityKey(
Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]!))
};
// 处理 WebSocket 连接的 JWT(通过 query string 传递)
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) &&
path.StartsWithSegments("/hubs/notifications"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});
// ============ Hub 授权策略 ============
[Authorize]
public class NotificationHub : Hub<INotificationClient>
{
// 仅管理员可调用
[Authorize(Policy = "AdminOnly")]
public async Task BroadcastAlert(AlertMessage alert)
{
await Clients.All.ReceiveAlert(alert);
}
// 基于角色的组访问控制
public override async Task OnConnectedAsync()
{
if (Context.User!.IsInRole("Admin"))
{
await Groups.AddToGroupAsync(Context.ConnectionId, "admins");
}
await base.OnConnectedAsync();
}
}八、客户端重连策略
// ============ JavaScript 客户端重连配置 ============
// 在前端 JavaScript 中
/*
const connection = new signalR.HubConnectionBuilder()
.withUrl("/hubs/notifications", {
accessTokenFactory: () => localStorage.getItem('access_token')
})
.withAutomaticReconnect({
// 自定义重连间隔:0s, 2s, 5s, 10s, 30s...
nextRetryDelayInMilliseconds: (retryContext) => {
const delays = [0, 2000, 5000, 10000, 30000];
const index = Math.min(retryContext.previousRetryCount, delays.length - 1);
return delays[index];
}
})
.configureLogging(signalR.LogLevel.Information)
.build();
// 监听重连事件
connection.onreconnecting(error => {
console.warn('正在重连...', error);
// 显示 "连接中断" 提示
showConnectionStatus('reconnecting');
});
connection.onreconnected(connectionId => {
console.log('已重连, ConnectionId:', connectionId);
showConnectionStatus('connected');
// 重新订阅可能需要的数据
resubscribeData();
});
connection.onclose(error => {
console.error('连接已关闭', error);
showConnectionStatus('disconnected');
// 手动重连(超过自动重连上限时)
if (error) {
setTimeout(() => startConnection(), 5000);
}
});
*/// ============ .NET 客户端重连配置 ============
public class SignalRClientService : BackgroundService
{
private readonly HubConnection _connection;
private readonly ILogger<SignalRClientService> _logger;
public SignalRClientService(ILogger<SignalRClientService> logger, IConfiguration config)
{
_logger = logger;
_connection = new HubConnectionBuilder()
.WithUrl($"{config["ServiceA:BaseUrl"]}/hubs/notifications", options =>
{
options.AccessTokenProvider = () => GetAccessTokenAsync();
})
.WithAutomaticReconnect(new[] {
TimeSpan.Zero,
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(30)
})
.AddMessagePackProtocol() // 使用 MessagePack 提升序列化性能
.Build();
// 注册客户端方法
_connection.On<NotificationMessage>("ReceiveNotification", msg =>
{
_logger.LogInformation("收到通知: {Title}", msg.Title);
});
_connection.Reconnecting += error =>
{
_logger.LogWarning(error, "SignalR 连接正在重连...");
return Task.CompletedTask;
};
_connection.Reconnected += connectionId =>
{
_logger.LogInformation("SignalR 已重连, ConnectionId={Id}", connectionId);
return Task.CompletedTask;
};
_connection.Closed += error =>
{
_logger.LogError(error, "SignalR 连接已关闭");
return Task.CompletedTask;
};
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await _connection.StartAsync(stoppingToken);
_logger.LogInformation("SignalR 已连接");
return; // 连接成功,退出循环
}
catch (Exception ex)
{
_logger.LogError(ex, "SignalR 连接失败,5秒后重试");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
}
private async Task<string?> GetAccessTokenAsync()
{
// 获取或刷新 Token
return await Task.FromResult("your-jwt-token");
}
}九、生产环境扩展
// ============ 连接数限制与监控 ============
public class SignalRMetricsService : BackgroundService
{
private readonly IConnectionMapping _connectionMapping;
private readonly ILogger<SignalRMetricsService> _logger;
private readonly IMetricsCollector _metrics;
public SignalRMetricsService(
IConnectionMapping connectionMapping,
ILogger<SignalRMetricsService> logger,
IMetricsCollector metrics)
{
_connectionMapping = connectionMapping;
_logger = logger;
_metrics = metrics;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var connectionCount = _connectionMapping.Count;
// 记录指标
_metrics.RecordGauge("signalr.active_connections", connectionCount);
// 告警:连接数超过阈值
if (connectionCount > 10_000)
{
_logger.LogWarning("SignalR 活跃连接数超过阈值: {Count}", connectionCount);
}
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
}
}
// ============ 连接限流中间件 ============
public class ConnectionLimitMiddleware
{
private readonly RequestDelegate _next;
private readonly int _maxConnections;
private int _currentConnections;
public ConnectionLimitMiddleware(RequestDelegate next, IConfiguration config)
{
_next = next;
_maxConnections = config.GetValue("SignalR:MaxConnections", 5000);
}
public async Task InvokeAsync(HttpContext context)
{
if (context.Request.Path.StartsWithSegments("/hubs/"))
{
if (Interlocked.Increment(ref _currentConnections) > _maxConnections)
{
Interlocked.Decrement(ref _currentConnections);
context.Response.StatusCode = 503;
await context.Response.WriteAsync("连接数已满,请稍后再试");
return;
}
try
{
await _next(context);
}
finally
{
Interlocked.Decrement(ref _currentConnections);
}
}
else
{
await _next(context);
}
}
}// ============ 消息序列化优化 ============
// 使用 MessagePack 替代 JSON 以减少带宽
// 安装 NuGet: Microsoft.AspNetCore.SignalR.Protocols.MessagePack
builder.Services.AddSignalR(options =>
{
options.MaximumReceiveMessageSize = null; // 取消大小限制(流式场景)
})
.AddMessagePackProtocol(options =>
{
// 自定义 MessagePack 序列化选项
options.SerializerOptions = MessagePackSerializerOptions.Standard
.WithCompression(MessagePackCompression.Lz4Block);
});十、消息广播的高级模式
// ============ 基于 Redis Pub/Sub 的跨服务广播 ============
public class RedisBroadcastService : BackgroundService
{
private readonly IConnectionMultiplexer _redis;
private readonly IHubContext<NotificationHub, INotificationClient> _hubContext;
private readonly ILogger<RedisBroadcastService> _logger;
public RedisBroadcastService(
IConnectionMultiplexer redis,
IHubContext<NotificationHub, INotificationClient> hubContext,
ILogger<RedisBroadcastService> logger)
{
_redis = redis;
_hubContext = hubContext;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var subscriber = _redis.GetSubscriber();
await subscriber.SubscribeAsync("notifications:broadcast", (channel, message) =>
{
try
{
var notification = JsonSerializer.Deserialize<NotificationMessage>(message!);
if (notification != null)
{
// 根据 TargetType 决定推送范围
switch (notification.TargetType)
{
case "all":
_hubContext.Clients.All.ReceiveNotification(notification).GetAwaiter().GetResult();
break;
case "user":
_hubContext.Clients.User(notification.TargetId!)
.ReceiveNotification(notification).GetAwaiter().GetResult();
break;
case "group":
_hubContext.Clients.Group(notification.TargetId!)
.ReceiveNotification(notification).GetAwaiter().GetResult();
break;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "处理广播消息失败");
}
});
}
/// <summary>
/// 从其他服务发布广播消息
/// </summary>
public async Task PublishAsync(NotificationMessage message)
{
var subscriber = _redis.GetSubscriber();
var json = JsonSerializer.Serialize(message);
await subscriber.PublishAsync("notifications:broadcast", json);
}
}// ============ 流式推送(大文件/数据流场景) ============
public class StreamHub : Hub
{
// 客户端向服务端上传流
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var chunk in stream)
{
// 处理每个数据块
ProcessChunk(chunk);
}
await Clients.Caller.SendAsync("UploadComplete");
}
// 服务端向客户端推送流
public IAsyncEnumerable<DataPoint> SubscribeLiveData(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
return GenerateDataStream(cancellationToken);
}
private async IAsyncEnumerable<DataPoint> GenerateDataStream(
[EnumeratorCancellation] CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
yield return new DataPoint
{
Timestamp = DateTime.UtcNow,
Value = Random.Shared.NextDouble() * 100
};
await Task.Delay(1000, ct);
}
}
}优点
- 开发效率高:SignalR 封装了传输协商、心跳、重连等复杂逻辑
- 灵活性强:支持 WebSocket / SSE / Long Polling 三种传输方式自动降级
- 扩展性好:通过底板机制支持多实例水平扩展
- 类型安全:强类型 Hub 接口提供编译时检查
- 生态完善:提供 .NET / JavaScript / Java / Swift 多平台客户端
缺点
- 内存消耗:每个 WebSocket 连接占用约 10-20KB 内存,万级连接需要注意
- 底板瓶颈:Redis 底板在高消息量场景下可能成为瓶颈,需要考虑分片
- 调试困难:WebSocket 连接问题不如 HTTP 请求直观
- 运维复杂:有状态连接在滚动部署时需要考虑连接迁移
性能注意事项
- 消息大小:单条消息建议不超过 32KB,大消息应拆分或使用流式传输
- 序列化格式:MessagePack 比 JSON 节省约 30-50% 带宽
- 连接数规划:单台服务器通常支持 1-5 万 WebSocket 连接(取决于 CPU 和内存)
- Redis 底板优化:使用 Redis Cluster 分散 SignalR 频道的负载
- 反向代理配置:Nginx 需要配置
proxy_set_header Upgrade和proxy_http_version 1.1
# Nginx WebSocket 配置
location /hubs/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_cache off;
proxy_read_timeout 86400s; # WebSocket 长连接超时
proxy_send_timeout 86400s;
}总结
SignalR 是 ASP.NET Core 生态中最成熟的实时推送方案,它抽象了传输协议的复杂性,让开发者可以专注于业务逻辑。在中小规模场景下,单实例 SignalR 即可满足需求;在需要水平扩展的生产环境中,配合 Redis 底板可以实现多实例间的消息同步。对于简单的单向推送场景,SSE 是一个更轻量的替代方案。选择哪种方案取决于业务需求、技术栈和运维能力。
关键知识点
| 知识点 | 要点 |
|---|---|
| Hub 模型 | 服务端逻辑的核心抽象,管理连接和消息 |
| 强类型客户端 | 通过接口定义实现编译时类型安全 |
| 连接映射 | 一个用户可能有多个连接(多标签页) |
| Redis 底板 | 通过 Pub/Sub 实现跨实例消息广播 |
| SSE | 轻量级单向推送,适合通知类场景 |
| 重连策略 | 指数退避 + 自定义重连间隔 |
| 流式传输 | IAsyncEnumerable 实现数据流 |
| MessagePack | 二进制序列化,减少带宽消耗 |
常见误区
误区:SignalR 等于 WebSocket
- 事实:SignalR 是上层抽象,WebSocket 只是三种传输方式之一
误区:Hub 方法可以做重计算
- 事实:Hub 方法应尽快返回,耗时操作应放到后台服务中
误区:Redis 底板能无限扩展
- 事实:Redis Pub/Sub 有带宽瓶颈,超高消息量需要考虑分片或替换方案
误区:SSE 完全可替代 WebSocket
- 事实:SSE 是单向的,且不支持二进制数据
误区:WebSocket 连接不需要心跳
- 事实:代理和防火墙可能静默关闭空闲连接,心跳是必须的
进阶路线
- Azure SignalR Service:微软提供的全托管 SignalR 服务,自动处理扩展和底板
- gRPC-Web:适合 .NET 到 .NET 的实时通信,基于 HTTP/2
- GraphQL Subscription:基于 WebSocket 的 GraphQL 实时数据订阅
- NATS / Dapr Pub/Sub:云原生的消息广播方案
- HTTP/3 QUIC:下一代传输协议,天生支持长连接和多路复用
适用场景
| 场景 | 推荐方案 |
|---|---|
| 即时通讯 | SignalR + Redis 底板 |
| 实时数据看板 | SSE 或 SignalR |
| 协作编辑 | SignalR + OT/CRDT 算法 |
| 在线竞价 | SignalR + 低延迟优化 |
| 进度推送 | SSE(单向即可) |
| 日志流 | SSE + EventSource |
| 游戏 | WebSocket 原生 + 二进制协议 |
落地建议
- 第一步:基础 Hub。建立 Hub 项目结构,实现基本的连接管理和消息推送
- 第二步:认证集成。将 JWT 认证集成到 WebSocket 连接中
- 第三步:连接映射。实现用户-连接映射,支持定向推送
- 第四步:重连策略。前后端都配置合理的重连机制
- 第五步:水平扩展。部署 Redis 底板,验证多实例消息同步
- 第六步:监控告警。建立连接数、消息量的监控仪表盘
- 第七步:性能优化。引入 MessagePack、连接限流等优化措施
排错清单
复盘问题
- 当前系统的实时推送延迟是多少?从事件发生到客户端接收的端到端延迟?
- WebSocket 连接的稳定性如何?重连率是多少?
- 当单台服务器需要承载超过 1 万连接时,我们的架构能支撑吗?
- Redis 底板的消息吞吐量是否足够?高峰期的延迟是多少?
- 在滚动部署时,已有的 WebSocket 连接如何处理?
