跳到主要内容

实时系统架构设计方案集

目录


架构选型决策树

开始

├─ 用户量 < 1000 ────► 单体应用 + SQLite
│ (成本: $0-20/月)

├─ 用户量 1K-10K ────► 单体应用 + PostgreSQL + Redis
│ (成本: $50-200/月)

├─ 用户量 10K-100K ──► 微服务 + 负载均衡
│ (成本: $500-2000/月)

├─ 用户量 100K-1M ───► 微服务 + 消息队列 + CDN
│ (成本: $2K-10K/月)

└─ 用户量 > 1M ──────► 分布式架构 + 边缘计算
(成本: $10K+/月)

关键决策因素:

  1. 并发连接数 - WebSocket能支撑多少同时在线
  2. 消息吞吐量 - 每秒处理多少条消息
  3. 延迟要求 - P95延迟目标(100ms? 1s? 10s?)
  4. 预算约束 - 能承受的月度成本
  5. 团队技能 - 团队对技术栈的熟悉程度

方案1: 单体应用架构

适用场景

  • MVP产品
  • 用户量 < 10K
  • 团队 1-3人
  • 预算 < $200/月

架构图

┌─────────────────────────────────────┐
│ 客户端 (浏览器) │
│ - React/Vue │
│ - WebSocket客户端 │
└──────────────┬──────────────────────┘
│ WSS / HTTPS
┌──────────────▼──────────────────────┐
│ Nginx (反向代理) │
│ - SSL终止 │
│ - WebSocket升级 │
└──────────────┬──────────────────────┘

┌──────────────▼──────────────────────┐
│ Node.js 单进程应用 │
│ ┌─────────────────────────────┐ │
│ │ Express + Socket.io │ │
│ ├─────────────────────────────┤ │
│ │ 业务逻辑 │ │
│ ├─────────────────────────────┤ │
│ │ 内存状态管理 │ │
│ └─────────────────────────────┘ │
└──────────────┬──────────────────────┘

┌──────────────▼──────────────────────┐
│ PostgreSQL + Redis │
│ - 持久化存储 │
│ - 缓存层 │
└─────────────────────────────────────┘

代码实现

// server.ts - 完整的单体应用
import express from 'express';
import { createServer } from 'http';
import { Server } from 'socket.io';
import { PrismaClient } from '@prisma/client';
import Redis from 'ioredis';

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: { origin: process.env.CLIENT_URL }
});

const db = new PrismaClient();
const redis = new Redis(process.env.REDIS_URL);

// 内存中的连接管理
const connections = new Map<string, Set<string>>();

// WebSocket连接处理
io.on('connection', (socket) => {
console.log('Client connected:', socket.id);

// 加入房间
socket.on('join_room', async (roomId: string) => {
socket.join(roomId);

// 记录连接
if (!connections.has(roomId)) {
connections.set(roomId, new Set());
}
connections.get(roomId)!.add(socket.id);

// 广播房间人数
io.to(roomId).emit('room_count', connections.get(roomId)!.size);
});

// 接收消息
socket.on('message', async (data) => {
const { roomId, content } = data;

// 保存到数据库
const message = await db.message.create({
data: {
roomId,
content,
userId: socket.data.userId,
createdAt: new Date()
}
});

// 缓存最近消息
await redis.lpush(`room:${roomId}:messages`, JSON.stringify(message));
await redis.ltrim(`room:${roomId}:messages`, 0, 99); // 只保留最近100条

// 广播给房间所有人
io.to(roomId).emit('new_message', message);
});

// 断开连接
socket.on('disconnect', () => {
// 清理连接记录
connections.forEach((users, roomId) => {
if (users.has(socket.id)) {
users.delete(socket.id);
io.to(roomId).emit('room_count', users.size);
}
});
});
});

// REST API
app.get('/api/rooms/:id/messages', async (req, res) => {
const { id } = req.params;

// 先从缓存读取
const cached = await redis.lrange(`room:${id}:messages`, 0, 99);
if (cached.length > 0) {
return res.json(cached.map(JSON.parse));
}

// 缓存未命中,从数据库读取
const messages = await db.message.findMany({
where: { roomId: id },
orderBy: { createdAt: 'desc' },
take: 100
});

res.json(messages);
});

