java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring boot RabbitMQ中间件

Spring boot框架下的RabbitMQ消息中间件详解

作者:阿乾之铭

这篇文章详细介绍了Spring Boot框架下的RabbitMQ消息中间件的基本概念、消息传输模型、环境准备、Spring Boot集成以及消息生产和消费,感兴趣的朋友跟随小编一起看看吧

1. RabbitMQ 基础概念

1.1 消息处理流程与组件配合

Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。

Producer(生产者)

作用:负责发送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。

关键点

代码示例

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        System.out.println("Sent message: " + message);
    }
}

调用示例

producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");

Exchange(交换机)

作用:接收来自 Producer 的消息,并根据 Routing Key 和 Binding 的配置,决定将消息发送到哪些队列。

Exchange 通常需要手动注册为 Bean。

  • RabbitMQ 的 Exchange 是通过名称来标识的。
  • 在 Spring Boot 中,您通过 @Bean 方法注册 Exchange 时,实际上是将 Exchange 的名称和类型绑定到 RabbitMQ 服务器。
  • 发送消息时,RabbitMQ 客户端会根据 Exchange 的名称找到对应的 Exchange,并根据 Routing Key 将消息路由到队列。

类型

代码示例(定义交换机):

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ExchangeConfig {
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct-exchange");
    }
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout-exchange");
    }
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic-exchange");
    }
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange("headers-exchange");
    }
}

Queue(队列)

作用:消息的存储容器,等待消费者从中取出消息进行处理。

Queue 也需要手动注册为 Bean。Spring Boot 不会自动注册队列,因为队列的名称和属性(如是否持久化、是否排他等)需要根据业务需求进行配置。

关键点

代码示例(定义队列):

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public Queue demoQueue() {
        return new Queue("demo-queue", true); // 持久化队列
    }
}

Routing Key(路由键)

作用:决定消息如何从交换机路由到队列。

关键点

Binding(绑定)

代码示例(定义绑定):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BindingConfig {
    @Bean
    public Binding binding(Queue demoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");
    }
}

with("key1") 的作用是 指定 Binding 的 Routing Key。它的含义是:

Consumer(消费者)

作用:从队列中接收并处理消息。

关键点

代码示例

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class Consumer {
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

1.2 RabbitMQ 消息传输模型

点对点模型

定义:消息从生产者发送到队列,由消费者从队列中接收,消息只能被一个消费者消费。

实现

代码示例

rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");

发布订阅模型

定义:生产者将消息发送到 Fanout 类型的交换机,消息会广播到所有绑定的队列。

实现

代码示例

rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");

路由模型

定义:生产者将消息发送到 Direct 类型的交换机,根据 Routing Key 精确匹配队列。

实现

代码示例

rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");

2. 环境准备

2.1 安装与配置 RabbitMQ

下载 Docker

启动 Docker

使用 Docker 快速部署 RabbitMQ

Docker 是部署 RabbitMQ 的最简单方式。通过以下命令,您可以快速启动一个 RabbitMQ 容器:

参数说明

验证 RabbitMQ 是否运行

运行以下命令,查看容器是否正常运行:

docker ps

如果看到 rabbitmq 容器正在运行,说明 RabbitMQ 已成功启动。

2.2 使用 RabbitMQ 管理插件

RabbitMQ 提供了一个 Web 管理界面,方便您监控和管理 RabbitMQ。

访问管理界面

管理界面功能

2.3 用户与权限配置

默认情况下,RabbitMQ 只有一个用户 guest,密码也是 guest。为了安全性和权限管理,建议创建新用户并分配权限。

1. 创建新用户

2. 分配权限

3. 使用新用户登录

2.4  Spring Boot 中引入 RabbitMQ 依赖 

在 pom.xml 中添加以下依赖:

<dependencies>
    <!-- RabbitMQ 依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依赖,它包含了以下内容:

RabbitMQ 客户端库

Spring AMQP 支持

2.5 Spring Boot 配置 RabbitMQ

在 Spring Boot 项目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的连接信息。

示例配置

配置说明

3. Spring Boot 集成 RabbitMQ 的消息生产和消费

3.1 消息生产者(Producer)

典型做法:手动序列化为 JSON 再发送

@Service
public class CustomObjectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendCustomObject(String queueName, MyCustomObject obj) {
        // 1. 将自定义对象序列化为 JSON 字符串
        String jsonString = new Gson().toJson(obj);
        // 2. 发送 JSON 字符串到 RabbitMQ
        rabbitTemplate.convertAndSend(queueName, jsonString);
    }
}

在消费者端,你也可以将消息(JSON 字符串)反序列化为 MyCustomObject

配置自定义 Converter(可选)

Spring AMQP 提供了 Jackson2JsonMessageConverter 等现成转换器。

@Configuration
public class RabbitConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    // 配置 RabbitTemplate 使用该转换器
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

这样一来,rabbitTemplate.convertAndSend(queueName, myObject) 会自动把 myObject 转成 JSON 发送;消费者端则自动解析为同样的 Java 对象。 1)基本消息发送

场景
将消息直接发送到指定的队列,跳过交换机的路由,让 RabbitMQ 把消息放到这个队列中。

核心代码示例

@Service
public class BasicProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;  // 1.自动注入的 RabbitTemplate
    /**
     * 2.发送基本消息到指定的队列
     * @param queueName  目标队列名称
     * @param message    消息内容
     */
    public void sendToQueue(String queueName, String message) {
        // 3.调用 convertAndSend,直接将消息放入指定队列
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println("Message sent to queue: " + queueName + ", content: " + message);
    }
}

