官网:https://docs.nestjs.cn/websockets/gateways
src/
├── app.module.ts
├── main.ts
├── websocket/
│   ├── websocket.module.ts
│   ├── websocket.gateway.ts
│   ├── websocket.service.ts
│   ├── dto/
│   │   ├── message.dto.ts
│   │   └── join-room.dto.ts
│   ├── interfaces/
│   │   └── socket-user.interface.ts
│   └── adapters/
│       └── redis-adapter.ts
└── chat/
    └── chat.controller.ts
安装依赖
npm install @nestjs/websockets @nestjs/platform-socket.io socket.io
npm install -D @types/socket.io
# 可选:Redis 适配器
npm install socket.io-redis @types/socket.io-redis
- 适配器(Adapter)在 NestJS 中主要用于协议转换和底层库抽象:
- HTTP 适配器 – 适配 Express/Fastify
- WebSocket 适配器 – 适配 Socket.io/ws
- GraphQL 适配器 – 适配 Apollo/Mercurius
服务端代码
# 网关实现
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import {Server, Socket} from 'socket.io';
import {Logger, UseFilters, UseGuards, UsePipes, ValidationPipe} from '@nestjs/common';
import {WebSocketService} from './websocket.service';
import {SendMessageDto} from './dto/message.dto';
import {JoinRoomDto} from './dto/join-room.dto';
import {WsJwtGuard} from './guards/ws-jwt.guard';
import {WsExceptionFilter} from './filters/ws-exception.filter';
/**
 * Socket.IO gateway serving the `/chat` namespace.
 *
 * Authenticates each connection with the `token` handshake query parameter,
 * tracks online users through {@link WebSocketService}, and handles the
 * `join-room`, `read-receipt`, `get-online-users` and `ping` events.
 *
 * NOTE(review): this class has the same name as the `WebSocketGateway`
 * decorator imported from '@nestjs/websockets'. Within one file the two
 * declarations conflict; either alias the decorator import or rename the class
 * (for example `ChatGateway`) together with every module that imports it.
 */
