java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot防止消息重复消费

SpringBoot防止消息重复消费的方法步骤

作者:海棠Flower未眠

这段文章详细介绍了消息队列中消息重复消费的问题及解决方案,重点讲解了Redis唯一ID防重、数据库唯一索引防重、业务状态机防重和全局唯一约束防重四种方案,适合不同业务场景和并发量的需求,需要的朋友可以参考下

在项目开发中难免会用到一些消息队列,例如经常使用的MQ,在 MQ 消息队列的生产实践中,消息丢失、消息重复、消息积压是三大核心难题。其中消息重复消费是100% 必然发生的问题,不属于 Bug,而是 MQ 机制特性。 很多同学开发的订单、支付、积分、物流系统。

经常出现:

• 同一订单多次扣款 

• 同一笔积分多次发放 

• 重复生成订单、重复发货 

• 重复回调、重复更新数据   

所有问题的根源只有一个:没有做好消息幂等性

那么应该怎么处理消息幂等呢?今天分享一下如何使用SpringBoot 做消息幂等。

一、为什么会出现消息重复消费?

MQ 设计核心原则:宁可重复,绝不丢失。为了保证消息可靠性,MQ 会开启重试机制,直接导致重复消费。

1.1、重复原因

 消费者 ACK 超时
消费者业务执行成功,但返回 ACK 确认时网络抖动、超时,MQ 未收到确认,判定消费失败,重新投递消息。(生产最高频)

 消费者异常退出
业务执行一半、执行成功后程序宕机、重启,未完成 ACK,触发 MQ 重试。

生产者重复投递
生产者重试机制、接口重发、网络重传,导致发送多条相同消息。

MQ 集群故障切换
主从切换、节点重启、分区重平衡,导致消息重复分发。

结论:所有 MQ 项目,必须强制做幂等,没有例外。

二、什么是消息幂等性?

幂等性:接口/业务执行 1 次 和执行 N 次,最终业务结果完全一致,不会产生脏数据、重复数据、异常数据。

MQ 幂等核心目标:保证同一条消息,只会生效一次,多次消费无副作用。

所有幂等方案的核心抓手:唯一消息标识(msgId、orderId、tradeId、businessId)。

三、幂等方案

针对不同业务场景、不同并发量级,整理业界通用 4 套方案,从轻量到厚重,从通用到专用,按需选用。

3.1、方案一:Redis 唯一ID防重

3.1.1、核心原理

利用 Redis SETNX 原子命令,实现消息唯一占用:

  1.  每条消息携带全局唯一 msgId

  2.  消费前尝试根据 msgId 占坑(SETNX)

  3. 占坑成功:首次消费,执行业务逻辑

  4. 占坑失败:重复消息,直接 ACK 丢弃

  5. 设置过期时间,避免 Redis 死数据堆积

3.1.2、完整代码

3.1.2.1、Redis 工具类

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Component
public class MqIdempotentRedisUtil {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 消息幂等占坑
     * @param msgId 消息唯一ID
     * @param expireSeconds 过期时间(大于业务最大执行时长)
     * @return true=首次消费,false=重复消费
     */
    public boolean tryLock(String msgId, long expireSeconds) {
        String key = "mq:idempotent:" + msgId;
        // SETNX 原子操作:不存在则设置,存在则返回false
        return stringRedisTemplate.opsForValue()
                .setIfAbsent(key, "consumed", expireSeconds, TimeUnit.SECONDS);
    }
}

3.1.2.2、幂等消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;

@Component
public class OrderMsgConsumer {

    @Resource
    private MqIdempotentRedisUtil idempotentRedisUtil;

    // 业务最大执行时长5秒,锁过期时间设30秒(预留缓冲)
    private static final long LOCK_EXPIRE_TIME = 30;

    @RabbitListener(queues = "order.pay.queue")
    public void consume(Message message, Channel channel) throws IOException {
        // 1. 获取全局唯一消息ID(生产者必须传递)
        String msgId = message.getMessageProperties().getHeader("msgId");
        if (msgId == null || "".equals(msgId)) {
            // 无唯一ID,非法消息,直接丢弃
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }

        try {
            // 2. 幂等判断:占坑失败=重复消息
            boolean isFirstConsume = idempotentRedisUtil.tryLock(msgId, LOCK_EXPIRE_TIME);
            if (!isFirstConsume) {
                System.out.println("【重复消息丢弃】msgId:" + msgId);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }

            // 3. 核心业务逻辑(下单、支付、积分、物流等)
            doBusiness(msgId);

            // 4. 手动ACK确认消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消费异常,拒绝消息,重回队列重试
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            e.printStackTrace();
        }
    }

    private void doBusiness(String msgId) {
        // 模拟业务执行
        System.out.println("【首次消费成功】处理消息:" + msgId);
    }
}

3.1.3、优缺点分析

✅ 优点:性能高、无数据库压力、适配所有MQ、代码简单、不侵入业务

❌ 缺点:依赖Redis,Redis宕机需降级兜底

