SignalR 实时通信
大约 11 分钟约 3400 字
SignalR 实时通信
简介
SignalR 是 ASP.NET Core 官方提供的实时通信框架,用于在服务端和客户端之间建立持续连接并主动推送消息。相比自己直接处理原生 WebSocket,SignalR 提供了更完整的 Hub 模型、自动协商传输方式、分组推送、用户定向推送和横向扩展能力,非常适合聊天、通知中心、在线协作、监控看板和设备状态推送场景。
SignalR 的传输方式
传输协商优先级:
1. WebSocket -- 全双工,最低延迟,最优性能
2. Server-Sent Events (SSE) -- 服务端推送,单向,兼容性好
3. Long Polling -- 兼容性最好,性能最差,作为兜底
协商过程:
客户端 -> GET /negotiate -> 服务端返回可用传输方式
客户端 -> 尝试 WebSocket -> 成功则使用
客户端 -> 失败则尝试 SSE -> 成功则使用
客户端 -> 失败则使用 Long Polling
不同传输方式对比:
| 传输方式 | 方向 | 延迟 | 兼容性 | 适用 |
|---------|------|------|--------|------|
| WebSocket | 双向 | 极低 | 现代浏览器 | 聊天、协作 |
| SSE | 服务端->客户端 | 低 | 大部分浏览器 | 通知、状态推送 |
| Long Polling | 请求-响应 | 高 | 所有浏览器 | 兼容性兜底 |SignalR 架构
客户端 服务端 后端
| | |
|--- 连接 /negotiate ----->| |
|--- WebSocket 升级 ------->| |
| | |
|--- Invoke("SendMessage")>| |
| |-- 业务处理 |
| |--- 通知服务 ----------->|
| |<--- 确认 ---------------|
|<-- SendAsync("Receive")--| |
| | |
|--- 断开 ---------------->| |特点
实现
基础接入与 Hub 定义
// ============================================
// Program.cs — SignalR 基础配置
// ============================================
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR(options =>
{
// 开发环境启用详细错误
options.EnableDetailedErrors = builder.Environment.IsDevelopment();
// 心跳间隔(服务端每 15 秒发送一次 Ping)
options.KeepAliveInterval = TimeSpan.FromSeconds(15);
// 客户端超时(客户端 30 秒内未收到 Ping 则认为断开)
options.ClientTimeoutInterval = TimeSpan.FromSeconds(30);
// 最大接收消息大小(默认 32KB,可根据需求调整)
options.MaximumReceiveMessageSize = 64 * 1024; // 64KB
// 消息缓冲区大小
options.StreamBufferCapacity = 10;
// 握手超时
options.HandshakeTimeout = TimeSpan.FromSeconds(15);
});
// CORS 配置(SignalR 需要 AllowCredentials)
builder.Services.AddCors(options =>
{
options.AddPolicy("SignalRPolicy", policy =>
{
policy.WithOrigins("http://localhost:5173", "https://app.example.com")
.AllowAnyHeader()
.AllowAnyMethod()
.AllowCredentials(); // 必须启用
});
});
var app = builder.Build();
app.UseCors("SignalRPolicy");
app.MapHub<ChatHub>("/hubs/chat");
app.MapHub<NotificationHub>("/hubs/notifications");
app.Run();// ============================================
// ChatHub — 完整的聊天 Hub 实现
// ============================================
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.Authorization;
[Authorize]
public class ChatHub : Hub
{
private readonly ILogger<ChatHub> _logger;
private readonly IConnectionManager _connectionManager;
private readonly IChatService _chatService;
public ChatHub(
ILogger<ChatHub> logger,
IConnectionManager connectionManager,
IChatService chatService)
{
_logger = logger;
_connectionManager = connectionManager;
_chatService = chatService;
}
/// <summary>
/// 发送消息到房间
/// </summary>
public async Task SendMessage(string room, string content)
{
var user = Context.User?.Identity?.Name ?? "anonymous";
var userId = Context.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value;
if (string.IsNullOrWhiteSpace(content))
{
throw new HubException("消息内容不能为空");
}
// 验证用户是否在房间中
if (!_connectionManager.IsInRoom(Context.ConnectionId, room))
{
throw new HubException("你不在该房间中");
}
// 保存消息到数据库
var message = await _chatService.SaveMessageAsync(
userId: userId ?? "anonymous",
room: room,
content: content);
_logger.LogInformation(
"SignalR 消息发送: Room={Room}, User={User}, Length={Length}",
room, user, content.Length);
// 推送给房间内所有用户(包括发送者)
await Clients.Group(room).SendAsync("ReceiveMessage", new ChatMessageDto
{
Id = message.Id,
User = user,
Content = content,
SentAt = message.CreatedAt
});
}
/// <summary>
/// 加入房间
/// </summary>
public async Task JoinRoom(string room)
{
var user = Context.User?.Identity?.Name ?? "anonymous";
// 记录连接与房间的映射
_connectionManager.JoinRoom(Context.ConnectionId, room);
await Groups.AddToGroupAsync(Context.ConnectionId, room);
// 通知房间内其他用户
await Clients.GroupExcept(room, Context.ConnectionId)
.SendAsync("UserJoined", new
{
Room = room,
User = user,
ConnectionId = Context.ConnectionId,
JoinedAt = DateTimeOffset.UtcNow
});
// 发送历史消息给新加入的用户
var history = await _chatService.GetRecentMessagesAsync(room, limit: 50);
await Clients.Caller.SendAsync("RoomHistory", history);
_logger.LogInformation(
"用户加入房间: Room={Room}, User={User}, ConnectionId={ConnectionId}",
room, user, Context.ConnectionId);
}
/// <summary>
/// 离开房间
/// </summary>
public async Task LeaveRoom(string room)
{
_connectionManager.LeaveRoom(Context.ConnectionId, room);
await Groups.RemoveFromGroupAsync(Context.ConnectionId, room);
await Clients.Group(room).SendAsync("UserLeft", new
{
Room = room,
ConnectionId = Context.ConnectionId,
LeftAt = DateTimeOffset.UtcNow
});
}
/// <summary>
/// 连接建立
/// </summary>
public override async Task OnConnectedAsync()
{
var user = Context.User?.Identity?.Name ?? "anonymous";
var userId = Context.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value;
_connectionManager.AddConnection(Context.ConnectionId, userId ?? "anonymous");
_logger.LogInformation(
"SignalR 连接建立: User={User}, ConnectionId={ConnectionId}",
user, Context.ConnectionId);
// 更新在线用户列表
var onlineUsers = _connectionManager.GetOnlineUsers();
await Clients.All.SendAsync("OnlineUsersUpdated", onlineUsers);
await base.OnConnectedAsync();
}
/// <summary>
/// 连接断开
/// </summary>
public override async Task OnDisconnectedAsync(Exception? exception)
{
var userId = Context.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value ?? "anonymous";
_connectionManager.RemoveConnection(Context.ConnectionId);
_logger.LogInformation(
"SignalR 连接断开: ConnectionId={ConnectionId}, Error={Error}",
Context.ConnectionId, exception?.Message);
// 清理该连接的所有房间
var rooms = _connectionManager.GetConnectionRooms(Context.ConnectionId);
foreach (var room in rooms)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, room);
await Clients.Group(room).SendAsync("UserLeft", new
{
Room = room,
ConnectionId = Context.ConnectionId,
LeftAt = DateTimeOffset.UtcNow
});
}
// 更新在线用户列表
var onlineUsers = _connectionManager.GetOnlineUsers();
await Clients.All.SendAsync("OnlineUsersUpdated", onlineUsers);
await base.OnDisconnectedAsync(exception);
}
}强类型 Hub
// ============================================
// 强类型 Hub — 编译时检查方法名和参数
// ============================================
// 定义客户端接口
public interface IChatClient
{
Task ReceiveMessage(ChatMessageDto message);
Task UserJoined(UserJoinedEvent user);
Task UserLeft(UserLeftEvent user);
Task OnlineUsersUpdated(List<OnlineUser> users);
Task RoomHistory(List<ChatMessageDto> messages);
Task TypingIndicator(TypingEvent typing);
}
public class TypedChatHub : Hub<IChatClient>
{
public async Task SendMessage(string room, string content)
{
var user = Context.User?.Identity?.Name ?? "anonymous";
await Clients.Group(room).ReceiveMessage(new ChatMessageDto
{
User = user,
Content = content,
SentAt = DateTimeOffset.UtcNow
});
}
public async Task Typing(string room)
{
await Clients.GroupExcept(room, Context.ConnectionId)
.TypingIndicator(new TypingEvent
{
User = Context.User?.Identity?.Name ?? "anonymous",
Room = room
});
}
}
// 数据模型
public class ChatMessageDto
{
public long Id { get; set; }
public string User { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTimeOffset SentAt { get; set; }
}
public record UserJoinedEvent(string Room, string User, string ConnectionId, DateTimeOffset JoinedAt);
public record UserLeftEvent(string Room, string ConnectionId, DateTimeOffset LeftAt);
public record TypingEvent(string User, string Room);
public record OnlineUser(string UserId, string UserName, int ConnectionCount);Hub 外推送
// ============================================
// Hub 外推送 — 后台任务/服务层推送消息
// ============================================
public class NotificationPusher
{
private readonly IHubContext<ChatHub> _hubContext;
private readonly ILogger<NotificationPusher> _logger;
public NotificationPusher(
IHubContext<ChatHub> hubContext,
ILogger<NotificationPusher> logger)
{
_hubContext = hubContext;
_logger = logger;
}
/// <summary>
/// 推送到指定房间
/// </summary>
public async Task NotifyRoomAsync(string room, string event, object data)
{
_logger.LogInformation("推送消息到房间: Room={Room}, Event={Event}", room, event);
await _hubContext.Clients.Group(room).SendAsync(event, data);
}
/// <summary>
/// 推送到指定用户(所有连接)
/// </summary>
public async Task NotifyUserAsync(string userId, string event, object data)
{
_logger.LogInformation("推送消息给用户: UserId={UserId}, Event={Event}", userId, event);
await _hubContext.Clients.User(userId).SendAsync(event, data);
}
/// <summary>
/// 广播给所有连接
/// </summary>
public async Task BroadcastAsync(string event, object data)
{
await _hubContext.Clients.All.SendAsync(event, data);
}
}
// ============================================
// 在订单服务中推送状态变化
// ============================================
public class OrderService
{
private readonly IHubContext<ChatHub> _hubContext;
public OrderService(IHubContext<ChatHub> hubContext)
{
_hubContext = hubContext;
}
public async Task UpdateOrderStatusAsync(string orderId, string newStatus)
{
// 更新数据库...
// 推送状态变化
await _hubContext.Clients.Group($"order:{orderId}")
.SendAsync("OrderStatusChanged", new
{
OrderId = orderId,
Status = newStatus,
UpdatedAt = DateTimeOffset.UtcNow
});
}
}连接管理器
// ============================================
// 连接管理 — 跟踪连接、用户和房间的映射
// ============================================
public interface IConnectionManager
{
void AddConnection(string connectionId, string userId);
void RemoveConnection(string connectionId);
void JoinRoom(string connectionId, string room);
void LeaveRoom(string connectionId, string room);
bool IsInRoom(string connectionId, string room);
List<string> GetConnectionRooms(string connectionId);
List<OnlineUser> GetOnlineUsers();
int GetConnectionCount();
}
public class ConnectionManager : IConnectionManager
{
// 连接 ID -> 用户 ID
private readonly ConcurrentDictionary<string, string> _connections = new();
// 用户 ID -> 连接 ID 集合(一个用户可能有多个连接)
private readonly ConcurrentDictionary<string, HashSet<string>> _userConnections = new();
// 房间 -> 连接 ID 集合
private readonly ConcurrentDictionary<string, HashSet<string>> _roomConnections = new();
// 连接 ID -> 房间集合
private readonly ConcurrentDictionary<string, HashSet<string>> _connectionRooms = new();
public void AddConnection(string connectionId, string userId)
{
_connections[connectionId] = userId;
var connections = _userConnections.GetOrAdd(userId, _ => new HashSet<string>());
lock (connections) { connections.Add(connectionId); }
}
public void RemoveConnection(string connectionId)
{
if (_connections.TryRemove(connectionId, out var userId))
{
if (_userConnections.TryGetValue(userId, out var connections))
{
lock (connections) { connections.Remove(connectionId); }
if (connections.Count == 0)
_userConnections.TryRemove(userId, out _);
}
}
// 清理房间
if (_connectionRooms.TryRemove(connectionId, out var rooms))
{
foreach (var room in rooms)
{
if (_roomConnections.TryGetValue(room, out var roomConns))
{
lock (roomConns) { roomConns.Remove(connectionId); }
}
}
}
}
public void JoinRoom(string connectionId, string room)
{
var roomConns = _roomConnections.GetOrAdd(room, _ => new HashSet<string>());
lock (roomConns) { roomConns.Add(connectionId); }
var userRooms = _connectionRooms.GetOrAdd(connectionId, _ => new HashSet<string>());
lock (userRooms) { userRooms.Add(room); }
}
public void LeaveRoom(string connectionId, string room)
{
if (_roomConnections.TryGetValue(room, out var roomConns))
{
lock (roomConns) { roomConns.Remove(connectionId); }
}
if (_connectionRooms.TryGetValue(connectionId, out var userRooms))
{
lock (userRooms) { userRooms.Remove(room); }
}
}
public bool IsInRoom(string connectionId, string room)
{
return _connectionRooms.TryGetValue(connectionId, out var rooms)
&& rooms.Contains(room);
}
public List<string> GetConnectionRooms(string connectionId)
{
return _connectionRooms.TryGetValue(connectionId, out var rooms)
? rooms.ToList() : new List<string>();
}
public List<OnlineUser> GetOnlineUsers()
{
return _userConnections.Select(kv => new OnlineUser(
kv.Key,
kv.Key, // 实际应查询用户名
kv.Value.Count
)).ToList();
}
public int GetConnectionCount() => _connections.Count;
}前端客户端接入
// ============================================
// 前端 SignalR 客户端
// ============================================
import * as signalR from '@microsoft/signalr'
// 创建连接
const connection = new signalR.HubConnectionBuilder()
.withUrl('https://api.example.com/hubs/chat', {
// JWT Token 认证
accessTokenFactory: () => localStorage.getItem('token') || ''
})
// 自动重连策略:[0ms, 2s, 5s, 10s, 30s]
.withAutomaticReconnect([0, 2000, 5000, 10000, 30000])
// 日志级别
.configureLogging(signalR.LogLevel.Information)
// 超时配置
.withServerTimeout(30000) // 服务端超时 30 秒
.withKeepAliveInterval(15000) // 心跳间隔 15 秒
.build()
// 注册事件处理
connection.on('ReceiveMessage', (message: ChatMessage) => {
console.log('收到消息:', message)
// 更新 UI
})
connection.on('UserJoined', (payload) => {
console.log('用户加入:', payload)
})
connection.on('UserLeft', (payload) => {
console.log('用户离开:', payload)
})
connection.on('OnlineUsersUpdated', (users) => {
console.log('在线用户:', users)
})
// 连接生命周期
connection.onreconnecting(error => {
console.warn('正在重连...', error)
showNotification('连接中断,正在尝试重新连接...')
})
connection.onreconnected(connectionId => {
console.log('重连成功:', connectionId)
showNotification('连接已恢复')
// 重连后重新加入房间
connection.invoke('JoinRoom', currentRoom)
})
connection.onclose(error => {
console.error('连接关闭:', error)
showNotification('连接已断开,请刷新页面')
})
// 启动连接
async function startConnection() {
try {
await connection.start()
console.log('SignalR 连接成功')
} catch (err) {
console.error('SignalR 连接失败:', err)
// 3 秒后重试
setTimeout(startConnection, 3000)
}
}
startConnection()
// 发送消息
async function sendMessage(room: string, content: string) {
try {
await connection.invoke('SendMessage', room, content)
} catch (err) {
console.error('发送失败:', err)
}
}
// 页面卸载时关闭连接
window.addEventListener('beforeunload', () => {
connection.stop()
})认证与授权
// ============================================
// Hub 认证授权
// ============================================
// 全局 Hub 授权
app.MapHub<ChatHub>("/hubs/chat").RequireAuthorization();
// 方式 A:在 Hub 方法级别授权
[Authorize(Roles = "Admin")]
public async Task AdminMessage(string message)
{
await Clients.All.SendAsync("AdminBroadcast", message);
}
// 方式 B:基于资源的授权
public async Task SendMessage(string room, string content)
{
// 检查用户是否有权限发送到该房间
var userId = Context.UserIdentifier;
var hasPermission = await _roomService.CanAccessRoomAsync(userId!, room);
if (!hasPermission)
{
throw new HubException("你没有权限访问该房间");
}
// ...
}
// ============================================
// JWT 认证配置
// ============================================
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidateLifetime = true,
ValidateIssuerSigningKey = true,
ValidIssuer = builder.Configuration["Jwt:Issuer"],
ValidAudience = builder.Configuration["Jwt:Audience"],
IssuerSigningKey = new SymmetricSecurityKey(
Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]!))
};
// SignalR 需要接收 Token 从查询参数
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken)
&& path.StartsWithSegments("/hubs"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});Redis Backplane 多实例扩展
// ============================================
// Redis Backplane — 多实例消息同步
// ============================================
// NuGet: Microsoft.AspNetCore.SignalR.StackExchangeRedis
builder.Services.AddSignalR()
.AddStackExchangeRedis(connectionString, options =>
{
// Redis Key 前缀(多应用共享 Redis 时区分)
options.Configuration.ChannelPrefix = "myapp:signalr:";
// 连接工厂(自定义连接配置)
// options.ConnectionFactory = async =>
// {
// return ConnectionMultiplexer.Connect(connectionString);
// };
});Redis Backplane 工作原理:
节点 A Redis 节点 B
| | |
|-- Publish "msg" ------>| |
| |-- Publish "msg" ------->|
| | |
客户端 A 收到 Redis 中转 客户端 B 收到
注意:
- Redis Backplane 只转发消息,不转发连接状态
- 每个节点维护自己的连接列表
- Groups 信息不跨节点共享(需要自己实现)
- 性能瓶颈在 Redis,大规模场景建议使用 Azure SignalR ServiceNginx 反向代理配置
# ============================================
# Nginx 配置 — 支持 WebSocket
# ============================================
upstream signalr_backend {
least_conn;
server 10.0.0.1:5000;
server 10.0.0.2:5000;
server 10.0.0.3:5000;
}
server {
listen 80;
server_name app.example.com;
location /hubs {
proxy_pass http://signalr_backend;
proxy_http_version 1.1;
# WebSocket 升级头
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# 代理头
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket 超时(必须大于 KeepAlive 间隔)
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}优点
缺点
总结
SignalR 最适合解决的是"服务端如何稳定地把实时事件推给客户端",而不是替代所有消息和通知场景。真正做实时系统时,重点不只是写个 Hub,而是把连接状态、鉴权、重连、分组、横向扩展和消息补偿一起设计进去。生产环境必须验证网关/Nginx/LB 对 Upgrade、KeepAlive 的支持。
关键知识点
- SignalR 是 WebSocket 的工程化封装,不等于"消息可靠送达系统"。
- 组、用户、连接 ID 是最常见三种消息路由维度。
- 多实例部署时通常要额外引入 backplane 或托管服务。
- 实时系统最难的是状态一致性,不是发送函数本身。
项目落地视角
- 聊天、通知中心、在线协作、监控大屏都很适合 SignalR。
- 后台任务、订单状态流、设备状态流常通过 Hub 外推送实现。
- 前端要同时设计实时推送和历史消息补拉,不要只依赖在线推送。
- 生产环境必须验证网关 / Nginx / LB 对 Upgrade、KeepAlive 的支持。
常见误区
- 以为 SignalR 连接建立成功,就等于业务消息不会丢。
- 不处理断线重连后的组恢复和状态补偿。
- 把所有业务通知都塞进一个 Hub,导致职责混乱。
- 线上问题只看前端控制台,不查服务端日志和代理配置。
进阶路线
- 深入学习 SignalR Redis Backplane / Azure SignalR Service。
- 对高频消息设计压缩、聚合和节流策略。
- 建立连接数、断开率、重连率、消息失败率监控。
- 将 SignalR 与领域事件、Kafka、Outbox 结合形成完整实时链路。
适用场景
- 聊天室。
- 实时通知和状态推送。
- 在线协作和多人编辑。
- 监控大屏、设备状态流、告警中心。
落地建议
- 先设计消息模型和分组模型,再设计 Hub。
- 默认考虑重连、心跳、补拉和权限控制。
- 高价值消息不要只靠实时推送,最好有可回放来源。
- 对多实例场景尽早验证扩展方案,而不是上线后再补。
排错清单
- 连不上:检查 CORS、认证、网关和 WebSocket Upgrade。
- 容易断:检查 KeepAlive、空闲超时和代理配置。
- 收不到:检查 JoinGroup 时机、多实例同步和用户标识。
- 消息乱:检查客户端是否做了去重、排序和补拉。
