java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot实现延迟消息

SpringBoot实现延迟消息的两种方案及对象详解

作者:希望永不加班

在日常业务开发中,延迟消息是高频刚需场景,几乎所有中大型项目都会用到,很多同学第一反应是用 定时任务轮询数据库,但这种方式存在性能差、延迟不准、数据库压力大、实时性低等致命问题,完全不适合生产环境,所以本文给大家介绍了SpringBoot实现延迟消息的两种方案

在日常业务开发中,延迟消息是高频刚需场景,几乎所有中大型项目都会用到。

常见业务场景:

很多同学第一反应是用 定时任务轮询数据库,但这种方式存在性能差、延迟不准、数据库压力大、实时性低等致命问题,完全不适合生产环境。

目前企业级主流实现方案只有两种:

1. 死信队列 TTL 延迟消息(RabbitMQ 原生,无需插件)

2. 延迟交换机插件延迟消息(生产首选、灵活度最高、本文重点)

一、核心原理深度讲解

1.1 RabbitMQ 原生短板

RabbitMQ 原生不支持真正的延迟消息,所有消息都是立即投递、立即消费,没有内置定时延迟投递机制。想要实现延迟效果,只能通过曲线方案实现。

1.2 两种实现方案原理对比

方案一:死信队列 TTL(原生无插件)

核心逻辑:消息先进入普通队列,设置过期时间(TTL),消息过期后无人消费,自动变为死信,被转发到死信队列,消费者监听死信队列实现延迟消费。

致命缺陷:

适用场景:全局统一延迟的简单业务(如所有订单统一30分钟超时)

方案二:延迟交换机插件(x-delayed-message)✅ 生产首选

通过安装官方延迟插件,RabbitMQ 会新增一种自定义交换机类型:x-delayed-message

核心原理:

1. 生产者发送消息时,在消息头携带 x-delay 延迟时间(毫秒)

2. 消息不会立即投递到队列,由延迟交换机内部暂存

3. 等待指定延迟时间结束后,交换机自动将消息路由到目标队列

4. 消费者监听队列,完成延迟消费

核心优势:

二、前置环境准备

2.1 安装延迟消息插件

插件版本必须与当前 RabbitMQ 版本完全一致,否则启动报错、功能失效。

插件下载地址:RabbitMQ 官方插件仓库

安装步骤:

1. 将下载好的 rabbitmq_delayed_message_exchange-xxx.ez 放入 RabbitMQ plugins 目录

2. 启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. 重启 RabbitMQ 服务:systemctl restart rabbitmq-server

4. 控制台查看交换机类型,出现 x-delayed-message 即安装成功

2.2 SpringBoot 项目依赖

SpringBoot 整合 RabbitMQ 核心依赖,所有版本通用:

<!-- RabbitMQ AMQP 核心依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.3 全局配置文件 application.yml

配置连接信息、消息确认机制、重试机制,适配生产环境:

spring:
  rabbitmq:
    # 基础连接配置
    host:127.0.0.1
    port:5672
    username:guest
    password:guest
    virtual-host:/
    # 开启生产者确认
    publisher-confirm-type:correlated
    publisher-returns:true
    # 消费者手动ACK
    listener:
      simple:
        acknowledge-mode:manual
        retry:
          enabled:true
          max-attempts: 3

三、生产级完整代码实现

3.1 RabbitMQ 延迟交换机配置类

自定义延迟交换机、队列、绑定关系,全部持久化,重启不丢失配置:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * 延迟消息队列配置类
 * 基于 rabbitmq-delayed-message-exchange 插件实现
 */