// 启动服务器
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});

部署配置

Dockerfile:

FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --only=production

COPY . .
RUN npm run build

EXPOSE 3000

CMD ["node", "dist/server.js"]

docker-compose.yml:

version: '3.8'

services:
app:
build: .
ports:
- "3000:3000"
environment:
DATABASE_URL: postgresql://user:pass@db:5432/myapp
REDIS_URL: redis://redis:6379
depends_on:
- db
- redis

db:
image: postgres:15-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: myapp
volumes:
- pgdata:/var/lib/postgresql/data

redis:
image: redis:7-alpine
volumes:
- redisdata:/data

volumes:
pgdata:
redisdata:

性能指标

指标目标值
并发连接数5,000
消息吞吐量1,000/秒
P95延迟< 100ms
内存使用< 512MB
CPU使用< 50%

成本估算

Render.com部署:

  • Web Service: $7/月 (512MB RAM)
  • PostgreSQL: $7/月 (1GB)
  • Redis: $10/月 (256MB)
  • 总计: $24/月

方案2: 微服务架构

适用场景

  • 用户量 10K-100K
  • 团队 3-10人
  • 需要独立扩展不同模块
  • 预算 $500-2000/月

架构图

                    ┌─────────────┐
│ 客户端 │
└──────┬──────┘

┌──────▼──────┐
│ CDN/Nginx │
└──────┬──────┘

┌────────────────┼────────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ API网关 │ │ WebSocket │ │ 静态资源 │
│ (REST) │ │ 服务 │ │ 服务 │
└─────┬─────┘ └─────┬─────┘ └───────────┘
│ │
│ ┌──────▼──────┐
│ │ Redis │
│ │ Pub/Sub │
│ └──────┬──────┘
│ │
┌─────▼────────────────▼─────┐
│ 消息队列 (Kafka) │
└─────┬────────────────┬─────┘
│ │
┌─────▼─────┐ ┌─────▼─────┐
│ 用户服务 │ │ 消息服务 │
└─────┬─────┘ └─────┬─────┘
│ │
┌─────▼────────────────▼─────┐
│ PostgreSQL (主) │
│ └─► 读副本1 │
│ └─► 读副本2 │
└───────────────────────────┘

服务拆分

1. API网关服务

// api-gateway/src/index.ts
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';

const app = express();

// 路由到不同的微服务
app.use('/api/users', createProxyMiddleware({
target: 'http://user-service:3001',
changeOrigin: true
}));

app.use('/api/messages', createProxyMiddleware({
target: 'http://message-service:3002',
changeOrigin: true
}));

app.use('/api/rooms', createProxyMiddleware({
target: 'http://room-service:3003',
changeOrigin: true
}));

// 统一认证中间件
app.use(async (req, res, next) => {
const token = req.headers.authorization?.split(' ')[1];
if (!token) return res.status(401).json({ error: 'Unauthorized' });

try {
const user = await verifyToken(token);
req.user = user;
next();
} catch (error) {
res.status(401).json({ error: 'Invalid token' });
}
});

app.listen(3000);

2. WebSocket服务(可水平扩展)

// websocket-service/src/index.ts
import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import Redis from 'ioredis';
import { Kafka } from 'kafkajs';

const io = new Server(8080);

// Redis适配器实现多实例同步
const pubClient = new Redis(process.env.REDIS_URL);
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));

// Kafka生产者
const kafka = new Kafka({
clientId: 'websocket-service',
brokers: [process.env.KAFKA_BROKER]
});
const producer = kafka.producer();

io.on('connection', (socket) => {
socket.on('message', async (data) => {
// 发送到Kafka进行异步处理
await producer.send({
topic: 'messages',
messages: [{ value: JSON.stringify(data) }]
});

// 立即广播(乐观更新)
io.to(data.roomId).emit('new_message', data);
});
});

// Kafka消费者监听消息持久化完成事件
const consumer = kafka.consumer({ groupId: 'websocket-group' });
consumer.subscribe({ topic: 'messages-persisted' });
consumer.run({
eachMessage: async ({ message }) => {
const msg = JSON.parse(message.value.toString());
// 确认消息已持久化
io.to(msg.roomId).emit('message_confirmed', msg.id);
}
});