@WebSocketGateway({
  cors: {
    origin: '*', // configure the concrete allowed domains in production
    credentials: true,
  },
  namespace: '/chat',
  transports: ['websocket', 'polling'],
})
@UseFilters(WsExceptionFilter)
@UsePipes(new ValidationPipe())
export class WebSocketGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  /** Server instance injected by Nest once the gateway has been initialised. */
  @WebSocketServer()
  server!: Server;

  private logger: Logger = new Logger('WebSocketGateway');

  constructor(private readonly webSocketService: WebSocketService) {}

  /**
   * Lifecycle hook invoked once after the gateway has been initialised.
   *
   * @param server Instance handed over by the WebSocket adapter.
   */
  afterInit(server: Server) {
    this.logger.log('WebSocket Gateway Initialized');
    // Raise the maximum listener count.
    // NOTE(review): because this gateway declares a namespace, the injected
    // object may be a Namespace rather than the root Server, in which case
    // `sockets` is not an EventEmitter — confirm against the adapter in use.
    server.sockets.setMaxListeners(20);
  }

  /**
   * Authenticates a newly connected client and registers it as online.
   * Clients that fail validation, or whose setup throws, are disconnected.
   *
   * @param client The connecting socket.
   */
  async handleConnection(client: Socket) {
    try {
      // Read the token from the handshake query string.
      const token = client.handshake.query.token as string;

      // Validate the user.
      const user = await this.webSocketService.validateUser(token);
      if (!user) {
        client.disconnect();
        return;
      }

      // Keep the user on the socket so later handlers can read it.
      client.data.user = user;

      // Join the per-user room used for targeted notifications.
      await client.join(`user_${user.id}`);

      // Update the online-user registry.
      await this.webSocketService.addOnlineUser(user.id, client.id);

      // Broadcast that the user came online.
      this.server.emit('user-online', {
        userId: user.id,
        username: user.username,
        timestamp: new Date(),
      });

      this.logger.log(`Client connected: ${client.id}, User: ${user.username}`);

      // Acknowledge the connection to the client itself.
      client.emit('connected', {
        message: 'Connected successfully',
        userId: user.id,
        serverTime: new Date(),
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Connection error: ${reason}`);
      client.disconnect();
    }
  }

  /**
   * Removes a disconnected client from the online-user registry.
   *
   * Sockets that never completed authentication carry no user and are only
   * logged.
   *
   * @param client The socket that disconnected.
   */
  async handleDisconnect(client: Socket): Promise<void> {
    const user = client.data.user;
    if (!user) {
      this.logger.log(`Client disconnected: ${client.id}`);
      return;
    }

    try {
      await this.webSocketService.removeOnlineUser(user.id, client.id);
      this.logger.log(`Client disconnected: ${client.id}, User: ${user.username}`);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Disconnect error: ${reason}`);
    }
  }

  /**
   * Moves the client into the requested chat room and notifies its members.
   *
   * @param client The requesting socket.
   * @param joinRoomDto Payload carrying the target `roomId`.
   */
  @SubscribeMessage('join-room')
  @UseGuards(WsJwtGuard)
  async handleJoinRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody() joinRoomDto: JoinRoomDto,
  ) {
    const { roomId } = joinRoomDto;
    const user = client.data.user;

    // Leave any chat room joined earlier; the `user_*` room is kept.
    const previousRooms = Array.from(client.rooms).filter((room) => room.startsWith('room_'));
    for (const room of previousRooms) {
      await client.leave(room);
    }

    // Join the new room.
    await client.join(`room_${roomId}`);

    // Tell the other members of the room.
    client.to(`room_${roomId}`).emit('user-joined', {
      userId: user.id,
      username: user.username,
      roomId,
      timestamp: new Date(),
    });
  }

  /**
   * Records that the current user read a message and broadcasts the receipt
   * to the message's room.
   *
   * @param client The requesting socket.
   * @param data Identifiers of the message and the room it belongs to.
   */
  @SubscribeMessage('read-receipt')
  @UseGuards(WsJwtGuard)
  async handleReadReceipt(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { messageId: string; roomId: string },
  ) {
    const user = client.data.user;

    // Persist the read state.
    await this.webSocketService.markMessageAsRead(data.messageId, user.id);

    // Broadcast the read receipt.
    this.server.to(`room_${data.roomId}`).emit('message-read', {
      messageId: data.messageId,
      userId: user.id,
      username: user.username,
      readAt: new Date(),
    });
  }

  /**
   * Sends the current online-user list back to the requesting client.
   *
   * @param client The requesting socket.
   */
  @SubscribeMessage('get-online-users')
  @UseGuards(WsJwtGuard)
  async handleGetOnlineUsers(@ConnectedSocket() client: Socket) {
    const onlineUsers = await this.webSocketService.getOnlineUsers();
    client.emit('online-users-list', onlineUsers);
  }

  /**
   * Heartbeat: answers `ping` with `pong` and the server time.
   *
   * @param client The requesting socket.
   */
  @SubscribeMessage('ping')
  handlePing(@ConnectedSocket() client: Socket) {
    client.emit('pong', { timestamp: new Date().toISOString() });
  }

  // Server-initiated push helpers.

  /**
   * Pushes a `notification` event to every socket in the user's private room.
   *
   * @param userId Target user id.
   * @param notification Payload forwarded as-is.
   */
  public sendNotificationToUser(userId: string, notification: unknown) {
    this.server.to(`user_${userId}`).emit('notification', notification);
  }

  /**
   * Emits an arbitrary event to every socket in a chat room.
   *
   * @param roomId Target room id (without the `room_` prefix).
   * @param event Event name.
   * @param data Payload forwarded as-is.
   */
  public broadcastToRoom(roomId: string, event: string, data: unknown) {
    this.server.to(`room_${roomId}`).emit(event, data);
  }
}
src/websocket/dto/message.dto.ts
import {IsString, IsEnum, IsOptional, IsNotEmpty, MinLength, MaxLength} from 'class-validator';
/** Kinds of chat message accepted by the gateway. */
export enum MessageType {
  /** Plain text message. */
  TEXT = 'text',
  /** Image message. */
  IMAGE = 'image',
  /** File message. */
  FILE = 'file',
  /** System message. */
  SYSTEM = 'system',
}
/**
 * Payload for sending a chat message, validated with class-validator.
 */
