NestJS | WebSocket

5次阅读
没有评论

官网:https://docs.nestjs.cn/websockets/gateways

src/
├── app.module.ts
├── main.ts
├── websocket/
│   ├── websocket.module.ts
│   ├── websocket.gateway.ts
│   ├── websocket.service.ts
│   ├── dto/
│   │   ├── message.dto.ts
│   │   └── join-room.dto.ts
│   ├── interfaces/
│   │   └── socket-user.interface.ts
│   └── adapters/
│       └── redis-adapter.ts
└── chat/
    └── chat.controller.ts

安装依赖

# Runtime packages: the Nest WebSocket module, its socket.io platform binding, and socket.io itself.
npm install @nestjs/websockets @nestjs/platform-socket.io socket.io
# Dev-only typings for socket.io.
# NOTE(review): recent socket.io releases are believed to bundle their own typings — confirm this package is still needed.
npm install -D @types/socket.io
# Optional: Redis adapter
npm install socket.io-redis @types/socket.io-redis
  • 适配器(Adapter)在 NestJS 中主要用于协议转换和底层库抽象:
    • HTTP 适配器 – 适配 Express/Fastify
    • WebSocket 适配器 – 适配 Socket.io/ws
    • GraphQL 适配器 – 适配 Apollo/Mercurius

服务端代码

# 网关实现
import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  MessageBody,
  ConnectedSocket,
  OnGatewayInit,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import {Server, Socket} from 'socket.io';
import {Logger, UseFilters, UseGuards, UsePipes, ValidationPipe} from '@nestjs/common';
import {WebSocketService} from './websocket.service';
import {SendMessageDto} from './dto/message.dto';
import {JoinRoomDto} from './dto/join-room.dto';
import {WsJwtGuard} from './guards/ws-jwt.guard';
import {WsExceptionFilter} from './filters/ws-exception.filter';

/**
 * Socket.IO gateway serving the `/chat` namespace.
 *
 * Responsibilities:
 *  - authenticate sockets when they connect and track them as online users,
 *  - clean up online-user state when they disconnect,
 *  - handle room membership, read receipts, online-user queries and ping,
 *  - expose helpers other providers can call to push events to users/rooms.
 *
 * NOTE(review): this class has the same name as the `WebSocketGateway`
 * decorator imported from `@nestjs/websockets` at the top of the file; one of
 * the two needs an alias or a rename for the file to compile.
 */