代码详解

2)发送到交换机

场景
将消息发送到一个交换机(Exchange),再由交换机通过 Routing Key 将消息路由到匹配的队列中。

核心代码示例

@Service
public class ExchangeProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息到指定交换机
     * @param exchangeName  交换机名称
     * @param routingKey    路由键
     * @param message       消息内容
     */
    public void sendToExchange(String exchangeName, String routingKey, String message) {
        // 将消息发送到 exchangeName 指定的交换机,使用 routingKey 进行路由
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);
    }
}

代码详解

3)发送带消息属性的消息

场景
需要为消息设置 TTL(过期时间)或优先级等属性,控制消息在队列中的行为。

核心代码示例

@Service
public class PropertyProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送带消息属性的消息(如 TTL, 优先级)
     */
    public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {
        // 1.创建 MessageProperties 对象,用于指定消息的属性
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("10000"); // 过期时间:10秒 (单位:毫秒)
        properties.setPriority(5);        // 优先级设为 5
        // 2.根据消息体和属性构建 Message 对象
        Message message = new Message(messageContent.getBytes(), properties);
        // 3.使用 send 方法(而非 convertAndSend)直接发送 Message 对象
        rabbitTemplate.send(exchange, routingKey, message);
        System.out.println("Message with properties sent: " + messageContent);
    }
}

代码详解

Message 构造函数 

public Message(byte[] body, MessageProperties messageProperties) {
    this.body = body;
    this.messageProperties = messageProperties;
}
body:消息体的字节数组。
messageProperties:AMQP 的消息属性,包括 TTL、优先级、headers 等。、

如果消息体不是String类型

手动转换为字节:你可以先将自定义对象转换为字节数组(例如通过 JSON 序列化或 Java 序列化),再放入 new Message(...) 的第一个参数。
MyCustomObject obj = new MyCustomObject();
// 假设你想用 JSON
String jsonString = new Gson().toJson(obj);
byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);
MessageProperties properties = new MessageProperties();
// 设置一些属性
Message message = new Message(body, properties);

3.2 消息消费者(Consumer)

消费者的核心功能是在指定的队列中监听消息,并根据配置的确认模式(自动确认或手动确认)对消息进行处理或拒绝。

1)监听队列并消费消息

核心代码示例(自动确认模式)

@Service
public class Consumer {
    /**
     * 使用注解 @RabbitListener 指定要监听的队列
     * 由于默认为 auto-ack 模式,
     * 当消息到达后,RabbitMQ 会自动确认并从队列中删除该消息。
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(String message) {
        // 1.从 queueName 队列中取到的消息内容
        System.out.println("Received message: " + message);
        // 2.在 auto-ack 模式下,无需手动 ack
        //  如果这里出现异常,RabbitMQ 不会再次发送消息给消费者,消息会丢失。
    }
}

代码详解(自动确认模式)

2)确认机制

自动确认(auto-ack)

手动确认(manual-ack)

核心代码示例:

@Service
public class ManualAckConsumer {
    /**
     * 在 application.properties 中配置:
     * spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * 使得 RabbitMQ 使用手动确认模式
     */
    @RabbitListener(queues = "demo-queue")
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            // 1.从消息中获取消息体
            String body = new String(message.getBody());
            System.out.println("Processing message: " + body);
            // 2.如果业务处理成功,则调用 basicAck 手动确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            System.err.println("Message processing failed: " + e.getMessage());
            // 3.如果处理失败,需要决定是重新入队还是拒绝并进入死信队列
            // requeue = true  -> 重新入队
            // requeue = false -> 丢弃或进入死信队列(根据队列配置)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

代码详解

配置手动确认

application.properties 添加

spring.rabbitmq.listener.simple.acknowledge-mode=manual

表示 Spring AMQP 使用手动确认模式(manual-ack)。

public void receiveMessage(Message message, Channel channel)

与自动确认不同,这里不仅接收字符串,还接收了 org.springframework.amqp.core.Message 对象和 com.rabbitmq.client.ChannelMessage:包含消息体(body)和消息属性(headers 等)。Channel:给我们提供了 basicAck, basicNack, basicReject 等底层 AMQP 操作。

手动确认成功

channel.basicAck(deliveryTag, multiple)

deliveryTag

本次消息的唯一标记,从 message.getMessageProperties().getDeliveryTag() 获取。

multiple = false:只确认当前这条消息。

basicAck(long deliveryTag, boolean multiple)

这里的 deliveryTag 并不是在你构造 Message 时生成的,而是 RabbitMQ Broker 在投递消息给消费者时由底层 AMQP 协议自动分配的一个递增的序号
long deliveryTag = message.getMessageProperties().getDeliveryTag();

手动确认失败

3)处理消费失败

是重试失败了才会将消息重新入队 ,所以重试在前,重新入队在后

# 启用重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 间隔倍数
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重试间隔
spring.rabbitmq.listener.simple.retry.max-interval=10000

行为

死信队列(DLQ)

@Configuration
public class RabbitConfig {
    @Bean
    public Queue normalQueue() {
        return QueueBuilder.durable("normal-queue")
                .withArgument("x-dead-letter-exchange", "dead-letter-exchange")  // 指定死信交换机
                .withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由键
                .build();
    }
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead-letter-exchange");
    }
    @Bean
    public Queue deadLetterQueue() {
        return new Queue("dead-letter-queue");
    }
    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");
    }
}

原理

重新入队 vs 发送到死信队列

到此这篇关于Spring boot框架下的RabbitMQ消息中间件的文章就介绍到这了,更多相关Spring boot RabbitMQ消息中间件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文