export class SendMessageDto {
  /** Room the message is sent to; must be a non-empty string. */
  @IsString()
  @IsNotEmpty()
  roomId: string;

  /** Message body; a non-empty string of 1 to 2000 characters. */
  @IsString()
  @IsNotEmpty()
  @MinLength(1)
  @MaxLength(2000)
  content: string;

  /** Message kind; optional, initialised to `MessageType.TEXT`. */
  @IsEnum(MessageType)
  @IsOptional()
  type?: MessageType = MessageType.TEXT;

  /** Optional free-form extra data attached to the message. */
  @IsOptional()
  metadata?: Record<string, any>;
}
/**
 * Payload for the `join-room` event.
 *
 * NOTE(review): the gateway imports this class from './dto/join-room.dto',
 * so it presumably belongs in its own file rather than in message.dto.ts.
 */
export class JoinRoomDto {
  /** Room to join; must be a non-empty string. */
  @IsString()
  @IsNotEmpty()
  roomId: string;
}
src/websocket/websocket.service.ts
import {Injectable} from '@nestjs/common';
import {InjectRepository} from '@nestjs/typeorm';
import {Repository} from 'typeorm';
import {Message} from './entities/message.entity';
import {RedisService} from '../redis/redis.service';
/**
 * Backing service for the WebSocket gateway: token validation, online-user
 * bookkeeping (in memory plus Redis) and message persistence.
 */
@Injectable()
export class WebSocketService {
  /** userId -> ids of the sockets that user currently has open on this node. */
  private onlineUsers = new Map<string, Set<string>>();

  constructor(
    @InjectRepository(Message)
    private messageRepository: Repository<Message>,
    private redisService: RedisService,
  ) {}

  /**
   * Resolves the user behind a connection token.
   *
   * WARNING: the body is still a placeholder that returns a fixed test user
   * for any non-empty token. Replace it with real JWT verification before use.
   *
   * @param token Token supplied by the client during the handshake.
   * @returns The user object, or `null` when the token is missing or invalid.
   */
  async validateUser(token: string): Promise<any> {
    // A connection without a token can never be authenticated.
    if (!token) {
      return null;
    }

    try {
      // Example: decode the token to obtain the user.
      // const payload = this.jwtService.verify(token);
      // return {id: payload.sub, username: payload.username};
      return { id: '1', username: 'testuser', avatar: 'avatar-url' };
    } catch {
      return null;
    }
  }

  /**
   * Registers a socket as belonging to an online user.
   *
   * @param userId Owner of the socket.
   * @param socketId Id of the connected socket.
   */
  async addOnlineUser(userId: string, socketId: string): Promise<void> {
    let sockets = this.onlineUsers.get(userId);
    if (!sockets) {
      sockets = new Set<string>();
      this.onlineUsers.set(userId, sockets);
    }
    sockets.add(socketId);

    // Mirror the entry to Redis (for clustered deployments), matching the
    // Redis cleanup performed in removeOnlineUser.
    await this.redisService.addOnlineUser(userId, socketId);
  }

  /**
   * Unregisters a socket; the user entry is dropped once no sockets remain.
   *
   * @param userId Owner of the socket.
   * @param socketId Id of the disconnected socket.
   */
  async removeOnlineUser(userId: string, socketId: string): Promise<void> {
    const sockets = this.onlineUsers.get(userId);
    if (sockets) {
      sockets.delete(socketId);
      if (sockets.size === 0) {
        this.onlineUsers.delete(userId);
      }
    }

    // Remove the entry from Redis as well.
    await this.redisService.removeOnlineUser(userId, socketId);
  }

  /**
   * Lists the users that currently have at least one socket on this node.
   *
   * @returns One entry per user with the number of open sockets.
   */
  async getOnlineUsers(): Promise<Array<{ userId: string; socketCount: number }>> {
    const users: Array<{ userId: string; socketCount: number }> = [];
    for (const [userId, sockets] of this.onlineUsers.entries()) {
      if (sockets.size > 0) {
        users.push({
          userId,
          socketCount: sockets.size,
          // More user details can be added here.
        });
      }
    }
    return users;
  }