@Configuration
publicclassDelayRabbitConfig {
    // 延迟交换机名称
    publicstaticfinalStringDELAY_EXCHANGE="business_delay_exchange";
    // 延迟队列名称
    publicstaticfinalStringDELAY_QUEUE="business_delay_queue";
    // 路由键
    publicstaticfinalStringDELAY_ROUTING_KEY="business.delay.routing";
    /**
     * 构建延迟交换机
     * x-delayed-message:延迟交换机类型
     * x-delayed-type:转发模式(direct/topic/fanout)
     */
    @Bean
    public DirectExchange delayExchange() {
        Map<String, Object> args = newHashMap<>();
        // 核心参数:声明为延迟交换机
        args.put("x-delayed-type", "direct");
        // 参数:名称、持久化、不自动删除、自定义参数
        returnnewDirectExchange(DELAY_EXCHANGE, true, false, args);
    }
    /**
     * 延迟队列(持久化)
     */
    @Bean
    public Queue delayQueue() {
        returnnewQueue(DELAY_QUEUE, true);
    }
    /**
     * 队列与延迟交换机绑定
     */
    @Bean
    public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue)
                .to(delayExchange)
                .with(DELAY_ROUTING_KEY);
    }
    /**
     * 自定义RabbitTemplate,开启消息可靠投递
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);
        // 开启 mandatory,消息投递失败返回回调
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

3.2 延迟消息生产者(支持动态自定义延迟时间)

核心亮点:每条消息可单独设置延迟时间,毫秒级单位,灵活适配不同业务:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
publicclassDelayMsgProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送延迟消息接口
     * @param message 消息内容
     * @param delayTime 延迟时间(单位:毫秒)
     * @return 结果
     */
    @GetMapping("/send/delay/message")
    public String sendDelayMessage(@RequestParam String message,
                                   @RequestParam Long delayTime) {
        // 发送延迟消息
        rabbitTemplate.convertAndSend(
                DelayRabbitConfig.DELAY_EXCHANGE,
                DelayRabbitConfig.DELAY_ROUTING_KEY,
                message,
                // 核心:设置单条消息延迟时间
                msg -> {
                    msg.getMessageProperties().setHeader("x-delay", delayTime);
                    return msg;
                }
        );
        return"延迟消息发送成功!预计 " + delayTime / 1000 + " 秒后执行,消息内容:" + message;
    }
}

3.3 生产级消费者(手动ACK、异常重试、防消息丢失)

生产环境禁止自动ACK,必须手动确认,保证消息可靠投递,失败可重回队列重试:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
publicclassDelayMsgConsumer {
    /**
     * 监听延迟队列,手动ACK模式
     */
    @RabbitListener(queues = DelayRabbitConfig.DELAY_QUEUE)
    publicvoidconsumeDelayMessage(String msg, Message message, Channel channel)throws IOException {
        // 获取消息唯一标识
        longdeliveryTag= message.getMessageProperties().getDeliveryTag();
        try {
            // 执行业务逻辑
            System.out.println("【延迟消息消费成功】时间:" + System.currentTimeMillis() + ",消息内容:" + msg);
            // 手动确认消费成功,删除消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 消费异常,拒绝消息,重回队列重试
            System.err.println("【延迟消息消费失败】异常信息:" + e.getMessage());
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

四、接口测试

启动项目,访问接口,测试延迟效果:

测试地址:

http://localhost:8080/send/delay/message?message=订单超时自动取消&delayTime=10000

参数说明:

访问后,控制台会在10秒后打印消费日志,延迟效果精准生效!

五、两种延迟方案深度对比

实现方案

优点

缺点

适用场景

死信队列TTL

原生支持、无需插件、零部署成本

单队列统一延迟、消息阻塞、延迟不准、不灵活

简单统一延迟业务

延迟交换机插件

单消息独立延迟、精准无阻塞、灵活度高、代码简洁

需安装插件、重启服务

所有生产级延迟业务(推荐)

六、注意事项

七、总结

1、定时轮询数据库是最低效的延迟方案,生产环境直接淘汰;

2、死信队列 TTL 适合简单统一延迟场景,局限性非常大;

3、延迟交换机插件方案灵活、精准、稳定,是目前企业 SpringBoot 项目延迟消息的最优解

4、生产落地必须搭配 消息持久化、生产者确认、消费者手动ACK、异常重试,保证消息可靠性。

以上就是SpringBoot实现延迟消息的两种方案及对象详解的详细内容,更多关于SpringBoot实现延迟消息的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文