🎯 适用场景:绝大多数互联网业务、中小高并发场景(通用首选)

3.2、方案二:数据库唯一索引防重

3.2.1、原理

新建消息防重表,给 msgId 设置唯一索引,利用数据库唯一约束实现幂等:

  1. 消费前先插入防重记录
  2.  插入成功:首次消费,执行业务
  3. 插入报错(唯一冲突):重复消息,直接丢弃

3.2.2、表结构设计

CREATE TABLE mq_message_record (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '主键',
    msg_id VARCHAR(64) NOT NULL COMMENT '消息唯一ID',
    business_type VARCHAR(32) COMMENT '业务类型',
    create_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE INDEX uk_msg_id (msg_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'MQ消息防重表';

3.2.3、代码实例

import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;

@Service
public class MqIdempotentDbService {

    @Resource
    private MqMessageRecordMapper messageRecordMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean isFirstConsume(String msgId, String businessType) {
        try {
            // 插入防重记录
            MqMessageRecord record = new MqMessageRecord();
            record.setMsgId(msgId);
            record.setBusinessType(businessType);
            messageRecordMapper.insert(record);
            return true;
        } catch (DuplicateKeyException e) {
            // 唯一索引冲突,重复消息
            return false;
        }
    }
}

3.2.4、优缺点分析

✅ 优点:不依赖中间件、事务一致性极强、绝对可靠、可作为Redis降级方案

❌ 缺点:高并发下数据库压力大、性能低于Redis

🎯 适用场景:核心金融、支付、账务场景、Redis宕机降级兜底

3.3、方案三:业务状态机+乐观锁

3.3.1、原理

针对订单、支付、退款、物流等有明确状态流转的业务,无需额外中间件,依靠业务状态实现天然幂等。

状态流转示例:待支付(1) → 已支付(2) → 已发货(3) → 已完成(4)

核心逻辑:仅允许状态正向流转,已变更状态禁止重复更新。

3.3.2、代码示例

-- 乐观锁更新:仅待支付订单可更新为已支付
UPDATE order_info 
SET status = 2, pay_time = NOW() 
WHERE order_id = #{orderId} AND status = 1;
@Service
public class OrderService {

    @Resource
    private OrderMapper orderMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean paySuccess(Long orderId) {
        // 更新行数=0 说明:订单已处理,重复消费
        int rows = orderMapper.updateOrderStatus(orderId, 1, 2);
        return rows > 0;
    }
}

3.3.3、优缺点分析

✅ 优点:零额外存储、零开销、业务贴合度最高、绝对幂等

❌ 缺点:仅适用于有状态业务,无状态业务无法使用

🎯 适用场景:订单、支付、退款、积分变动、会员权益变更

3.4、方案四:全局唯一约束

部分业务可直接依靠业务唯一主键实现幂等,例如:

插入数据时直接判断主键是否存在,存在则放弃操作,适配简单的新增类消息业务。

四、注意事项

4.1:锁过期时间小于业务执行时间

若业务执行需要10秒,锁只设置5秒,会导致锁提前失效,重复消息穿透

✅ 解决方案:锁过期时间 = 业务最大耗时 * 3 倍预留缓冲

4.2:先执行业务,再做幂等判断

致命错误!并发场景下会导致两条消息同时执行业务,幂等完全失效。

✅ 正确顺序:幂等判断 > 执行业务 > 手动ACK

4.3:使用自动ACK(自动确认)

自动ACK会导致业务未执行完成就确认消息,异常时无法重试,且幂等逻辑失效。

✅ 生产强制:所有核心业务MQ,必须手动ACK

4.4:msgId重复、为空

生产者未生成全局唯一ID,使用随机ID、局部ID,导致幂等判断错乱。

✅ 规范:生产者统一生成 全局唯一 msgId(UUID/雪花算法)

4.5:Redis锁执行完立即删除

高并发瞬时重复消息,会出现删锁后瞬间穿透,建议依靠过期时间自动失效,不手动删锁。

五、总结

  1.  MQ重复消费是必然现象,核心原因是ACK超时、程序异常、集群切换、生产者重发。
  2. 消息幂等核心:唯一消息ID + 消费前置防重判断
  3.  通用最优方案:Redis SETNX 原子防重,适配所有MQ场景。
  4.  核心业务兜底:数据库唯一索引、业务状态机乐观锁。
  5. 生产规范:手动ACK、合理锁过期时间、前置防重、全局唯一msgId。

消息幂等性是后端开发的必备核心能力,也是面试高频考点、生产环境硬性要求。很多线上脏数据、资金问题、业务异常,根源都不是业务 Bug,而是忽略了 MQ 重复消费的特性。

掌握这几套幂等方案,足以应对 订单、支付、积分、物流、通知 所有业务场景,彻底解决线上消息重复问题,让你的项目稳定性提升一个层级。

以上就是SpringBoot防止消息重复消费的方法步骤的详细内容,更多关于SpringBoot防止消息重复消费的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文