3. 消息服务(处理业务逻辑)

// message-service/src/index.ts
import { Kafka } from 'kafkajs';
import { PrismaClient } from '@prisma/client';

const kafka = new Kafka({
clientId: 'message-service',
brokers: [process.env.KAFKA_BROKER]
});

const consumer = kafka.consumer({ groupId: 'message-processors' });
const producer = kafka.producer();
const db = new PrismaClient();

consumer.subscribe({ topic: 'messages' });

consumer.run({
eachMessage: async ({ message }) => {
const data = JSON.parse(message.value.toString());

try {
// 持久化消息
const savedMessage = await db.message.create({
data: {
roomId: data.roomId,
userId: data.userId,
content: data.content,
createdAt: new Date()
}
});

// 内容审核(调用AI API)
const isApproved = await moderateContent(data.content);

if (!isApproved) {
// 发送删除事件
await producer.send({
topic: 'messages-deleted',
messages: [{ value: JSON.stringify({ id: savedMessage.id }) }]
});
} else {
// 发送持久化完成事件
await producer.send({
topic: 'messages-persisted',
messages: [{ value: JSON.stringify(savedMessage) }]
});
}
} catch (error) {
console.error('Failed to process message:', error);
// 发送到死信队列
await producer.send({
topic: 'messages-dlq',
messages: [{ value: message.value }]
});
}
}
});

Kubernetes部署

# k8s/websocket-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: websocket-service
spec:
replicas: 3 # 3个实例
selector:
matchLabels:
app: websocket
template:
metadata:
labels:
app: websocket
spec:
containers:
- name: websocket
image: myregistry/websocket-service:latest
ports:
- containerPort: 8080
env:
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: redis-secret
key: url
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
name: websocket-service
spec:
type: LoadBalancer
selector:
app: websocket
ports:
- port: 80
targetPort: 8080

性能指标

指标目标值
并发连接数50,000
消息吞吐量10,000/秒
P95延迟< 50ms
可用性99.9%

成本估算

AWS/GCP部署:

  • API网关: 2 × t3.small = $30/月
  • WebSocket服务: 3 × t3.medium = $150/月
  • 微服务: 4 × t3.small = $60/月
  • RDS PostgreSQL: db.t3.medium = $60/月
  • ElastiCache Redis: cache.t3.medium = $50/月
  • Kafka (MSK): kafka.t3.small × 3 = $300/月
  • 负载均衡: $20/月
  • 总计: ~$670/月

方案3: Serverless架构

适用场景

  • 流量波动大(间歇性高峰)
  • 团队小但技术能力强
  • 希望零运维
  • 按使用付费

架构图

┌─────────────┐
│ 客户端 │
└──────┬──────┘

┌──────▼──────────────────────────┐
│ Cloudflare Workers (边缘) │
│ - 路由 │
│ - 认证 │
│ - 限流 │
└──────┬──────────────────────────┘

├─► Lambda (API处理)
│ └─► DynamoDB

├─► API Gateway WebSocket
│ └─► Lambda (连接管理)
│ └─► DynamoDB (连接表)

└─► S3 (静态资源)

实现示例

1. Cloudflare Worker(边缘层)

// cloudflare-worker/src/index.ts
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);

// 限流
const clientIP = request.headers.get('CF-Connecting-IP');
const rateLimitKey = `ratelimit:${clientIP}`;
const count = await env.KV.get(rateLimitKey);

if (count && parseInt(count) > 100) {
return new Response('Rate limit exceeded', { status: 429 });
}

await env.KV.put(rateLimitKey, (parseInt(count || '0') + 1).toString(), {
expirationTtl: 60 // 1分钟过期
});

// 路由到后端
if (url.pathname.startsWith('/api/')) {
return fetch(`${env.API_URL}${url.pathname}`, request);
}

// 静态资源走S3
if (url.pathname.startsWith('/assets/')) {
return fetch(`${env.S3_URL}${url.pathname}`);
}

return new Response('Not found', { status: 404 });
}
};

2. Lambda WebSocket处理

// lambda/websocket-handler.ts
import { APIGatewayProxyWebsocketHandlerV2 } from 'aws-lambda';
import { DynamoDBClient, PutItemCommand, DeleteItemCommand } from '@aws-sdk/client-dynamodb';

