WebSocket 实时通信
大约 10 分钟约 2906 字
WebSocket 实时通信
简介
WebSocket 提供浏览器和服务端之间的持久化全双工通道,服务端可以主动把数据推送给客户端,而不需要客户端频繁轮询。它特别适合实时通知、在线聊天、协同编辑、行情推送、监控大屏和多人在线状态等场景,但前提是你要同时处理连接状态、重连机制、鉴权、心跳和消息边界。
特点
实现
原生 WebSocket 基本示例
const socket = new WebSocket('ws://localhost:8080/ws')
socket.addEventListener('open', () => {
console.log('connected')
socket.send(JSON.stringify({ type: 'join', room: 'order-room' }))
})
socket.addEventListener('message', event => {
const data = JSON.parse(event.data)
console.log('message:', data)
})
socket.addEventListener('close', () => {
console.log('closed')
})
socket.addEventListener('error', err => {
console.error('socket error', err)
})// 发送聊天消息
function sendMessage(text) {
socket.send(JSON.stringify({
type: 'message',
content: text,
sentAt: new Date().toISOString()
}))
}原生 WebSocket 适合:
- 先理解协议与状态机
- 自己定义消息格式
- 对实时链路做较细控制
但生产场景通常还要再补:
- 重连
- 鉴权
- 心跳
- 分组/房间
- 广播封装 WebSocket 客户端类
// utils/WebSocketClient.ts — 生产级 WebSocket 封装
type MessageHandler = (data: any) => void
interface WebSocketOptions {
url: string
reconnectInterval?: number // 重连间隔(毫秒)
maxReconnectAttempts?: number // 最大重连次数
heartbeatInterval?: number // 心跳间隔(毫秒)
heartbeatMessage?: string // 心跳消息内容
protocols?: string | string[] // 子协议
}
export class WebSocketClient {
private ws: WebSocket | null = null
private url: string
private reconnectInterval: number
private maxReconnectAttempts: number
private heartbeatInterval: number
private heartbeatMessage: string
private protocols?: string | string[]
private reconnectAttempts = 0
private reconnectTimer: ReturnType<typeof setTimeout> | null = null
private heartbeatTimer: ReturnType<typeof setInterval> | null = null
private isManualClose = false
private messageHandlers = new Map<string, Set<MessageHandler>>()
private statusHandlers = new Set<(status: string) => void>()
constructor(options: WebSocketOptions) {
this.url = options.url
this.reconnectInterval = options.reconnectInterval ?? 3000
this.maxReconnectAttempts = options.maxReconnectAttempts ?? Infinity
this.heartbeatInterval = options.heartbeatInterval ?? 30000
this.heartbeatMessage = options.heartbeatMessage ?? '{"type":"ping"}'
this.protocols = options.protocols
}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.isManualClose = false
const ws = this.protocols
? new WebSocket(this.url, this.protocols)
: new WebSocket(this.url)
ws.onopen = () => {
console.log('[WS] 连接成功')
this.reconnectAttempts = 0
this.startHeartbeat()
this.notifyStatus('connected')
resolve()
}
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
// 处理心跳响应
if (data.type === 'pong') return
// 分发消息给对应的处理器
const handlers = this.messageHandlers.get(data.type)
if (handlers) {
handlers.forEach(handler => handler(data))
}
// 通配处理器
const wildcardHandlers = this.messageHandlers.get('*')
if (wildcardHandlers) {
wildcardHandlers.forEach(handler => handler(data))
}
} catch {
// 非 JSON 消息
const wildcardHandlers = this.messageHandlers.get('*')
wildcardHandlers?.forEach(handler => handler(event.data))
}
}
ws.onclose = (event) => {
console.log(`[WS] 连接关闭: code=${event.code}, reason=${event.reason}`)
this.stopHeartbeat()
this.notifyStatus('disconnected')
if (!this.isManualClose && this.reconnectAttempts < this.maxReconnectAttempts) {
this.scheduleReconnect()
}
}
ws.onerror = (error) => {
console.error('[WS] 连接错误:', error)
this.notifyStatus('error')
reject(error)
}
this.ws = ws
})
}
disconnect(): void {
this.isManualClose = true
this.stopHeartbeat()
this.clearReconnectTimer()
if (this.ws) {
this.ws.close(1000, '手动关闭')
this.ws = null
}
}
send(type: string, payload: any): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
console.warn('[WS] 未连接,无法发送消息')
return
}
this.ws.send(JSON.stringify({ type, payload, timestamp: Date.now() }))
}
// 注册消息处理器
on(type: string, handler: MessageHandler): () => void {
if (!this.messageHandlers.has(type)) {
this.messageHandlers.set(type, new Set())
}
this.messageHandlers.get(type)!.add(handler)
// 返回取消注册函数
return () => {
this.messageHandlers.get(type)?.delete(handler)
}
}
// 注册连接状态变化处理器
onStatusChange(handler: (status: string) => void): () => void {
this.statusHandlers.add(handler)
return () => this.statusHandlers.delete(handler)
}
get isConnected(): boolean {
return this.ws?.readyState === WebSocket.OPEN
}
// 私有方法
private startHeartbeat(): void {
this.stopHeartbeat()
this.heartbeatTimer = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(this.heartbeatMessage)
}
}, this.heartbeatInterval)
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
}
private scheduleReconnect(): void {
this.clearReconnectTimer()
const delay = Math.min(
this.reconnectInterval * Math.pow(1.5, this.reconnectAttempts),
30000 // 最大 30 秒
)
this.reconnectAttempts++
console.log(`[WS] ${delay}ms 后尝试第 ${this.reconnectAttempts} 次重连`)
this.reconnectTimer = setTimeout(() => {
this.connect().catch(() => {})
}, delay)
}
private clearReconnectTimer(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
}
private notifyStatus(status: string): void {
this.statusHandlers.forEach(handler => handler(status))
}
}Vue Composable 封装
// composables/useWebSocket.ts
import { ref, onUnmounted } from 'vue'
import { WebSocketClient } from '@/utils/WebSocketClient'
export function useWebSocket(url: string) {
const client = new WebSocketClient({ url })
const isConnected = ref(false)
const lastMessage = ref<any>(null)
const error = ref<Error | null>(null)
// 监听连接状态
const unsubStatus = client.onStatusChange((status) => {
isConnected.value = status === 'connected'
})
// 监听所有消息
const unsubMessage = client.on('*', (data) => {
lastMessage.value = data
})
// 组件卸载时自动断开
onUnmounted(() => {
unsubStatus()
unsubMessage()
client.disconnect()
})
return {
client,
isConnected,
lastMessage,
error,
connect: () => client.connect(),
disconnect: () => client.disconnect(),
}
}消息协议设计
// 消息协议规范 — 前后端统一约定
interface WSMessage<T = any> {
type: string // 消息类型
payload: T // 消息内容
timestamp: number // 时间戳
msgId: string // 消息唯一 ID(用于 ACK)
}
// 消息类型枚举
enum MessageType {
// 系统消息
PING = 'ping',
PONG = 'pong',
ACK = 'ack',
// 聊天消息
CHAT_JOIN = 'chat:join',
CHAT_LEAVE = 'chat:leave',
CHAT_MESSAGE = 'chat:message',
CHAT_HISTORY = 'chat:history',
// 通知消息
NOTIFICATION = 'notification',
NOTIFICATION_READ = 'notification:read',
// 协作消息
CURSOR_MOVE = 'collab:cursor',
CONTENT_CHANGE = 'collab:change',
}
// 带消息确认(ACK)的发送
class ReliableWebSocket extends WebSocketClient {
private pendingMessages = new Map<string, {
resolve: () => void
reject: (error: Error) => void
timer: ReturnType<typeof setTimeout>
}>()
async sendWithAck(type: string, payload: any, timeout: number = 5000): Promise<void> {
const msgId = `${Date.now()}-${Math.random().toString(36).slice(2)}`
this.send(type, payload)
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingMessages.delete(msgId)
reject(new Error('消息确认超时'))
}, timeout)
this.pendingMessages.set(msgId, { resolve, reject, timer })
})
}
// 收到 ACK 时确认
private handleAck(msgId: string) {
const pending = this.pendingMessages.get(msgId)
if (pending) {
clearTimeout(pending.timer)
pending.resolve()
this.pendingMessages.delete(msgId)
}
}
}ASP.NET Core SignalR 服务端
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSignalR(options =>
{
options.EnableDetailedErrors = true;
options.KeepAliveInterval = TimeSpan.FromSeconds(15);
options.ClientTimeoutInterval = TimeSpan.FromSeconds(30);
});
builder.Services.AddCors(options =>
{
options.AddPolicy("SignalRPolicy", policy =>
{
policy.WithOrigins("http://localhost:5173")
.AllowAnyHeader()
.AllowAnyMethod()
.AllowCredentials();
});
});
var app = builder.Build();
app.UseCors("SignalRPolicy");
app.MapHub<ChatHub>("/hubs/chat");
app.Run();using Microsoft.AspNetCore.SignalR;
public class ChatHub : Hub
{
private readonly ILogger<ChatHub> _logger;
public ChatHub(ILogger<ChatHub> logger)
{
_logger = logger;
}
public async Task JoinRoom(string room)
{
await Groups.AddToGroupAsync(Context.ConnectionId, room);
await Clients.Group(room).SendAsync("UserJoined", new
{
Room = room,
ConnectionId = Context.ConnectionId
});
}
public async Task SendMessage(string room, string message)
{
var user = Context.User?.Identity?.Name ?? "anonymous";
await Clients.Group(room).SendAsync("ReceiveMessage", new
{
User = user,
Content = message,
SentAt = DateTimeOffset.UtcNow
});
}
public override Task OnConnectedAsync()
{
_logger.LogInformation("SignalR connected: {ConnectionId}", Context.ConnectionId);
return base.OnConnectedAsync();
}
public override Task OnDisconnectedAsync(Exception? exception)
{
_logger.LogInformation("SignalR disconnected: {ConnectionId}", Context.ConnectionId);
return base.OnDisconnectedAsync(exception);
}
}// 强类型 Hub 更适合大型项目
public interface IChatClient
{
Task ReceiveMessage(ChatMessage message);
Task UserJoined(UserInfo info);
}
public class TypedChatHub : Hub<IChatClient>
{
public async Task SendMessage(string room, string text)
{
await Clients.Group(room).ReceiveMessage(new ChatMessage
{
User = Context.User?.Identity?.Name ?? "anonymous",
Content = text,
SentAt = DateTimeOffset.UtcNow
});
}
}
public class ChatMessage
{
public string User { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTimeOffset SentAt { get; set; }
}
public class UserInfo
{
public string Name { get; set; } = string.Empty;
public string Room { get; set; } = string.Empty;
}前端 SignalR 客户端
import * as signalR from '@microsoft/signalr'
export class ChatService {
private connection: signalR.HubConnection
constructor() {
this.connection = new signalR.HubConnectionBuilder()
.withUrl('https://localhost:5001/hubs/chat', {
accessTokenFactory: () => localStorage.getItem('token') || ''
})
.withAutomaticReconnect([0, 2000, 5000, 10000])
.configureLogging(signalR.LogLevel.Information)
.build()
}
async start() {
if (this.connection.state === signalR.HubConnectionState.Connected) return
await this.connection.start()
}
async joinRoom(room: string) {
await this.connection.invoke('JoinRoom', room)
}
async sendMessage(room: string, content: string) {
await this.connection.invoke('SendMessage', room, content)
}
onMessage(callback: (message: any) => void) {
this.connection.on('ReceiveMessage', callback)
}
onJoined(callback: (info: any) => void) {
this.connection.on('UserJoined', callback)
}
}// Vue / React 中的接入思路都是:
// 1. 页面挂载时 start
// 2. 加入房间 / 注册监听
// 3. 卸载时移除监听 / 断开鉴权、重连与心跳
// 服务端通常要与认证系统联动
builder.Services.AddAuthentication();
builder.Services.AddAuthorization();
app.UseAuthentication();
app.UseAuthorization();
app.MapHub<ChatHub>("/hubs/chat").RequireAuthorization();this.connection.onreconnecting(error => {
console.warn('reconnecting...', error)
})
this.connection.onreconnected(connectionId => {
console.log('reconnected:', connectionId)
})
this.connection.onclose(error => {
console.error('connection closed', error)
})实时系统里最常见的问题不是”发不发得出去”,而是:
- 断线后能不能恢复
- 恢复后状态是否一致
- 是否会丢消息
- 是否需要补发 / 重拉最近消息Nginx 反向代理配置
# Nginx 配置 WebSocket 代理
upstream websocket_backend {
# Sticky Session — 保证同一客户端连到同一台后端
ip_hash;
server 127.0.0.1:5001;
server 127.0.0.1:5002;
}
server {
listen 80;
server_name ws.example.com;
location /hubs/ {
proxy_pass http://websocket_backend;
proxy_http_version 1.1;
# 必须设置 Upgrade 和 Connection 头
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection “upgrade”;
# 传递客户端信息
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 超时设置(WebSocket 需要更长的超时)
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}SSE 作为 WebSocket 的替代方案
// utils/useSSE.ts — Server-Sent Events
// 适用场景:单向推送(服务端 -> 客户端),如通知、股票行情
export function useSSE(url: string) {
const source = ref<EventSource | null>(null)
const messages = ref<any[]>([])
const isConnected = ref(false)
const error = ref<Event | null>(null)
function connect() {
// EventSource 自动重连
source.value = new EventSource(url)
source.value.onopen = () => {
isConnected.value = true
console.log('[SSE] 连接成功')
}
source.value.onmessage = (event) => {
try {
const data = JSON.parse(event.data)
messages.value.push(data)
} catch {
messages.value.push({ raw: event.data })
}
}
source.value.onerror = (event) => {
error.value = event
isConnected.value = false
console.error('[SSE] 连接错误')
}
}
function disconnect() {
source.value?.close()
source.value = null
isConnected.value = false
}
onUnmounted(disconnect)
return { messages, isConnected, error, connect, disconnect }
}
// WebSocket vs SSE 选型:
// WebSocket — 全双工,客户端也需要频繁发送数据
// SSE — 半双工(服务端推送),实现更简单,自动重连,HTTP 兼容
// 长轮询 — 兼容性最好,但延迟高,资源消耗大聊天室完整示例(Vue 组件)
<!-- ChatRoom.vue -->
<script setup lang=”ts”>
import { ref, onMounted, nextTick } from 'vue'
import { useWebSocket } from '@/composables/useWebSocket'
interface ChatMessage {
id: string
user: string
content: string
sentAt: string
}
const props = defineProps<{ roomId: string; username: string }>()
const { client, isConnected, connect, disconnect } = useWebSocket(
`ws://localhost:8080/ws/chat`
)
const messages = ref<ChatMessage[]>([])
const inputText = ref('')
const messageListRef = ref<HTMLDivElement>()
// 注册消息处理器
onMounted(async () => {
await connect()
client.on('chat:message', (data) => {
messages.value.push(data.payload)
scrollToBottom()
})
client.on('chat:history', (data) => {
messages.value = data.payload
scrollToBottom()
})
// 加入房间
client.send('chat:join', { room: props.roomId, user: props.username })
})
function sendMessage() {
if (!inputText.value.trim()) return
client.send('chat:message', {
room: props.roomId,
content: inputText.value.trim(),
})
inputText.value = ''
}
function scrollToBottom() {
nextTick(() => {
if (messageListRef.value) {
messageListRef.value.scrollTop = messageListRef.value.scrollHeight
}
})
}
</script>
<template>
<div class=”chat-room”>
<div class=”header”>
<span>房间: {{ roomId }}</span>
<span :class=”['status', isConnected ? 'online' : 'offline']”>
{{ isConnected ? '在线' : '离线' }}
</span>
</div>
<div ref=”messageListRef” class=”message-list”>
<div v-for=”msg in messages” :key=”msg.id” class=”message”>
<span class=”user”>{{ msg.user }}:</span>
<span class=”content”>{{ msg.content }}</span>
<span class=”time”>{{ new Date(msg.sentAt).toLocaleTimeString() }}</span>
</div>
</div>
<div class=”input-area”>
<input v-model=”inputText” @keyup.enter=”sendMessage” placeholder=”输入消息...” />
<button @click=”sendMessage” :disabled=”!isConnected”>发送</button>
</div>
</div>
</template>优点
缺点
总结
WebSocket / SignalR 的真正价值,在于为实时场景提供持续连接和服务端主动推送能力。要真正用好它,不能只关注“连接成功”,而要从一开始就设计消息协议、鉴权方式、房间模型、重连逻辑和失败恢复策略。
关键知识点
- WebSocket 是协议能力,SignalR 是 .NET 生态下的工程化封装。
- 长连接系统最难的是状态治理,而不是消息发送本身。
- 实时消息是否可靠,要看补偿和重拉设计,不只是看连接是否在线。
- SignalR 很适合 .NET,但也需要和认证、网关、日志、监控一起设计。
项目落地视角
- 聊天、在线协作、通知中心、订单状态推送都很常见。
- 监控大屏、设备状态流、实时告警适合走 WebSocket / SignalR。
- 如果是管理后台低频提示,SSE 可能比 WebSocket 更简单。
- 生产环境要重点验证 Nginx / Gateway / LB 对 Upgrade 头和连接保持的支持。
常见误区
- 只测本地连通,不测断线重连和网络抖动恢复。
- 所有实时数据都直接推送,不做限流、合并和状态压缩。
- 以为连上 WebSocket 就一定不会丢消息。
- 不设计消息协议和版本,后面前后端协作越来越乱。
进阶路线
- 深入学习 SignalR 分布式扩展(Redis backplane / Azure SignalR)。
- 研究 WebSocket 与 SSE、Long Polling 的场景边界。
- 设计消息序列号、ACK、补发机制提升可靠性。
- 将实时链路与日志、Trace、监控告警打通。
适用场景
- 聊天室、在线客服。
- 实时通知、订单状态更新、后台告警。
- 协作编辑、白板、在线状态同步。
- 监控大屏、IoT 设备状态流。
落地建议
- 先定义消息协议,再写 Hub 和客户端。
- 默认设计重连、补拉、超时和心跳机制。
- 对实时链路建立连接数、断开率、发送失败率等监控。
- 对高频推送场景做节流和聚合,不要把每个状态变化都直接广播。
排错清单
- 连不上:先检查 CORS、认证、网关和 Upgrade 头。
- 容易断:先检查心跳、超时配置和代理层空闲连接策略。
- 丢消息:先检查是否有补发 / 重拉设计。
- 群组广播异常:先检查 JoinRoom / LeaveRoom 的时机和房间标识是否一致。
