SpringBoot防止消息重复消费的方法步骤
作者:海棠Flower未眠
在项目开发中难免会用到一些消息队列,例如经常使用的MQ,在 MQ 消息队列的生产实践中,消息丢失、消息重复、消息积压是三大核心难题。其中消息重复消费是100% 必然发生的问题,不属于 Bug,而是 MQ 机制特性。 很多同学开发的订单、支付、积分、物流系统。
经常出现:
• 同一订单多次扣款
• 同一笔积分多次发放
• 重复生成订单、重复发货
• 重复回调、重复更新数据
所有问题的根源只有一个:没有做好消息幂等性。
那么应该怎么处理消息幂等呢?今天分享一下如何使用SpringBoot 做消息幂等。
一、为什么会出现消息重复消费?
MQ 设计核心原则:宁可重复,绝不丢失。为了保证消息可靠性,MQ 会开启重试机制,直接导致重复消费。
1.1、重复原因
消费者 ACK 超时
消费者业务执行成功,但返回 ACK 确认时网络抖动、超时,MQ 未收到确认,判定消费失败,重新投递消息。(生产最高频)
消费者异常退出
业务执行一半、执行成功后程序宕机、重启,未完成 ACK,触发 MQ 重试。
生产者重复投递
生产者重试机制、接口重发、网络重传,导致发送多条相同消息。
MQ 集群故障切换
主从切换、节点重启、分区重平衡,导致消息重复分发。
结论:所有 MQ 项目,必须强制做幂等,没有例外。
二、什么是消息幂等性?
幂等性:接口/业务执行 1 次 和执行 N 次,最终业务结果完全一致,不会产生脏数据、重复数据、异常数据。
MQ 幂等核心目标:保证同一条消息,只会生效一次,多次消费无副作用。
所有幂等方案的核心抓手:唯一消息标识(msgId、orderId、tradeId、businessId)。
三、幂等方案
针对不同业务场景、不同并发量级,整理业界通用 4 套方案,从轻量到厚重,从通用到专用,按需选用。
3.1、方案一:Redis 唯一ID防重
3.1.1、核心原理
利用 Redis SETNX 原子命令,实现消息唯一占用:
每条消息携带全局唯一 msgId
消费前尝试根据 msgId 占坑(SETNX)
占坑成功:首次消费,执行业务逻辑
占坑失败:重复消息,直接 ACK 丢弃
设置过期时间,避免 Redis 死数据堆积
3.1.2、完整代码
3.1.2.1、Redis 工具类
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component
public class MqIdempotentRedisUtil {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 消息幂等占坑
* @param msgId 消息唯一ID
* @param expireSeconds 过期时间(大于业务最大执行时长)
* @return true=首次消费,false=重复消费
*/
public boolean tryLock(String msgId, long expireSeconds) {
String key = "mq:idempotent:" + msgId;
// SETNX 原子操作:不存在则设置,存在则返回false
return stringRedisTemplate.opsForValue()
.setIfAbsent(key, "consumed", expireSeconds, TimeUnit.SECONDS);
}
}3.1.2.2、幂等消费者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
@Component
public class OrderMsgConsumer {
@Resource
private MqIdempotentRedisUtil idempotentRedisUtil;
// 业务最大执行时长5秒,锁过期时间设30秒(预留缓冲)
private static final long LOCK_EXPIRE_TIME = 30;
@RabbitListener(queues = "order.pay.queue")
public void consume(Message message, Channel channel) throws IOException {
// 1. 获取全局唯一消息ID(生产者必须传递)
String msgId = message.getMessageProperties().getHeader("msgId");
if (msgId == null || "".equals(msgId)) {
// 无唯一ID,非法消息,直接丢弃
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
try {
// 2. 幂等判断:占坑失败=重复消息
boolean isFirstConsume = idempotentRedisUtil.tryLock(msgId, LOCK_EXPIRE_TIME);
if (!isFirstConsume) {
System.out.println("【重复消息丢弃】msgId:" + msgId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
// 3. 核心业务逻辑(下单、支付、积分、物流等)
doBusiness(msgId);
// 4. 手动ACK确认消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 消费异常,拒绝消息,重回队列重试
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
e.printStackTrace();
}
}
private void doBusiness(String msgId) {
// 模拟业务执行
System.out.println("【首次消费成功】处理消息:" + msgId);
}
}3.1.3、优缺点分析
✅ 优点:性能高、无数据库压力、适配所有MQ、代码简单、不侵入业务
❌ 缺点:依赖Redis,Redis宕机需降级兜底
🎯 适用场景:绝大多数互联网业务、中小高并发场景(通用首选)
3.2、方案二:数据库唯一索引防重
3.2.1、原理
新建消息防重表,给 msgId 设置唯一索引,利用数据库唯一约束实现幂等:
- 消费前先插入防重记录
- 插入成功:首次消费,执行业务
- 插入报错(唯一冲突):重复消息,直接丢弃
3.2.2、表结构设计
CREATE TABLE mq_message_record (
id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键',
msg_id VARCHAR(64) NOT NULL COMMENT '消息唯一ID',
business_type VARCHAR(32) COMMENT '业务类型',
create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE INDEX uk_msg_id (msg_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'MQ消息防重表';3.2.3、代码实例
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class MqIdempotentDbService {
@Resource
private MqMessageRecordMapper messageRecordMapper;
@Transactional(rollbackFor = Exception.class)
public boolean isFirstConsume(String msgId, String businessType) {
try {
// 插入防重记录
MqMessageRecord record = new MqMessageRecord();
record.setMsgId(msgId);
record.setBusinessType(businessType);
messageRecordMapper.insert(record);
return true;
} catch (DuplicateKeyException e) {
// 唯一索引冲突,重复消息
return false;
}
}
}3.2.4、优缺点分析
✅ 优点:不依赖中间件、事务一致性极强、绝对可靠、可作为Redis降级方案
❌ 缺点:高并发下数据库压力大、性能低于Redis
🎯 适用场景:核心金融、支付、账务场景、Redis宕机降级兜底
3.3、方案三:业务状态机+乐观锁
3.3.1、原理
针对订单、支付、退款、物流等有明确状态流转的业务,无需额外中间件,依靠业务状态实现天然幂等。
状态流转示例:待支付(1) → 已支付(2) → 已发货(3) → 已完成(4)
核心逻辑:仅允许状态正向流转,已变更状态禁止重复更新。
3.3.2、代码示例
-- 乐观锁更新:仅待支付订单可更新为已支付
UPDATE order_info
SET status = 2, pay_time = NOW()
WHERE order_id = #{orderId} AND status = 1;@Service
public class OrderService {
@Resource
private OrderMapper orderMapper;
@Transactional(rollbackFor = Exception.class)
public boolean paySuccess(Long orderId) {
// 更新行数=0 说明:订单已处理,重复消费
int rows = orderMapper.updateOrderStatus(orderId, 1, 2);
return rows > 0;
}
}3.3.3、优缺点分析
✅ 优点:零额外存储、零开销、业务贴合度最高、绝对幂等
❌ 缺点:仅适用于有状态业务,无状态业务无法使用
🎯 适用场景:订单、支付、退款、积分变动、会员权益变更
3.4、方案四:全局唯一约束
部分业务可直接依靠业务唯一主键实现幂等,例如:
- 支付流水号唯一
- 订单ID唯一
- 退款单号唯一
插入数据时直接判断主键是否存在,存在则放弃操作,适配简单的新增类消息业务。
四、注意事项
4.1:锁过期时间小于业务执行时间
若业务执行需要10秒,锁只设置5秒,会导致锁提前失效,重复消息穿透。
✅ 解决方案:锁过期时间 = 业务最大耗时 * 3 倍预留缓冲
4.2:先执行业务,再做幂等判断
致命错误!并发场景下会导致两条消息同时执行业务,幂等完全失效。
✅ 正确顺序:幂等判断 > 执行业务 > 手动ACK
4.3:使用自动ACK(自动确认)
自动ACK会导致业务未执行完成就确认消息,异常时无法重试,且幂等逻辑失效。
✅ 生产强制:所有核心业务MQ,必须手动ACK
4.4:msgId重复、为空
生产者未生成全局唯一ID,使用随机ID、局部ID,导致幂等判断错乱。
✅ 规范:生产者统一生成 全局唯一 msgId(UUID/雪花算法)
4.5:Redis锁执行完立即删除
高并发瞬时重复消息,会出现删锁后瞬间穿透,建议依靠过期时间自动失效,不手动删锁。
五、总结
- MQ重复消费是必然现象,核心原因是ACK超时、程序异常、集群切换、生产者重发。
- 消息幂等核心:唯一消息ID + 消费前置防重判断。
- 通用最优方案:Redis SETNX 原子防重,适配所有MQ场景。
- 核心业务兜底:数据库唯一索引、业务状态机乐观锁。
- 生产规范:手动ACK、合理锁过期时间、前置防重、全局唯一msgId。
消息幂等性是后端开发的必备核心能力,也是面试高频考点、生产环境硬性要求。很多线上脏数据、资金问题、业务异常,根源都不是业务 Bug,而是忽略了 MQ 重复消费的特性。
掌握这几套幂等方案,足以应对 订单、支付、积分、物流、通知 所有业务场景,彻底解决线上消息重复问题,让你的项目稳定性提升一个层级。
以上就是SpringBoot防止消息重复消费的方法步骤的详细内容,更多关于SpringBoot防止消息重复消费的资料请关注脚本之家其它相关文章!