const dynamodb = new DynamoDBClient({});

export const connect: APIGatewayProxyWebsocketHandlerV2 = async (event) => {
const connectionId = event.requestContext.connectionId;

await dynamodb.send(new PutItemCommand({
TableName: 'WebSocketConnections',
Item: {
connectionId: { S: connectionId },
ttl: { N: String(Math.floor(Date.now() / 1000) + 7200) } // 2小时TTL
}
}));

return { statusCode: 200 };
};

export const disconnect: APIGatewayProxyWebsocketHandlerV2 = async (event) => {
const connectionId = event.requestContext.connectionId;

await dynamodb.send(new DeleteItemCommand({
TableName: 'WebSocketConnections',
Key: { connectionId: { S: connectionId } }
}));

return { statusCode: 200 };
};

export const message: APIGatewayProxyWebsocketHandlerV2 = async (event) => {
const data = JSON.parse(event.body || '{}');
const connectionId = event.requestContext.connectionId;

// 处理消息
// ... 业务逻辑 ...

return { statusCode: 200 };
};

3. Serverless框架配置

# serverless.yml
service: realtime-app

provider:
name: aws
runtime: nodejs18.x
region: us-east-1

functions:
connect:
handler: dist/websocket-handler.connect
events:
- websocket: $connect

disconnect:
handler: dist/websocket-handler.disconnect
events:
- websocket: $disconnect

message:
handler: dist/websocket-handler.message
events:
- websocket: $default

apiHandler:
handler: dist/api-handler.handler
events:
- httpApi:
path: /api/{proxy+}
method: ANY

resources:
Resources:
ConnectionsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: WebSocketConnections
AttributeDefinitions:
- AttributeName: connectionId
AttributeType: S
KeySchema:
- AttributeName: connectionId
KeyType: HASH
BillingMode: PAY_PER_REQUEST
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true

成本估算(按使用量)

示例:10K日活用户

  • Lambda调用: 1M次/月 × $0.20/1M = $0.20
  • Lambda运行时间: 100K GB-秒 × $0.0000166667 = $1.67
  • API Gateway WebSocket: 10M消息 × $1/1M = $10
  • DynamoDB: 1M读 + 1M写 = $1.25
  • Cloudflare Workers: 10M请求(免费额度内)= $0
  • 总计: ~$15/月(低流量时非常便宜)

高峰期(100K日活):

  • 约 $150/月(自动扩展,无需预留资源)

方案4: 混合架构

适用场景

  • 成熟产品,需要平衡成本和性能
  • 部分功能高频(WebSocket),部分低频(管理后台)
  • 希望渐进式迁移

架构设计

核心实时功能 ──► 自建K8S集群(可控成本)

├─ WebSocket服务
├─ 消息队列
└─ 时序数据库

低频API ──► Serverless (Lambda)

├─ 用户管理
├─ 报表生成
└─ 定时任务

静态资源 ──► CDN + S3

数据库 ──► 托管服务(RDS)

成本优化策略

  1. 高频WebSocket: 自建节点(Reserved Instances)
  2. 低频API: Lambda按需付费
  3. 存储: S3 + Intelligent-Tiering
  4. CDN: Cloudflare免费版

方案5: 边缘计算架构

适用场景

  • 全球用户
  • 延迟敏感(< 50ms)
  • 预算充足

架构图

全球用户

├─► 北美边缘节点 ─┐
├─► 欧洲边缘节点 ─┼─► 区域中心 ──► 中央数据中心
├─► 亚洲边缘节点 ─┘
└─► 其他边缘节点

技术选型

  • 边缘节点: Cloudflare Durable Objects
  • 区域中心: AWS Regional
  • 中央数据: AWS + 跨区域复制

示例代码:

// Durable Object (边缘状态管理)
export class ChatRoom {
private state: DurableObjectState;
private sessions: Set<WebSocket> = new Set();

constructor(state: DurableObjectState) {
this.state = state;
}

async fetch(request: Request): Promise<Response> {
const pair = new WebSocketPair();
await this.handleSession(pair[1]);
return new Response(null, { status: 101, webSocket: pair[0] });
}

async handleSession(ws: WebSocket) {
ws.accept();
this.sessions.add(ws);

ws.addEventListener('message', (event) => {
const message = event.data;
// 广播给房间内所有人
this.sessions.forEach(session => {
session.send(message);
});

// 异步持久化到中央数据库
this.state.waitUntil(
fetch('https://api.example.com/messages', {
method: 'POST',
body: message
})
);
});

ws.addEventListener('close', () => {
this.sessions.delete(ws);
});
}
}

扩展性方案

水平扩展清单

1. 应用层扩展

  • 使用无状态设计
  • 通过Redis共享session
  • WebSocket通过Redis Pub/Sub同步
  • 负载均衡配置健康检查

2. 数据库扩展

  • 读写分离(主从复制)
  • 连接池优化
  • 查询缓存(Redis)
  • 分库分表(Sharding)

3. 消息队列扩展

  • Kafka分区增加
  • 消费者组水平扩展
  • 死信队列处理

高可用方案

目标: 99.9%可用性

1. 多可用区部署

┌──────────────────────────────────┐
│ 可用区 A │
│ - App Server × 2 │
│ - DB Master │
│ - Redis Master │
└──────────────────────────────────┘

┌──────────────────────────────────┐
│ 可用区 B │
│ - App Server × 2 │
│ - DB Replica │
│ - Redis Replica │
└──────────────────────────────────┘

2. 故障转移策略

// 数据库自动故障转移
const dbConfig = {
replication: {
master: {
host: 'db-master.example.com',
port: 5432
},
slaves: [
{ host: 'db-replica-1.example.com', port: 5432 },
{ host: 'db-replica-2.example.com', port: 5432 }
]
},
pool: {
max: 20,
min: 5,
acquireTimeoutMillis: 30000,
// 自动重试
retry: {
max: 3,
timeout: 5000
}
}
};

// Redis Sentinel自动故障转移
const redisConfig = {
sentinels: [
{ host: 'sentinel-1', port: 26379 },
{ host: 'sentinel-2', port: 26379 },
{ host: 'sentinel-3', port: 26379 }
],
name: 'mymaster',
// 故障转移参数
sentinelRetryStrategy: (times) => {
return Math.min(times * 50, 2000);
}
};

3. 断路器模式

import CircuitBreaker from 'opossum';

const options = {
timeout: 3000,
errorThresholdPercentage: 50,
resetTimeout: 30000
};

const breaker = new CircuitBreaker(fetchData, options);

breaker.fallback(() => {
// 降级方案:返回缓存数据
return getCachedData();
});

breaker.on('open', () => {
console.warn('Circuit breaker opened');
alertOps('Service degraded');
});

成本优化方案

优化策略

1. 使用Spot/Preemptible实例

  • 非核心服务使用Spot实例(节省70%)
  • 配置自动替换机制

2. 资源Right-Sizing

# 监控资源使用
kubectl top nodes
kubectl top pods

# 调整资源限制
resources:
requests:
memory: "256Mi" # 实际需要的最小值
cpu: "250m"
limits:
memory: "512Mi" # 避免OOM
cpu: "500m" # 允许突发

3. 缓存策略

  • CDN缓存静态资源(减少源站流量)
  • Redis缓存热数据(减少数据库查询)
  • 浏览器缓存(减少请求次数)

4. 数据库优化

  • 使用索引(减少查询时间)
  • 定期清理旧数据(减少存储成本)
  • 使用合适的实例规格

成本对比(10K DAU):

方案月成本优点缺点
单体$30简单扩展性差
微服务$670可扩展复杂度高
Serverless$15-150弹性冷启动
混合$200平衡维护成本

架构检查清单

部署前确认:

  • 是否有监控和告警?
  • 是否有日志聚合?
  • 是否有备份策略?
  • 是否有灾难恢复计划?
  • 是否有负载测试?
  • 是否有安全审计?
  • 是否有成本预算?
  • 是否有文档?

选择建议:

  • MVP阶段: 方案1(单体)
  • 产品验证期: 方案3(Serverless)或方案4(混合)
  • 快速增长期: 方案2(微服务)
  • 全球化阶段: 方案5(边缘计算)

记住:过早优化是万恶之源。从最简单的方案开始,根据实际需求逐步演进。