WebSocket实时通信完全指南
一、为什么需要WebSocket
HTTP的局限
传统HTTP请求-响应:
客户端 → 请求 → 服务器
客户端 ← 响应 ← 服务器
问题:
- 单向性:服务器不能主动推送
- 轮询浪费:客户端不断询问"有新数据吗?"
- HTTP头开销:每次请求携带大量headers(Cookie、User-Agent等)
- 延迟高:连接建立 → 请求 → 响应 → 关闭
WebSocket的优势
双向通道:
客户端 ⇄ 持久连接 ⇄ 服务器
特性:
- ✅ 全双工通信(同时收发)
- ✅ 低延迟(1-10ms)
- ✅ 低开销(无HTTP headers)
- ✅ 服务器可主动推送
- ✅ 保持连接状态
二、WebSocket协议详解
2.1 握手过程
客户端发起:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务器响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
关键字段:
Sec-WebSocket-Key:客户端随机值(Base64)Sec-WebSocket-Accept:服务器计算的SHA-1哈希Accept = BASE64(SHA1(Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
2.2 数据帧格式
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
Opcode类型:
0x0延续帧0x1文本帧(UTF-8)0x2二进制帧0x8关闭连接0x9Ping0xAPong
2.3 心跳保活
Ping-Pong机制:
// 客户端每30秒发送Ping
setInterval(() => {
ws.send(JSON.stringify({type: 'ping'}));
}, 30000);
// 服务器收到Ping后返回Pong
ws.on('message', (data) => {
if (data.type === 'ping') {
ws.send(JSON.stringify({type: 'pong'}));
}
});
作用:
- 检测连接存活
- 防止代理/防火墙超时断开
- 清理僵尸连接
三、客户端实现
3.1 JavaScript(浏览器)
// 创建连接
const ws = new WebSocket('wss://example.com/socket');
// 连接打开
ws.onopen = (event) => {
console.log('Connected to server');
ws.send('Hello Server!');
};
// 接收消息
ws.onmessage = (event) => {
console.log('Received:', event.data);
// 如果是JSON
try {
const data = JSON.parse(event.data);
handleMessage(data);
} catch (e) {
console.log('Non-JSON message:', event.data);
}
};
// 错误处理
ws.onerror = (error) => {
console.error('WebSocket Error:', error);
};
// 连接关闭
ws.onclose = (event) => {
console.log('Disconnected:', event.code, event.reason);
// 重连逻辑
if (event.code !== 1000) { // 非正常关闭
setTimeout(reconnect, 3000);
}
};
// 发送二进制数据
const buffer = new ArrayBuffer(8);
ws.send(buffer);
// 检查连接状态
console.log(ws.readyState);
// 0: CONNECTING
// 1: OPEN
// 2: CLOSING
// 3: CLOSED
3.2 高级封装
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.reconnectInterval = options.reconnectInterval || 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
this.reconnectAttempts = 0;
this.messageQueue = [];
this.handlers = {
open: [],
message: [],
error: [],
close: []
};
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = (event) => {
console.log('Connected');
this.reconnectAttempts = 0;
// 发送积压的消息
while (this.messageQueue.length > 0) {
this.ws.send(this.messageQueue.shift());
}
this.handlers.open.forEach(handler => handler(event));
};
this.ws.onmessage = (event) => {
this.handlers.message.forEach(handler => handler(event));
};
this.ws.onerror = (error) => {
console.error('Error:', error);
this.handlers.error.forEach(handler => handler(error));
};
this.ws.onclose = (event) => {
console.log('Closed:', event.code);
this.handlers.close.forEach(handler => handler(event));
// 自动重连
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);
setTimeout(() => this.connect(), this.reconnectInterval);
} else {
console.error('Max reconnection attempts reached');
}
};
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data);
} else {
// 连接未就绪,加入队列
this.messageQueue.push(data);
}
}
on(event, handler) {
if (this.handlers[event]) {
this.handlers[event].push(handler);
}
}
close() {
this.maxReconnectAttempts = 0; // 禁止重连
this.ws.close();
}
}
// 使用
const socket = new ReconnectingWebSocket('wss://example.com/socket');
socket.on('message', (event) => {
console.log('Received:', event.data);
});
socket.send('Hello!');
四、服务端实现
4.1 Node.js + ws库
const WebSocket = require('ws');
const http = require('http');
// 创建HTTP服务器
const server = http.createServer((req, res) => {
res.writeHead(200);
res.end('WebSocket server running');
});
// 创建WebSocket服务器
const wss = new WebSocket.Server({ server });
// 客户端连接
wss.on('connection', (ws, req) => {
const clientIP = req.socket.remoteAddress;
console.log(`Client connected: ${clientIP}`);
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'welcome',
message: 'Connected to server',
timestamp: Date.now()
}));
// 接收消息
ws.on('message', (data) => {
console.log('Received:', data.toString());
try {
const message = JSON.parse(data);
handleMessage(ws, message);
} catch (e) {
ws.send(JSON.stringify({type: 'error', message: 'Invalid JSON'}));
}
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket error:', error);
});
// 连接关闭
ws.on('close', () => {
console.log(`Client disconnected: ${clientIP}`);
});
// 心跳
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
});
// 心跳检测(每30秒)
setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate(); // 断开僵尸连接
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
// 广播消息给所有客户端
function broadcast(data) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
// 启动服务器
server.listen(8080, () => {
console.log('Server running on http://localhost:8080');
});
4.2 房间/频道管理
class RoomManager {
constructor() {
this.rooms = new Map(); // roomId -> Set of ws connections
}
join(ws, roomId) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId).add(ws);
ws.roomId = roomId;
// 通知房间其他人
this.broadcast(roomId, {
type: 'user_joined',
roomId: roomId,
userCount: this.rooms.get(roomId).size
}, ws);
}
leave(ws) {
if (ws.roomId && this.rooms.has(ws.roomId)) {
this.rooms.get(ws.roomId).delete(ws);
// 通知剩余成员
this.broadcast(ws.roomId, {
type: 'user_left',
userCount: this.rooms.get(ws.roomId).size
});
// 如果房间空了,删除
if (this.rooms.get(ws.roomId).size === 0) {
this.rooms.delete(ws.roomId);
}
}
}
broadcast(roomId, message, excludeWs = null) {
if (!this.rooms.has(roomId)) return;
const data = JSON.stringify(message);
this.rooms.get(roomId).forEach((client) => {
if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
sendTo(ws, message) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
}
// 使用
const roomManager = new RoomManager();
wss.on('connection', (ws) => {
ws.on('message', (data) => {
const message = JSON.parse(data);
switch (message.type) {
case 'join_room':
roomManager.join(ws, message.roomId);
break;
case 'leave_room':
roomManager.leave(ws);
break;
case 'chat_message':
roomManager.broadcast(ws.roomId, {
type: 'chat_message',
username: ws.username,
text: message.text,
timestamp: Date.now()
});
break;
}
});
ws.on('close', () => {
roomManager.leave(ws);
});
});
五、规模化架构
5.1 单机限制
问题:
- Node.js单进程:~10k并发连接
- 内存:每个连接 ~10-50KB
- CPU:消息广播时计算密集
5.2 垂直扩展
多进程:
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} died, restarting...`);
cluster.fork();
});
} else {
// 启动WebSocket服务器(如前)
startWebSocketServer();
}
问题:不同进程的客户端无法互相通信
5.3 水平扩展 + Redis Pub/Sub
架构:
客户端1 → 服务器A ↘
客户端2 → 服务器A → Redis Pub/Sub → 服务器B → 客户端3
客户端4 → 服务器B ↗
实现:
const Redis = require('ioredis');
const redis = new Redis();
const redisSub = new Redis();
// 订阅Redis频道
redisSub.subscribe('chat_messages');
redisSub.on('message', (channel, message) => {
// 收到其他服务器发布的消息,广播给本机客户端
broadcast(message);
});
// 客户端发送消息时
ws.on('message', (data) => {
const message = JSON.parse(data);
// 发布到Redis
redis.publish('chat_messages', JSON.stringify({
...message,
serverId: process.env.SERVER_ID
}));
// 本地也广播(避免往返Redis延迟)
broadcast(JSON.stringify(message));
});
function broadcast(data) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
5.4 负载均衡
问题:WebSocket是长连接,普通负载均衡会有问题
解决:粘性会话(Sticky Session)
Nginx配置:
upstream websocket {
ip_hash; # 基于IP的粘性会话
server 192.168.1.1:8080;
server 192.168.1.2:8080;
server 192.168.1.3:8080;
}
server {
listen 80;
server_name ws.example.com;
location /socket {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_read_timeout 86400; # 24小时超时
}
}
六、安全考虑
6.1 认证与授权
Token认证:
// 客户端:在URL中携带token
const ws = new WebSocket(`wss://example.com/socket?token=${userToken}`);
// 服务器:验证token
wss.on('connection', (ws, req) => {
const url = new URL(req.url, 'ws://base');
const token = url.searchParams.get('token');
if (!verifyToken(token)) {
ws.close(4001, 'Unauthorized');
return;
}
ws.userId = getUserIdFromToken(token);
ws.authenticated = true;
// 后续处理...
});
// 或:握手后发送认证消息
ws.on('message', (data) => {
const message = JSON.parse(data);
if (!ws.authenticated) {
if (message.type === 'auth') {
if (verifyCredentials(message.username, message.password)) {
ws.authenticated = true;
ws.userId = message.username;
ws.send(JSON.stringify({type: 'auth_success'}));
} else {
ws.close(4001, 'Authentication failed');
}
} else {
ws.send(JSON.stringify({type: 'error', message: 'Not authenticated'}));
}
return;
}
// 处理已认证用户的消息...
});
6.2 速率限制
const rateLimit = new Map(); // userId -> {count, resetTime}
function checkRateLimit(ws) {
const userId = ws.userId;
const now = Date.now();
const limit = 100; // 每分钟100条消息
const window = 60000; // 1分钟
if (!rateLimit.has(userId)) {
rateLimit.set(userId, {count: 1, resetTime: now + window});
return true;
}
const userLimit = rateLimit.get(userId);
if (now > userLimit.resetTime) {
userLimit.count = 1;
userLimit.resetTime = now + window;
return true;
}
if (userLimit.count >= limit) {
ws.send(JSON.stringify({
type: 'error',
message: 'Rate limit exceeded'
}));
return false;
}
userLimit.count++;
return true;
}
ws.on('message', (data) => {
if (!checkRateLimit(ws)) return;
// 处理消息...
});
6.3 消息验证
const Joi = require('joi');
const messageSchema = Joi.object({
type: Joi.string().required(),
roomId: Joi.string().max(50),
text: Joi.string().max(1000),
// ... 其他字段
});
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
const {error} = messageSchema.validate(message);
if (error) {
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid message format'
}));
return;
}
// 处理消息...
} catch (e) {
ws.send(JSON.stringify({
type: 'error',
message: 'Invalid JSON'
}));
}
});
七、监控与调试
7.1 关键指标
const metrics = {
connections: 0,
messagesReceived: 0,
messagesSent: 0,
errors: 0,
bytesReceived: 0,
bytesSent: 0
};
wss.on('connection', (ws) => {
metrics.connections++;
ws.on('message', (data) => {
metrics.messagesReceived++;
metrics.bytesReceived += data.length;
});
ws.on('close', () => {
metrics.connections--;
});
ws.on('error', () => {
metrics.errors++;
});
});
// 定期输出统计
setInterval(() => {
console.log('Metrics:', {
connections: metrics.connections,
messagesPerSecond: metrics.messagesReceived / 60,
errorRate: metrics.errors / (metrics.messagesReceived || 1)
});
// 重置计数器
metrics.messagesReceived = 0;
metrics.messagesSent = 0;
}, 60000);
7.2 日志记录
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
wss.on('connection', (ws, req) => {
const clientIP = req.socket.remoteAddress;
logger.info('Client connected', { ip: clientIP, timestamp: Date.now() });
ws.on('message', (data) => {
logger.debug('Message received', {
ip: clientIP,
size: data.length,
preview: data.toString().substring(0, 100)
});
});
ws.on('error', (error) => {
logger.error('WebSocket error', {
ip: clientIP,
error: error.message,
stack: error.stack
});
});
});
八、最佳实践总结
✅ DO
- 使用WSS(加密):
wss://而非ws:// - 实现心跳:检测连接健康,清理僵尸连接
- 自动重连:客户端断线后重新连接
- 消息队列:连接未就绪时缓存消息
- 认证授权:验证用户身份,控制权限
- 速率限制:防止滥用
- 消息压缩:减少带宽(扩展permessage-deflate)
- 监控日志:追踪性能和问题
❌ DON'T
- 不要信任客户端输入:始终验证
- 不要在WebSocket中传输敏感数据:使用加密
- 不要同步处理消息:使用异步/队列
- 不要忽略错误:捕获并记录
- 不要无限重连:设置最大次数和指数退避
- 不要在生产环境使用
ws://:必须WSS
九、与其他技术对比
| 特性 | WebSocket | Server-Sent Events | Long Polling | gRPC Streaming |
|---|---|---|---|---|
| 双向通信 | ✅ | ❌(单向) | ❌ | ✅ |
| 浏览器支持 | ✅ 广泛 | ✅ 广泛 | ✅ 全部 | ❌(需gRPC-Web) |
| 服务器推送 | ✅ | ✅ | 模拟 | ✅ |
| 开销 | 低 | 中 | 高 | 低 |
| 复杂度 | 中 | 低 | 低 | 高 |
| 适用场景 | 聊天、游戏、协作 | 通知、新闻流 | 兼容性要求高 | 微服务间通信 |
总结
WebSocket是实时双向通信的标准解决方案,适用于:
- ✅ 聊天应用
- ✅ 实时仪表盘
- ✅ 多人游戏
- ✅ 协作编辑
- ✅ 实时通知
关键要点:
- 理解协议握手和帧格式
- 实现心跳和重连机制
- 规模化需要Redis Pub/Sub + 负载均衡
- 安全包括认证、速率限制、输入验证
- 监控连接数、消息速率、错误率
WebSocket使持续内容生成系统能够实时推送给用户,是现代实时应用的基石。