为什么使用消息队列
RocketMQ 是阿里的开源框架,理论单机并发数为 10 万 +,低于 kafka 的 100 万,高于 RabbitMQ 的 1 万 2 和 ActiveMQ 的 6000 并发。当然只支持 java,不像 kafka、RabbitMQ 和 ActiveMQ 支持其他全部语言
- 解耦
- 传统同步调用中 (如 A→B→>C) 若 C 系统故障会导致全链路问题,在 B 与 C 间加入 MQ 后,B 只需将消息发至 MQ 即可完成业务,C 系统宕机重启后仍可接收消息,实现 B 与 C 的解耦合;
- 异步
- 同步调用中 B 需等待 C 响应,加入 MQ 后 B 发消息至 MQ 即结束,后续由 MQ 异步通知 C,提升链路效率;
- 流量削峰填谷
- MQ 吞吐量高(如卡夫卡百万级),若 C 系统并发仅 1 万,面对 30 万 1 分钟的突发流量,MQ 可缓冲流量,C 系统按 1 万 / 秒的速度消化,用 30 分钟处理完 30 万请求,避免 C 系统被压垮
保证不被重复消费
并不能保证不能重复消费,在网络不稳定的情况下,生产者没有收到 RocketMQ 的 ACK 包确定已经接收到消息会重复发送,RocketMQ 没有接收消费者的 ACK 包确定消费,会触发 RocketMQ 的 rebalance, 重新分配 Queue。解决去重方式有 redis 和数据库两种:
- Redis:性能高,适合高并发场景,实现简单
- 根据消息处理业务 id 写入,SET key value NX EX TTL,key 使用业务 id
- 写入成功说明没有处理过,执行业务
- 写入失败则已经存在,说明消息正在执行,跳过避免重复消费,但是要打印日志方便后续业务的及时响应
- 数据库新增:适合创建类操作,订单生成等。数据一致性强,可靠性高,不需要依赖外部组件
- 在执行消息创建的时候查询一次,筛选出已经存在的数据,若是返回数据说明已经执行操作了
- 执行写入数据操作,若是发生唯一索引冲突说明已经插入数据成功。插入数据成功则执行后续操作
- 数据库修改:天然幂等,适用于业务状态有明确的流转路径。
- 使用数据库的 WHERE 条件确保满足执行条件更新
- 条件更新的有点事: 可避免并发问题,适用与大多数场景
做到宁可重复判断,不可重复执行,通过幂等性实现最终一致性
正文完
发表至: 后端
2025-09-26