  /**
   * Persists a chat message.
   *
   * @param messageData Fields of the message to store.
   * @returns The saved entity.
   */
  async saveMessage(messageData: {
    senderId: string;
    roomId: string;
    content: string;
    type: string;
  }): Promise<Message> {
    const message = this.messageRepository.create(messageData);
    return await this.messageRepository.save(message);
  }

  /**
   * Appends a user to the `read_by` array of a message.
   *
   * @param messageId Id of the message that was read.
   * @param userId Id of the reader.
   */
  async markMessageAsRead(messageId: string, userId: string): Promise<void> {
    await this.messageRepository
      .createQueryBuilder()
      .update(Message)
      // userId is bound as a query parameter instead of being spliced into
      // the SQL text, so a crafted id cannot alter the statement.
      .set({ readBy: () => 'array_append(read_by, :userId)' })
      .where('id = :messageId', { messageId })
      .setParameter('userId', userId)
      .execute();
  }

  /**
   * Returns one page of a room's messages, newest first.
   *
   * @param roomId Room whose history is requested.
   * @param page 1-based page number.
   * @param limit Page size.
   */
  async getMessageHistory(roomId: string, page = 1, limit = 50): Promise<Message[]> {
    return await this.messageRepository.find({
      where: { roomId },
      order: { createdAt: 'DESC' },
      skip: (page - 1) * limit,
      take: limit,
    });
  }
}
src/websocket/websocket.module.ts
import {Module} from '@nestjs/common';
import {TypeOrmModule} from '@nestjs/typeorm';
import {WebSocketGateway} from './websocket.gateway';
import {WebSocketService} from './websocket.service';
import {Message} from './entities/message.entity';
import {RedisModule} from '../redis/redis.module';
/**
 * Feature module for the WebSocket layer.
 *
 * Registers the `Message` entity with TypeORM, pulls in `RedisModule`, and
 * both provides and exports the gateway and its service.
 */
@Module({
  imports: [TypeOrmModule.forFeature([Message]), RedisModule],
  providers: [WebSocketGateway, WebSocketService],
  exports: [WebSocketGateway, WebSocketService],
})
export class WebSocketModule {}
src/app.module.ts
import {Module} from '@nestjs/common';
import {TypeOrmModule} from '@nestjs/typeorm';
import {ConfigModule, ConfigService} from '@nestjs/config';
import {WebSocketModule} from './websocket/websocket.module';
import {ChatModule} from './chat/chat.module';
/**
 * Application root module.
 *
 * Loads configuration globally, configures the PostgreSQL TypeORM connection
 * from `DB_*` settings, and imports the WebSocket and chat feature modules.
 */
@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => {
        return {
          type: 'postgres',
          host: configService.get('DB_HOST'),
          port: configService.get('DB_PORT'),
          username: configService.get('DB_USERNAME'),
          password: configService.get('DB_PASSWORD'),
          database: configService.get('DB_DATABASE'),
          entities: [__dirname + '/**/*.entity{.ts,.js}'],
          // Schema auto-sync is enabled for every environment except production.
          synchronize: configService.get('NODE_ENV') !== 'production',
        };
      },
    }),
    WebSocketModule,
    ChatModule,
  ],
})
export class AppModule {}
src/main.ts
# main.ts 注册适配器
// Register the Socket.IO adapter. The gateway relies on Socket.IO namespaces
// and rooms, which the `ws`-based WsAdapter does not support.
import { IoAdapter } from '@nestjs/platform-socket.io';
app.useWebSocketAdapter(new IoAdapter(app));
客户端代码
import {io} from "socket.io-client";
/**
 * Client-side wrapper around a Socket.IO connection.
 *
 * Opens the connection on construction, registers the default listeners and
 * exposes helpers for the events the application emits.
 */
class WebSocketService {
  /** Underlying Socket.IO client connection. */
  socket: ReturnType<typeof io>;

  /**
   * High-frequency emit with throttling: forwards at most one event per
   * 1000 ms; calls made inside the window are dropped.
   */
  throttledEmit = this.throttle((eventName: string, data: unknown) => {
    this.socket.emit(eventName, data);
  }, 1000);

  constructor() {
    this.socket = io("http://localhost:3000");
    this.setupListeners();
  }

