SignalR 协议与传输机制
大约 9 分钟约 2649 字
SignalR 协议与传输机制
简介
SignalR 是 ASP.NET Core 的实时通信库,支持 WebSocket、Server-Sent Events 和 Long Polling 三种传输方式。理解 SignalR 的协议协商、Hub 生命周期、流式传输和横向扩展,有助于构建高性能的实时应用。
特点
传输协议协商
自动传输选择
// SignalR 协商流程:
// 1. 客户端发送 POST /hub/negotiate
// 2. 服务器返回支持的传输列表和连接 Token
// 3. 客户端按优先级选择传输:WebSocket > SSE > Long Polling
// 配置传输选项
builder.Services.AddSignalR(options =>
{
options.EnableDetailedErrors = true; // 详细错误信息(开发环境)
options.KeepAliveInterval = TimeSpan.FromSeconds(15); // 心跳间隔
options.ClientTimeoutInterval = TimeSpan.FromSeconds(30); // 客户端超时
options.HandshakeTimeout = TimeSpan.FromSeconds(15); // 握手超时
options.MaximumReceiveMessageSize = 64 * 1024; // 最大消息 64KB
options.StreamBufferCapacity = 10; // 流缓冲大小
options.MaximumParallelInvocations = 1; // 最大并行调用
});
// 配置 JSON 协议
builder.Services.AddSignalR()
.AddJsonProtocol(options =>
{
options.PayloadSerializerOptions.PropertyNamingPolicy = null; // 保持原始命名
});
// 配置 MessagePack(更高性能)
builder.Services.AddSignalR()
.AddMessagePackProtocol();
// 客户端(JavaScript)
// const connection = new signalR.HubConnectionBuilder()
// .withUrl("/chathub")
// .withAutomaticReconnect([0, 2000, 5000, 10000, 30000]) // 自动重连
// .configureLogging(signalR.LogLevel.Information)
// .build();Hub 开发
Hub 方法与生命周期
public class ChatHub : Hub
{
private readonly ILogger<ChatHub> _logger;
// 客户端调用服务器方法
public async Task SendMessage(string user, string message)
{
_logger.LogInformation("收到消息: {User}: {Message}", user, message);
// 广播给所有客户端(除了发送者)
await Clients.Others.SendAsync("ReceiveMessage", user, message);
// 或者广播给所有人
// await Clients.All.SendAsync("ReceiveMessage", user, message);
// 发送给特定组
// await Clients.Group("room1").SendAsync("ReceiveMessage", user, message);
// 发送给特定用户
// await Clients.User("userId").SendAsync("ReceiveMessage", user, message);
}
// 加入组
public async Task JoinRoom(string roomName)
{
await Groups.AddToGroupAsync(Context.ConnectionId, roomName);
await Clients.Group(roomName).SendAsync("UserJoined", Context.User?.Identity?.Name);
}
// 离开组
public async Task LeaveRoom(string roomName)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, roomName);
}
// 连接建立
public override async Task OnConnectedAsync()
{
_logger.LogInformation("连接建立: {ConnectionId}", Context.ConnectionId);
await base.OnConnectedAsync();
}
// 连接断开
public override async Task OnDisconnectedAsync(Exception? exception)
{
if (exception != null)
{
_logger.LogError(exception, "连接异常断开: {ConnectionId}", Context.ConnectionId);
}
else
{
_logger.LogInformation("连接断开: {ConnectionId}", Context.ConnectionId);
}
await base.OnDisconnectedAsync(exception);
}
}
// 强类型 Hub(推荐)
public interface IChatClient
{
Task ReceiveMessage(string user, string message);
Task UserJoined(string userName);
Task UserLeft(string userName);
}
public class TypedChatHub : Hub<IChatClient>
{
public async Task SendMessage(string message)
{
var user = Context.User?.Identity?.Name ?? "Anonymous";
await Clients.Others.ReceiveMessage(user, message);
}
public async Task JoinRoom(string room)
{
await Groups.AddToGroupAsync(Context.ConnectionId, room);
await Clients.Group(room).UserJoined(Context.User?.Identity?.Name!);
}
}流式传输
// 服务器 → 客户端流
public class StreamHub : Hub
{
// IAsyncEnumerable 流(按需推送)
public async IAsyncEnumerable<StockPrice> StreamStockPrices(
string symbol,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var price = await GetLatestPriceAsync(symbol);
yield return price;
await Task.Delay(1000, cancellationToken);
}
}
// Channel 流(批量推送)
public ChannelReader<LogEntry> StreamLogs(CancellationToken cancellationToken)
{
var channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropOldest
});
_ = Task.Run(async () =>
{
try
{
await foreach (var log in _logProvider.GetLogsAsync(cancellationToken))
{
await channel.Writer.WriteAsync(log, cancellationToken);
}
}
finally
{
channel.Writer.Complete();
}
}, cancellationToken);
return channel.Reader;
}
// 客户端 → 服务器流
public async Task UploadStream(IAsyncEnumerable<string> stream)
{
await foreach (var item in stream)
{
ProcessItem(item);
}
}
private async Task<StockPrice> GetLatestPriceAsync(string symbol) => new(symbol, Random.Shared.Next(100, 200));
private void ProcessItem(string item) { }
}
public record StockPrice(string Symbol, decimal Price);
public record LogEntry(DateTime Time, string Level, string Message);Hub 授权与安全
认证集成
// SignalR Hub 认证配置
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
options.Events = new JwtBearerEvents
{
OnMessageReceived = context =>
{
// WebSocket 请求没有 HTTP 头,Token 通过 Query 传递
var accessToken = context.Request.Query["access_token"];
var path = context.HttpContext.Request.Path;
if (!string.IsNullOrEmpty(accessToken) && path.StartsWithSegments("/chathub"))
{
context.Token = accessToken;
}
return Task.CompletedTask;
}
};
});
// Hub 级别授权
[Authorize]
public class SecureChatHub : Hub
{
// 只有认证用户才能调用
public async Task SendMessage(string message)
{
var userName = Context.User?.Identity?.Name;
await Clients.All.SendAsync("ReceiveMessage", userName, message);
}
// 角色授权
[Authorize(Roles = "Admin")]
public async Task BroadcastSystemMessage(string message)
{
await Clients.All.SendAsync("SystemMessage", message);
}
// 策略授权
[Authorize(Policy = "CanManageRooms")]
public async Task CreateRoom(string roomName)
{
await Groups.AddToGroupAsync(Context.ConnectionId, roomName);
}
}
// 客户端连接时传递 Token
// var connection = new signalR.HubConnectionBuilder()
// .withUrl("/chathub", { accessTokenFactory: () => token })
// .build();Hub 过滤器(.NET 8+)
// Hub 过滤器 — 类似 MVC 的 Action Filter
public class LoggingHubFilter : IHubFilter
{
private readonly ILogger<LoggingHubFilter> _logger;
public LoggingHubFilter(ILogger<LoggingHubFilter> logger)
{
_logger = logger;
}
public async ValueTask<object?> InvokeMethodAsync(
HubInvocationContext context,
Func<HubInvocationContext, ValueTask<object?>> next)
{
_logger.LogInformation("Hub 方法调用: {Method} (连接: {ConnectionId})",
context.HubMethodName, context.Context.ConnectionId);
try
{
var result = await next(context);
_logger.LogInformation("Hub 方法完成: {Method}", context.HubMethodName);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Hub 方法异常: {Method}", context.HubMethodName);
throw;
}
}
}
// 异常处理过滤器
public class HubExceptionFilter : IHubFilter
{
public ValueTask<object?> InvokeMethodAsync(
HubInvocationContext context,
Func<HubInvocationContext, ValueTask<object?>> next)
{
return next(context);
}
public async Task OnConnectedAsync(HubLifetimeContext context)
{
// 连接建立时的逻辑
await Task.CompletedTask;
}
public async Task OnDisconnectedAsync(HubLifetimeContext context, Exception? exception)
{
if (exception != null)
{
context.Hub.Clients.Client(context.Context.ConnectionId)
.SendAsync("Error", "连接异常断开,请重连");
}
}
}
// 注册 Hub 过滤器
builder.Services.AddSignalR(options =>
{
options.AddFilter<LoggingHubFilter>();
options.AddFilter<HubExceptionFilter>();
});消息大小与频率控制
防滥用策略
// SignalR 频率限制
builder.Services.AddSignalR(options =>
{
options.MaximumReceiveMessageSize = 32 * 1024; // 32KB
options.StreamBufferCapacity = 10;
options.MaximumParallelInvocations = 1;
options.EnableDetailedErrors = false; // 生产环境关闭
});
// 自定义频率限制中间件
public class HubRateLimitMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<HubRateLimitMiddleware> _logger;
private readonly ConcurrentDictionary<string, RateLimitEntry> _entries = new();
public HubRateLimitMiddleware(RequestDelegate next, ILogger<HubRateLimitMiddleware> logger)
{
_next = next;
_logger = logger;
}
public async Task InvokeAsync(HttpContext context)
{
if (context.Request.Path.StartsWithSegments("/chathub"))
{
var connectionId = context.Request.Query["id"].ToString();
if (!string.IsNullOrEmpty(connectionId))
{
if (!_entries.TryGetValue(connectionId, out var entry))
{
entry = new RateLimitEntry();
_entries[connectionId] = entry;
}
// 每秒最多 10 个请求
if (entry.Increment() > 10)
{
context.Response.StatusCode = 429;
return;
}
}
}
await _next(context);
}
private class RateLimitEntry
{
private int _count;
private DateTime _windowStart = DateTime.UtcNow;
public int Increment()
{
lock (this)
{
if (DateTime.UtcNow - _windowStart > TimeSpan.FromSeconds(1))
{
_count = 0;
_windowStart = DateTime.UtcNow;
}
return ++_count;
}
}
}
}MessagePack 序列化优化
高性能协议配置
// MessagePack 序列化 — 比 JSON 更小更快
// NuGet: Microsoft.AspNetCore.SignalR.Protocols.MessagePack
builder.Services.AddSignalR()
.AddMessagePackProtocol(options =>
{
// 自定义序列化选项
options.SerializerOptions =
MessagePackSerializerOptions.Standard
.WithResolver(CompositeResolver.Create(
MessagePack.Resolvers.StandardResolver.Instance,
GeneratedResolver.Instance)); // AOT 友好的预生成解析器
});
// MessagePack 自定义序列化
[MessagePackObject]
public class ChatMessage
{
[Key(0)]
public string UserName { get; set; } = "";
[Key(1)]
public string Content { get; set; } = "";
[Key(2)]
public DateTimeOffset Timestamp { get; set; }
}
// JSON vs MessagePack 对比:
// JSON: {"userName":"张三","content":"你好","timestamp":"2024-01-01T00:00:00Z"} → 约70字节
// MessagePack: 二进制格式 → 约40字节
// 序列化速度:MessagePack 比 JSON 快 3-5 倍
// 适用场景:高频消息推送、实时游戏、金融行情横向扩展
Redis 背板
// Redis 背板(多实例消息广播)
builder.Services.AddSignalR()
.AddStackExchangeRedis(options =>
{
options.Configuration = "localhost:6379";
options.ConfigurationOptions = new ConfigurationOptions
{
EndPoints = { "localhost:6379" },
AbortOnConnectFail = false
};
});
// 路由配置
app.MapHub<ChatHub>("/chathub");
app.MapHub<StreamHub>("/stream");
// Redis 背板工作原理:
// 1. 每个实例订阅 Redis 频道
// 2. Hub 方法调用时,消息发布到 Redis
// 3. 所有实例收到消息,推送给自己的连接
// 4. 实现跨实例的消息广播用户映射
// 自定义用户 ID 映射(默认使用 ClaimTypes.NameIdentifier)
public class CustomUserIdProvider : IUserIdProvider
{
public string? GetUserId(HubConnectionContext connection)
{
// 自定义用户 ID 逻辑
return connection.User?.FindFirst("sub")?.Value
?? connection.User?.FindFirst(ClaimTypes.Name)?.Value;
}
}
builder.Services.AddSingleton<IUserIdProvider, CustomUserIdProvider>();
// 连接管理
public class ConnectionManager
{
private readonly ConcurrentDictionary<string, HashSet<string>> _userConnections = new();
public void AddConnection(string userId, string connectionId)
{
_userConnections.AddOrUpdate(userId,
_ => new HashSet<string> { connectionId },
(_, set) => { set.Add(connectionId); return set; });
}
public void RemoveConnection(string userId, string connectionId)
{
if (_userConnections.TryGetValue(userId, out var set))
{
set.Remove(connectionId);
if (set.Count == 0)
_userConnections.TryRemove(userId, out _);
}
}
}优点
缺点
总结
SignalR 支持三种传输:WebSocket(最佳)、Server-Sent Events、Long Polling。协商时客户端按优先级选择。Hub 方法通过 Clients.All/Others/Group/User 选择消息目标。强类型 Hub 使用接口定义客户端方法,提供编译时检查。流式传输使用 IAsyncEnumerable<T> 或 Channel<T>。Redis 背板通过发布/订阅实现多实例消息广播。自定义 IUserIdProvider 映射用户标识。心跳机制通过 KeepAliveInterval 保持连接活跃。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 框架能力的真正重点是它在请求链路中的位置和对上下游的影响。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 画清执行顺序、入参来源、失败返回和日志记录点。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 知道 API 名称,却不知道它应该放在请求链路的哪个位置。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐协议选型、网关治理、端点可观测性和契约演进策略。
适用场景
- 当你准备把《SignalR 协议与传输机制》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《SignalR 协议与传输机制》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《SignalR 协议与传输机制》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《SignalR 协议与传输机制》最大的收益和代价分别是什么?