@WebSocketGateway({
  cors: {
    origin: '*', // In production, configure the concrete allowed domain(s).
    // NOTE(review): browsers refuse a wildcard origin on credentialed
    // requests — confirm the intended origin list before shipping.
    credentials: true,
  },
  namespace: '/chat',
  transports: ['websocket', 'polling'],
})
@UseFilters(WsExceptionFilter)
@UsePipes(new ValidationPipe())
export class WebSocketGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger('WebSocketGateway');

  constructor(private readonly webSocketService: WebSocketService) {}

  /**
   * Lifecycle hook invoked once the gateway's server object exists.
   *
   * @param server The server object Nest created for this gateway.
   */
  afterInit(server: Server) {
    this.logger.log('WebSocket Gateway Initialized');
    // Raise the listener limit on the emitter itself. The previous
    // `server.sockets.setMaxListeners(...)` only works on the root server:
    // for a namespaced gateway the injected object is the namespace, whose
    // `sockets` is a Map of connected sockets, not an event emitter.
    server.setMaxListeners(20);
  }

  /**
   * Authenticates a newly connected socket, registers it as online and
   * announces the user. Sockets without a valid token are disconnected.
   *
   * @param client The socket that just connected.
   */
  async handleConnection(client: Socket) {
    try {
      const token = this.extractToken(client);
      // A missing token is never forwarded to validation: no token, no session.
      const user = token ? await this.webSocketService.validateUser(token) : null;

      if (!user) {
        client.disconnect();
        return;
      }

      // Keep the user on the socket so later handlers can read it.
      client.data.user = user;

      // Per-user room: lets the server target every socket of this user.
      await client.join(`user_${user.id}`);

      await this.webSocketService.addOnlineUser(user.id, client.id);

      // Tell everyone in the namespace that the user is online.
      this.server.emit('user-online', {
        userId: user.id,
        username: user.username,
        timestamp: new Date(),
      });

      this.logger.log(`Client connected: ${client.id}, User: ${user.username}`);

      // Acknowledge the connection to the client itself.
      client.emit('connected', {
        message: 'Connected successfully',
        userId: user.id,
        serverTime: new Date(),
      });
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Connection error: ${reason}`);
      client.disconnect();
    }
  }

  /**
   * Removes the socket from the online-user registry and, when it was the
   * user's last socket, broadcasts a `user-offline` event.
   *
   * @param client The socket that disconnected.
   */
  async handleDisconnect(client: Socket) {
    const user = client.data.user;
    if (!user) {
      // The socket never finished authentication; nothing was registered.
      return;
    }

    try {
      await this.webSocketService.removeOnlineUser(user.id, client.id);

      const onlineUsers = await this.webSocketService.getOnlineUsers();
      const stillOnline = onlineUsers.some((entry) => entry.userId === user.id);
      if (!stillOnline) {
        this.server.emit('user-offline', {
          userId: user.id,
          username: user.username,
          timestamp: new Date(),
        });
      }

      this.logger.log(`Client disconnected: ${client.id}, User: ${user.username}`);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.error(`Disconnect cleanup error: ${reason}`);
    }
  }

  /**
   * Moves the socket into `room_<roomId>`, leaving any other `room_*` room
   * first, and notifies the other members of the new room.
   *
   * @param client The requesting socket.
   * @param joinRoomDto Validated payload carrying the target room id.
   */
  @SubscribeMessage('join-room')
  @UseGuards(WsJwtGuard)
  async handleJoinRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody() joinRoomDto: JoinRoomDto,
  ) {
    const { roomId } = joinRoomDto;
    const user = client.data.user;
    const targetRoom = `room_${roomId}`;

    // A socket is in at most one chat room at a time. join/leave may be
    // asynchronous depending on the adapter, so wait for them to settle.
    const previousRooms = Array.from(client.rooms).filter((room) => room.startsWith('room_'));
    await Promise.all(previousRooms.map((room) => client.leave(room)));
    await client.join(targetRoom);

    // Notify the other members of the room (the sender is excluded).
    client.to(targetRoom).emit('user-joined', {
      userId: user.id,
      username: user.username,
      roomId,
      timestamp: new Date(),
    });
  }

  /**
   * Persists a read receipt and broadcasts it to the message's room.
   *
   * @param client The socket of the reader.
   * @param data Ids of the message that was read and of its room.
   */
  @SubscribeMessage('read-receipt')
  @UseGuards(WsJwtGuard)
  async handleReadReceipt(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { messageId: string; roomId: string },
  ) {
    const user = client.data.user;

    await this.webSocketService.markMessageAsRead(data.messageId, user.id);

    this.server.to(`room_${data.roomId}`).emit('message-read', {
      messageId: data.messageId,
      userId: user.id,
      username: user.username,
      readAt: new Date(),
    });
  }

  /**
   * Sends the current online-user list back to the requesting socket.
   *
   * @param client The requesting socket.
   */
  @SubscribeMessage('get-online-users')
  @UseGuards(WsJwtGuard)
  async handleGetOnlineUsers(@ConnectedSocket() client: Socket) {
    const onlineUsers = await this.webSocketService.getOnlineUsers();
    client.emit('online-users-list', onlineUsers);
  }

  /**
   * Answers `ping` with a `pong` carrying the server time as an ISO string.
   *
   * @param client The requesting socket.
   */
  @SubscribeMessage('ping')
  handlePing(@ConnectedSocket() client: Socket) {
    client.emit('pong', { timestamp: new Date().toISOString() });
  }

  /**
   * Server-initiated push: emits `notification` to every socket of a user.
   *
   * @param userId Id of the recipient; targets the `user_<userId>` room.
   * @param notification Payload forwarded unchanged.
   */
  public sendNotificationToUser(userId: string, notification: unknown) {
    this.server.to(`user_${userId}`).emit('notification', notification);
  }

  /**
   * Server-initiated push: emits an arbitrary event to `room_<roomId>`.
   *
   * @param roomId Id of the target room.
   * @param event Event name to emit.
   * @param data Payload forwarded unchanged.
   */
  public broadcastToRoom(roomId: string, event: string, data: unknown) {
    this.server.to(`room_${roomId}`).emit(event, data);
  }

  /**
   * Reads the auth token from the handshake: the `auth` payload first, then
   * the `token` query parameter. Returns `undefined` when no non-empty
   * string token is present.
   */
  private extractToken(client: Socket): string | undefined {
    const { auth, query } = client.handshake;
    const candidate: unknown = auth?.token ?? query.token;
    // A repeated query parameter arrives as an array; use its first entry.
    const token = Array.isArray(candidate) ? candidate[0] : candidate;
    return typeof token === 'string' && token.length > 0 ? token : undefined;
  }
}

src/websocket/dto/message.dto.ts

import {IsString, IsEnum, IsOptional, IsNotEmpty, MinLength, MaxLength} from 'class-validator';

/**
 * Kinds of chat message a client may send.
 * The string values are what travels on the wire.
 */
export enum MessageType {
  /** Plain text content. */
  TEXT = 'text',

  /** Image content. */
  IMAGE = 'image',

  /** File content. */
  FILE = 'file',

  /** System message. */
  SYSTEM = 'system',
}

/**
 * Payload for sending a chat message; the decorators below declare the
 * validation rules for each field.
 */
export class SendMessageDto {
  /** Target room id: a non-empty string. */
  @IsString()
  @IsNotEmpty()
  roomId: string;

  /** Message body: a non-empty string of 1 to 2000 characters. */
  @IsString()
  @IsNotEmpty()
  @MinLength(1)
  @MaxLength(2000)
  content: string;

  /** Message kind; optional, initialised to `MessageType.TEXT`. */
  @IsEnum(MessageType)
  @IsOptional()
  type?: MessageType = MessageType.TEXT;

  /** Optional free-form extra data; only marked optional, not otherwise validated. */
  @IsOptional()
  metadata?: Record<string, any>;
}

/** Payload for a room-join request. */
export class JoinRoomDto {
  /** Id of the room to join: a non-empty string. */
  @IsString()
  @IsNotEmpty()
  roomId: string;
}

src/websocket/websocket.service.ts

import {Injectable} from '@nestjs/common';
import {InjectRepository} from '@nestjs/typeorm';
import {Repository} from 'typeorm';
import {Message} from './entities/message.entity';
import {RedisService} from '../redis/redis.service';

/**
 * Application service behind the WebSocket gateway: token validation,
 * online-user bookkeeping (in memory and mirrored through `RedisService`)
 * and message persistence through the `Message` repository.
 */
@Injectable()
export class WebSocketService {
  /** userId -> ids of the sockets currently open for that user (this process only). */
  private onlineUsers = new Map<string, Set<string>>();

  constructor(
    @InjectRepository(Message)
    private messageRepository: Repository<Message>,
    private redisService: RedisService,
  ) {}

  /**
   * Resolves the user a token belongs to.
   *
   * @param token Token presented by the client.
   * @returns The user info, or `null` when the token is missing or invalid.
   */
  async validateUser(token: string): Promise<any> {
    // Never authenticate a connection that did not present a token.
    if (typeof token !== 'string' || token.trim() === '') {
      return null;
    }

    try {
      // TODO: implement real JWT verification here. Until then every
      // non-empty token maps to the same placeholder user.
      // Example:
      // const payload = this.jwtService.verify(token);
      // return {id: payload.sub, username: payload.username};
      return { id: '1', username: 'testuser', avatar: 'avatar-url' };
    } catch {
      return null;
    }
  }

  /**
   * Records that `socketId` is an open socket of `userId`.
   *
   * @param userId Id of the connected user.
   * @param socketId Id of the socket that connected.
   */
  async addOnlineUser(userId: string, socketId: string): Promise<void> {
    let sockets = this.onlineUsers.get(userId);
    if (!sockets) {
      sockets = new Set<string>();
      this.onlineUsers.set(userId, sockets);
    }
    sockets.add(socketId);

    // Mirror to Redis as well (for clustered deployments). This call had been
    // swallowed into the preceding comment and therefore never ran, while
    // removeOnlineUser() already removed the entry from Redis.
    await this.redisService.addOnlineUser(userId, socketId);
  }

  /**
   * Forgets `socketId` for `userId`; the user entry is dropped once its last
   * socket is gone.
   *
   * @param userId Id of the user.
   * @param socketId Id of the socket that closed.
   */
  async removeOnlineUser(userId: string, socketId: string): Promise<void> {
    const sockets = this.onlineUsers.get(userId);
    if (sockets) {
      sockets.delete(socketId);
      if (sockets.size === 0) {
        this.onlineUsers.delete(userId);
      }
    }

    // Remove from Redis too.
    await this.redisService.removeOnlineUser(userId, socketId);
  }

  /**
   * Lists the users that have at least one open socket in this process.
   *
   * @returns One entry per user with the number of open sockets.
   */
  async getOnlineUsers(): Promise<{ userId: string; socketCount: number }[]> {
    return Array.from(this.onlineUsers.entries())
      .filter(([, sockets]) => sockets.size > 0)
      .map(([userId, sockets]) => ({
        userId,
        socketCount: sockets.size,
        // More user info can be added here.
      }));
  }

  /**
   * Creates and persists a message.
   *
   * @param messageData Sender, room, content and type of the message.
   * @returns The saved entity.
   */
  async saveMessage(messageData: {
    senderId: string;
    roomId: string;
    content: string;
    type: string;
  }): Promise<Message> {
    const message = this.messageRepository.create(messageData);
    return await this.messageRepository.save(message);
  }

  /**
   * Appends `userId` to the `read_by` array of a message.
   *
   * @param messageId Id of the message that was read.
   * @param userId Id of the reader.
   */
  async markMessageAsRead(messageId: string, userId: string): Promise<void> {
    await this.messageRepository
      .createQueryBuilder()
      .update(Message)
      // The raw expression uses a bound parameter: userId originates from the
      // client session and must not be concatenated into the SQL text.
      .set({ readBy: () => 'array_append(read_by, :userId)' })
      .where('id = :messageId', { messageId })
      .setParameter('userId', userId)
      .execute();
  }

  /**
   * Returns one page of a room's messages, newest first.
   *
   * @param roomId Id of the room.
   * @param page 1-based page number; values below 1 or non-numeric fall back to 1.
   * @param limit Page size; values below 1 or non-numeric fall back to 50.
   * @returns The messages on the requested page.
   */
  async getMessageHistory(roomId: string, page = 1, limit = 50): Promise<Message[]> {
    // Guard against zero/negative/NaN input, which would yield a negative skip.
    const safePage = Math.max(1, Math.floor(page) || 1);
    const safeLimit = Math.max(1, Math.floor(limit) || 50);

    return await this.messageRepository.find({
      where: { roomId },
      order: { createdAt: 'DESC' },
      skip: (safePage - 1) * safeLimit,
      take: safeLimit,
    });
  }
}

src/websocket/websocket.module.ts

import {Module} from '@nestjs/common';
import {TypeOrmModule} from '@nestjs/typeorm';
import {WebSocketGateway} from './websocket.gateway';
import {WebSocketService} from './websocket.service';
import {Message} from './entities/message.entity';
import {RedisModule} from '../redis/redis.module';

/** The gateway and its service: registered as providers and exported unchanged. */
const websocketProviders = [WebSocketGateway, WebSocketService];

/**
 * Feature module for the WebSocket layer. Registers the `Message` entity
 * with TypeORM and imports `RedisModule`.
 */
@Module({
  imports: [TypeOrmModule.forFeature([Message]), RedisModule],
  providers: websocketProviders,
  exports: websocketProviders,
})
export class WebSocketModule {}

src/app.module.ts

import {Module} from '@nestjs/common';
import {TypeOrmModule} from '@nestjs/typeorm';
import {ConfigModule, ConfigService} from '@nestjs/config';
import {WebSocketModule} from './websocket/websocket.module';
import {ChatModule} from './chat/chat.module';

/**
 * Builds the TypeORM connection options from configuration values.
 * Schema synchronisation is enabled whenever NODE_ENV is not 'production'.
 */
const createDatabaseOptions = (configService: ConfigService) => {
  const isProduction = configService.get('NODE_ENV') === 'production';

  return {
    type: 'postgres' as const,
    host: configService.get('DB_HOST'),
    port: configService.get('DB_PORT'),
    username: configService.get('DB_USERNAME'),
    password: configService.get('DB_PASSWORD'),
    database: configService.get('DB_DATABASE'),
    entities: [`${__dirname}/**/*.entity{.ts,.js}`],
    synchronize: !isProduction,
  };
};

/**
 * Root module: global configuration, the Postgres connection, and the
 * WebSocket and chat feature modules.
 */
@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: createDatabaseOptions,
      inject: [ConfigService],
    }),
    WebSocketModule,
    ChatModule,
  ],
})
export class AppModule {}

/src/main.ts


# main.ts  注册适配器
import {IoAdapter} from '@nestjs/platform-socket.io';

// Register the socket.io adapter. The gateway in this article relies on
// socket.io features (a namespace, rooms, the socket.io `Server`/`Socket`
// types) and the client uses socket.io-client, so the plain-`ws` adapter
// (`WsAdapter`) that was registered here cannot serve it; only
// @nestjs/platform-socket.io is installed above.
app.useWebSocketAdapter(new IoAdapter(app));

客户端代码

import {io} from "socket.io-client";

/**
 * Browser-side wrapper around a socket.io connection to the chat gateway.
 */
class WebSocketService {
  /** The underlying socket.io client; assigned in the constructor. */
  socket;

  /**
   * @param {string} [url] Server URL including the namespace. The gateway is
   *   registered under `/chat`, so the default targets that namespace; a
   *   connection to the bare origin would never reach the gateway's handlers.
   * @param {string} [token] Auth token; sent as the `token` query parameter,
   *   which is where the gateway reads it from.
   */
  constructor(url = "http://localhost:3000/chat", token) {
    this.socket = io(url, token ? { query: { token } } : {});
    this.setupListeners();
  }

  /** Registers the handlers for messages pushed by the server. */
  setupListeners() {
    this.socket.on("operation-result", (data) => {
      console.log("Operation result:", data);
    });

    this.socket.on("private-message", (data) => {
      this.handlePrivateMessage(data);
    });
  }

  /**
   * Default handler for incoming private messages; override or replace it to
   * update the UI. It was referenced by the listener but never defined.
   *
   * @param {*} data Payload of the `private-message` event.
   */
  handlePrivateMessage(data) {
    console.log("Private message:", data);
  }

  /**
   * Joins a room.
   *
   * @param {string} roomId Id of the room to join.
   * @param {string} userId Id of the joining user.
   */
  joinRoom(roomId, userId) {
    this.socket.emit("join-room", { roomId, userId });
  }

  /**
   * Leaves a room.
   *
   * @param {string} roomId Id of the room to leave.
   */
  leaveRoom(roomId) {
    this.socket.emit("leave-room", roomId);
  }

  /** Logs which user is typing. Kept as a field so it can be unregistered. */
  onUserTyping = (data) => {
    console.log(`${data.username} is typing...`);
  };

  /**
   * Starts showing "user is typing" notifications. Safe to call repeatedly:
   * the handler is removed before being re-added, so it never stacks up.
   */
  writing() {
    this.socket.off('user-typing', this.onUserTyping);
    this.socket.on('user-typing', this.onUserTyping);
  }

  /**
   * Sends a private message.
   *
   * @param {string} toUserId Id of the recipient.
   * @param {string} message Message text.
   */
  sendPrivateMessage(toUserId, message) {
    this.socket.emit("private-message", {
      to: toUserId,
      message,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Sends the same notification to several users in one request.
   *
   * @param {string[]} userIds Ids of the recipients.
   * @param {string} content Notification text.
   */
  sendBulkMessage(userIds, content) {
    this.socket.emit("bulk-message", {
      userIds,
      content,
      type: "notification"
    });
  }

  /**
   * Wraps `fn` so it runs at most once per `waitMs` milliseconds; calls made
   * inside the window are dropped.
   *
   * @param {Function} fn Function to rate-limit.
   * @param {number} waitMs Minimum time between two invocations.
   * @returns {Function} The throttled wrapper.
   */
  throttle(fn, waitMs) {
    let lastRun = -Infinity;
    return (...args) => {
      const now = Date.now();
      if (now - lastRun < waitMs) {
        return;
      }
      lastRun = now;
      fn(...args);
    };
  }

  // High-frequency events (throttled to one emit per second).
  throttledEmit = this.throttle((eventName, data) => {
    this.socket.emit(eventName, data);
  }, 1000);

  /**
   * Risky operation (requires confirmation): emits `risky-operation` and
   * settles with the server's answer.
   *
   * @param {string} operation Name of the operation.
   * @param {object} [options] Extra fields merged into the request payload.
   * @returns {Promise<*>} Resolves with the `operation-success` payload,
   *   rejects with the `operation-error` payload.
   */
  performRiskyOperation(operation, options = {}) {
    return new Promise((resolve, reject) => {
      // Whichever answer arrives first also unregisters the other listener,
      // so a stale handler cannot fire on a later, unrelated operation.
      const onSuccess = (result) => {
        this.socket.off("operation-error", onError);
        resolve(result);
      };
      const onError = (error) => {
        this.socket.off("operation-success", onSuccess);
        reject(error);
      };

      // Listen before emitting so a fast reply cannot be missed.
      this.socket.once("operation-success", onSuccess);
      this.socket.once("operation-error", onError);

      this.socket.emit("risky-operation", {
        operation,
        ...options
      });
    });
  }
}

执行流程

客户端发送流程:

  1. socket.emit('event-name', data)
  2. 消息序列化为 WebSocket 帧
  3. 通过 TCP/IP 发送到服务器
  4. NestJS WebSocket 适配器接收
  5. 路由到对应的 @SubscribeMessage 装饰器方法
  6. 执行业务逻辑
  7. 返回响应给客户端(可选)

服务器响应:

  1. 方法中使用 return 返回数据
  2. 使用 client.emit() 主动发送
  3. 使用 this.server.emit() 广播

注意:事件名称区分大小写,客户端发送的事件名必须与服务端 @SubscribeMessage 中声明的名称完全一致,否则服务端无法处理该事件。

正文完
 0
评论(没有评论)
验证码