  /** Registers the listeners for messages pushed by the server. */
  setupListeners(): void {
    this.socket.on("operation-result", (data: unknown) => {
      console.log("Operation result:", data);
    });
    this.socket.on("private-message", (data: unknown) => {
      this.handlePrivateMessage(data);
    });
  }

  /**
   * Handles an incoming private message. Logs the payload by default;
   * replace or override to update the UI.
   *
   * @param data Payload of the `private-message` event.
   */
  handlePrivateMessage(data: unknown): void {
    console.log("Private message:", data);
  }

  /**
   * Joins a room.
   *
   * @param roomId Room to join.
   * @param userId Id of the joining user.
   */
  joinRoom(roomId: string, userId: string): void {
    this.socket.emit("join-room", { roomId, userId });
  }

  /**
   * Leaves a room.
   *
   * @param roomId Room to leave.
   */
  leaveRoom(roomId: string): void {
    this.socket.emit("leave-room", roomId);
  }

  /**
   * Starts logging "user is typing" notifications.
   * Every call registers one more listener, so call it once.
   */
  writing(): void {
    this.socket.on("user-typing", (data: { username: string }) => {
      console.log(`${data.username} is typing...`);
    });
  }

  /**
   * Sends a private message.
   *
   * @param toUserId Recipient user id.
   * @param message Message content.
   */
  sendPrivateMessage(toUserId: string, message: unknown): void {
    this.socket.emit("private-message", {
      to: toUserId,
      message,
      timestamp: new Date().toISOString(),
    });
  }

  /**
   * Sends one notification to several users at once.
   *
   * @param userIds Recipient user ids.
   * @param content Notification content.
   */
  sendBulkMessage(userIds: string[], content: unknown): void {
    this.socket.emit("bulk-message", {
      userIds,
      content,
      type: "notification",
    });
  }

  /**
   * Wraps `fn` so it runs at most once per `waitMs` milliseconds. The first
   * call runs immediately; calls inside the window are discarded.
   *
   * @param fn Function to rate-limit.
   * @param waitMs Minimum interval between two invocations.
   * @returns The throttled wrapper.
   */
  throttle<TArgs extends unknown[]>(
    fn: (...args: TArgs) => void,
    waitMs: number,
  ): (...args: TArgs) => void {
    let lastRun: number | undefined;
    return (...args: TArgs): void => {
      const now = Date.now();
      if (lastRun !== undefined && now - lastRun < waitMs) {
        return;
      }
      lastRun = now;
      fn(...args);
    };
  }

  /**
   * Performs a risky operation that the server has to confirm.
   *
   * @param operation Operation identifier.
   * @param options Extra fields merged into the request payload.
   * @returns Resolves with the `operation-success` payload and rejects with
   *   the `operation-error` payload.
   */
  performRiskyOperation(
    operation: unknown,
    options: Record<string, unknown> = {},
  ): Promise<unknown> {
    return new Promise((resolve, reject) => {
      // Whichever outcome arrives first removes the other listener, so no
      // stale handler is left behind to fire on a later operation.
      const onSuccess = (result: unknown): void => {
        this.socket.off("operation-error", onError);
        resolve(result);
      };
      const onError = (error: unknown): void => {
        this.socket.off("operation-success", onSuccess);
        reject(error);
      };

      // Listen for the server response before sending the request.
      this.socket.once("operation-success", onSuccess);
      this.socket.once("operation-error", onError);

      this.socket.emit("risky-operation", {
        operation,
        ...options,
      });
    });
  }
}
执行流程
客户端发送流程:
- socket.emit('event-name', data)
  ↓
- 消息序列化为 WebSocket 帧
  ↓
- 通过 TCP/IP 发送到服务器
  ↓
- NestJS WebSocket 适配器接收
  ↓
- 路由到对应的 @SubscribeMessage 装饰器方法
  ↓
- 执行业务逻辑
  ↓
- 返回响应给客户端(可选)
服务器响应:
- 方法中使用 return 返回数据
- 使用 client.emit() 主动发送
- 使用 this.server.emit() 广播
注意:事件名称区分大小写,客户端与服务端使用的事件名称必须完全一致,否则服务端无法处理该事件
正文完