java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > springboot3.0整合rabbitmq

springboot3.0整合rabbitmq3.13的实现示例

作者:TanYYF

本文主要介绍了springboot3.0整合rabbitmq3.13的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1 RabbitMQ 核心概念

RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议 (AMQP 0-9-1),为应用程序提供了异步通信的能力。在深入了解消息发送机制之前,我们需要理解几个核心概念:

消息流经 RabbitMQ 的基本过程是:

这种架构提供了应用程序解耦、异步通信和流量缓冲等优势,使分布式系统更加灵活和可靠。

2.RabbitMQ 交换器类型详解

RabbitMQ中的Exchange(交换器)是消息路由的核心组件,负责接收生产者发送的消息,并根据特定的路由规则将消息分发到一个或多个队列中。在RabbitMQ 3.13中,主要有以下几种交换器类型:

2.1. Direct Exchange(直连交换器)

特点

工作原理

应用场景

2.2. Fanout Exchange(扇出交换器)

特点

工作原理

应用场景

2.3. Topic Exchange(主题交换器)

特点

工作原理

应用场景

2.4. Headers Exchange(头交换器)

特点

工作原理

应用场景

2.5. Default Exchange(默认交换器)

特点

工作原理

2.6. Dead Letter Exchange(死信交换器)

特点

工作原理

总结

不同类型的交换器适用于不同的业务场景。在选择交换器类型时,需要根据具体的路由需求来决定:

3.spring boot 整合rabbitmq

3.1 项目结构

spring-rabbitmq-demo/
├── src/
│   └── main/
│       ├── java/
│       │   └── cn/
│       │       └── spring/
│       │           └── rabbitmq/
│       │               └── demo/
│       │                   ├── config/
│       │                   │   ├── RabbitMQConfig.java
│       │                   │   ├── RabbitmqTemplatePostProcessor.java
│       │                   │   └── SwaggerConfig.java
│       │                   ├── controller/
│       │                   │   └── rabbitmq/
│       │                   │       └── RabbitmqDemoController.java
│       │                   ├── rabbitmq/
│       │                   │   ├── consumer/
│       │                   │   │   └── MessageConsumer.java
│       │                   │   ├── message/
│       │                   │   │   ├── BaseMessage.java
│       │                   │   │   └── DemoMessage.java
│       │                   │   └── sender/
│       │                   │       └── MessageSender.java
│       │                   └── RabbitmqDemoApplication.java
│       └── resources/
│           ├── application.yaml
│           └── logback-spring.xml
└── pom.xml

3.2 依赖配置

Maven 依赖

在 pom.xml 文件中添加以下依赖:

<dependencies>
    <!-- Web 相关 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- RabbitMQ 相关 -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    
    <!-- Swagger 相关 -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.8.11</version>
    </dependency>
    
    <!-- Jackson 用于JSON序列化 -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- hutool 工具包 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.15</version>
    </dependency>
</dependencies>

3.3. 配置文件

server:
  port: 8081
spring:
  application:
    name: spring-rabbitmq-demo
  profiles:
    active: dev

  main:
    allow-circular-references: true # 允许循环依赖
--- #############rabbitmq配置#####################
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: xxxx
    password: xxxx
    virtual-host: /
    publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
    # NONE(默认值):不返回任何信息。
    # SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
    # CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
    publisher-confirm-type: NONE
    listener:
      # SIMPLE:RabbitMQ 消费者将消息分发到一个单独的线程(Invoker Thread)进行处理,消费者线程与业务处理线程是分离的
      #特点:
          #异步处理:消息消费与业务处理在不同线程中执行
          #更好的容错性:业务处理异常不会直接影响 RabbitMQ 消费者线程
          #更高的资源消耗:需要额外的线程来进行消息分发和处理
          #支持并发消费者配置
      #DIRECT:适用于对延迟敏感、吞吐量要求高的场景,或者资源受限的环境
        #工作原理:监听器直接在 RabbitMQ 消费者线程上调用执行,没有额外的分发线程
          #特点:
          #同步处理:消息消费与业务处理在同一个线程中执行
          #更低的延迟:没有线程切换开销
          #更少的资源消耗:不需要额外的线程池
          #更简单的线程模型:更容易调试和分析
          #业务处理的异常会直接影响消费者线程
      #STREAM(流容器)
        #工作原理:使用 RabbitMQ Stream Client 处理消息流
          #特点:
          #专为高吞吐量和持久化流处理设计
          #支持超大规模消息保留
          #支持消费者组和消息重播
          #适用于需要处理大量历史数据的场景
          #需要 RabbitMQ 3.9+ 版本支持
      type: simple # 使用simple类型监听器容器
      simple:
        # 是否在启动时自动启动容器,默认为true 当设置为 true 时,容器会在 Spring 应用上下文启动完成后自动开始监听消息;设置为 false 时,需要手动调用 start() 方法启动容器
        auto-startup: true
        # 侦听器调用者线程的最小数量,默认为1 控制并发消费者的最小数量,用于处理消息的并行度
        concurrency: 1
        # 侦听器调用者线程的最大数量,默认为1(与concurrency相同)当消息负载较高时,容器可以动态扩展到的最大消费者数量
        max-concurrency: 1
        # 每个消费者能够同时存在且未被确认的消息的最大数量。,默认为250
        prefetch: 250
        # 确认模式,可选值:NONE(不确认)、MANUAL(手动确认)、AUTO(自动确认),默认为AUTO
        acknowledge-mode: AUTO
        # 默认情况下,拒绝交付是否重新排队,默认为true 当监听器方法抛出异常时,决定消息是否重新放回队列。设置为 false 可以避免消息无限重试
        default-requeue-rejected: true
        # 应该多久发布一次空闲容器事件,默认不发布(无默认值) 用于监控容器状态,当容器在指定时间内没有消息处理时会发布 ApplicationEvent
        idle-event-interval: 0ms
        # 是否将批处理消息作为离散消息传递,或者将整个批处理传递给监听器,默认为true 当启用时,批量消息会被分解为单条消息分别处理;禁用时,整个批次作为一个消息传递给监听器
        de-batching-enabled: true
        # 当容器停止时是否立即停止还是处理完所有预取的消息,默认为false 设置为 true 时,容器会在处理完当前消息后立即停止;false 时,会处理完所有预取消息后再停止
        force-stop: false
        # 是否启用观察(Observation),默认为false 启用后可以通过 Micrometer 收集容器的指标信息
        observation-enabled: false
        # 批次大小,以物理消息数量表示,默认为1 与批量处理相关,定义每个批次包含的消息数量
        batch-size: 10
        # 如果容器声明的队列在代理上不可用,是否失败,默认为true 设置为 true 时,如果队列不可用容器会失败或停止;false 时容器会继续运行
        missing-queues-fatal: true
        # 是否基于'receive-timeout'和'batch-size'创建消息批次,默认为false 启用后会将 deBatchingEnabled 强制设为 true,将生产者创建的批次内容作为离散记录包含在批次中
        consumer-batch-enabled: false
        # 适中的接收超时时间
        receive-timeout: 5000ms
        # 重试相关配置
        retry:
          # 是否启用重试机制,默认为false
          enabled: false
          # 最大尝试次数,默认为3
          max-attempts: 3
          # 初始重试间隔时间,默认为1000ms
          initial-interval: 1000ms
          # 最大重试间隔时间,默认为10000ms
          max-interval: 10000ms
          # 重试间隔的乘数,默认为2.0
          multiplier: 2.0
          # 重试时是有状态还是无状态,默认为true(无状态)
          stateless: true
    template:
      # 是否启用强制消息投递,默认为false
      # 当设置为true时,如果消息无法路由到队列,会抛出AmqpMessageReturnedException异常
      # 需要配合RabbitTemplate的ReturnsCallback使用
      mandatory: false
      # receive()操作的超时时间,默认为0ms(无限等待)
      # 用于receive()方法调用时的等待超时时间
      receive-timeout: 0ms
      # sendAndReceive()操作的超时时间,默认为5000ms(5秒)
      # 用于请求-回复模式下的等待超时时间
      reply-timeout: 5000ms
      # 发送操作使用的默认交换机名称,默认为空字符串(使用默认交换机)
      # 当使用RabbitTemplate发送消息时不指定交换机时使用此默认值
      exchange: ""
      # 发送操作使用的默认路由键,默认为空字符串
      # 当使用RabbitTemplate发送消息时不指定路由键时使用此默认值
      routing-key: ""
      # 当没有明确指定接收队列时,默认接收消息的队列名称,默认为null
      # 用于RabbitTemplate接收消息时的默认队列
      default-receive-queue:
      # 是否启用观察(Observation),默认为false
      # 启用后可以通过Micrometer收集RabbitTemplate的指标信息
      observation-enabled: false
      # 用于反序列化时允许的包/类的简单模式列表,默认为null
      # 用于控制哪些类可以被反序列化,防止不安全的反序列化
      allowed-list-patterns:
      # 重试相关配置
      retry:
        # 是否启用重试机制,默认为false
        # 启用后RabbitTemplate在发送消息失败时会进行重试
        enabled: false
        # 最大尝试次数,默认为3次
        max-attempts: 3
        # 初始重试间隔时间,默认为1000ms(1秒)
        initial-interval: 1000ms
        # 最大重试间隔时间,默认为10000ms(10秒)
        max-interval: 10000ms
        # 重试间隔的乘数,默认为2.0
        # 每次重试的间隔时间会乘以这个数值
        multiplier: 2.0

3.4 核心组件实现

3.4.1 消息实体类

BaseMessage.java

@Data
public class BaseMessage implements java.io.Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    
    protected Long deliveryTag = IdUtil.getSnowflake().nextId();
}

DemoMessage.java

@Data
public class DemoMessage extends BaseMessage{
    private Long userId;
    private String message;
    private Date createTime;
}

3.4.2 RabbitMQ 配置类

RabbitMQConfig.java

@Configuration
@Import(RabbitmqTemplatePostProcessor.class)
public class RabbitMQConfig {

    /**
     * 延迟交换机(用于接收延迟消息)
     *
     * @return
     */
    @Bean
    public Exchange delayExchange() {
        return ExchangeBuilder.directExchange("delay.exchange")
                .durable(true)
                .build();
    }

    /**
     * 延迟队列(没有消费者监听)
     * 设置死信交换机和路由键,当消息过期后自动转发到死信交换机
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable("delay.queue")
                // 设置死信交换机
                .deadLetterExchange("delay.process.exchange")
                // 设置死信路由键
                .deadLetterRoutingKey("delay.process")
                .build();
    }

    /**
     * 延迟交换机与延迟队列绑定
     *
     * @return
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with("delay")
                .noargs();
    }


    /**
     * json消息转换器
     *
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 为批量处理创建专用的监听器容器工厂
     *
     * @param connectionFactory
     * @return
     */
    @Bean("batchListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(3);
        factory.setPrefetchCount(10);
        factory.setBatchListener(true); // 启用批量监听
        factory.setConsumerBatchEnabled(true); // 启用消费者端批量处理
        factory.setBatchSize(10); // 设置批次大小
        factory.setReceiveTimeout(5000L); // 设置接收超时时间
        factory.setBatchReceiveTimeout(5000L);
        factory.setDeBatchingEnabled(false); // 禁用分解批处理消息
        factory.setMessageConverter(messageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置为 true 可在当前消息处理完毕后停止容器,并重新排队任何预取的消息。在使用独占或单活动消费者时很有用
        factory.setForceStop(true);
        // 设置关闭超时时间
        factory.setContainerCustomizer((container -> {
            container.setShutdownTimeout(30000L);
            // container.setExclusive(true);
        }));
        return factory;
    }
}

RabbitmqTemplatePostProcessor.java
RabbitTemplate 启用 ConfirmCallback、ReturnsCallback 和消息持久化 需要配合配置:

publisher-returns: true # 启用发布者返回功能,如果消息没有到达队列,则会通知生产者。
    # NONE(默认值):不返回任何信息。
    # SIMPLE(简单模式)使用同步阻塞方式等待MQ的确认回执,发送消息后会阻塞当前线程,直到收到确认结果,性能相对较低,因为需要等待确认结果
    # CORRELATED(关联模式):使用异步非阻塞方式,生产者发送消息后,不等待MQ的确认回执,而是直接返回,并通过回调函数的方式通知生产者。
    publisher-confirm-type: NONE
@Slf4j
public class RabbitmqTemplatePostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof RabbitTemplate) {
            RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;

            // 启用发送确认机制(ConfirmCallback)
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        log.info("消息成功发送到交换机,correlationData: {}", correlationData);
                    } else {
                        log.error("消息发送到交换机失败,correlationData: {}, cause: {}", correlationData, cause);
                    }
                }
            });

            // 启用发送失败回调机制(ReturnCallback)
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    log.error("消息无法路由到队列,exchange: {}, routingKey: {}, message: {}, replyCode: {}, replyText: {}",
                            returnedMessage.getExchange(),
                            returnedMessage.getRoutingKey(),
                            returnedMessage.getMessage(),
                            returnedMessage.getReplyCode(),
                            returnedMessage.getReplyText());
                }
            });

            // 设置消息持久化
            rabbitTemplate.setMandatory(true);

            log.info("RabbitTemplate 配置完成:启用 ConfirmCallback、ReturnsCallback 和消息持久化");
        }
        return bean;
    }

}

3.4.3 消息发送者

MessageSender.java

@Component
public class MessageSender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.fanout.exchange:fanout-exchange}")
    private String fanoutExchange;

    @Value("${rabbitmq.topic.exchange:topic-exchange}")
    private String topicExchange;

    @Value("${rabbitmq.delay.exchange:delay.exchange}")
    private String delayExchange;


    /**
     * 发送消息到队列demo.queue
     *
     * @param message
     */
    public void basicSend(DemoMessage message) {
        rabbitTemplate.convertAndSend("demo.queue", message);
    }


    /**
     * 发送消息到广播exchange
     *
     * @param message
     */
    public void fanoutSend(DemoMessage message) {
        rabbitTemplate.convertAndSend(fanoutExchange, "", message);
    }

    /**
     * 发送消息到topic exchange
     *
     * @param message    消息内容
     * @param routingKey 路由键
     */
    public void topicSend(DemoMessage message, String routingKey) {
        rabbitTemplate.convertAndSend(topicExchange, routingKey, message);
    }

    /**
     * 发送延迟消息
     *
     * @param message 消息内容
     * @param delay   延迟时间(毫秒)
     */
    public void delaySend(DemoMessage message, long delay) {
        rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> {
            if (delay > 0) {
                msg.getMessageProperties().setExpiration(String.valueOf(delay));
            }
            return msg;
        });
    }

    /**
     * 批量发送消息
     *
     * @param messages 消息列表
     */
    public void batchSend(List<DemoMessage> messages) {
        messages.forEach(message -> {
            rabbitTemplate.convertAndSend("batch.exchange", "batch", message, msg -> {
                // 自定义消息属性
                return msg;
            });

        });
    }
}

3.4.4 消息消费者

MessageConsumer.java

@Slf4j
@Component
public class MessageConsumer {

    @Resource
    private ObjectProvider<MessageConverter> messageConverterObjectProvider;

    /**
     * 监听并处理DemoMessage类型的消息
     *
     * @param message 消息内容
     */
    @RabbitListener(queuesToDeclare = {@Queue("demo.queue")})
    public void handleMessageByAnnotation(DemoMessage message) {
        log.info("[handleMessageByAnnotation] 收到消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 监听广播消息1
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout.queue.1"),
            exchange = @Exchange(value = "fanout-exchange", type = "fanout")
    ))
    public void handleFanoutMessage1(DemoMessage message) {
        log.info("[handleFanoutMessage1] 收到广播消息: userId={}, message={},  createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }


    /**
     * 监听广播消息2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("fanout.queue.2"),
            exchange = @Exchange(value = "fanout-exchange", type = "fanout")
    ))
    public void handleFanoutMessage2(DemoMessage message) {
        log.info("[handleFanoutMessage2] 收到广播消息: userId={}, message={},  createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 监听topic消息1
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.1"),
            exchange = @Exchange(value = "topic-exchange", type = "topic"),
            key = "topic.message.specific"
    ))
    public void handleTopicMessage1(DemoMessage message) {
        log.info("[handleTopicMessage1] 收到Topic消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 监听topic消息2
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue.2"),
            exchange = @Exchange(value = "topic-exchange", type = "topic"),
            key = "topic.message.*"
    ))
    public void handleTopicMessage2(DemoMessage message) {
        log.info("[handleTopicMessage2] 收到Topic消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 监听延迟消息处理队列
     * 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("delay.process.queue"),
            exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
            key = "delay.process"
    ))
    public void handleDelayMessage(DemoMessage message) {
        log.info("[handleDelayMessage] 收到延迟消息: userId={}, message={}, createTime={}",
                message.getUserId(), message.getMessage(), message.getCreateTime());
    }

    /**
     * 批量处理消息
     *
     * @param messages
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("batch.queue"),
            exchange = @Exchange(value = "batch.exchange", type = "direct"),
            key = "batch"
    ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
    public void handleBatchMessage(List<Message> messages, Channel channel) {
        log.info("[handleBatchMessage] 开始处理批量消息,共 {} 条", messages.size());

        if (CollUtil.isEmpty(messages)) {
            log.info("[handleBatchMessage] 消息列表为空,无需处理");
            return;
        }

        // 分别存储成功和失败的消息
        List<Message> successMessages = new ArrayList<>();
        List<Message> failedMessages = new ArrayList<>();

        // 批量转换消息
        for (Message message : messages) {
            try {
                DemoMessage demoMessage = (DemoMessage) messageConverterObjectProvider.getObject().fromMessage(message);
                demoMessage.setDeliveryTag(message.getMessageProperties().getDeliveryTag());
                successMessages.add(message);
                log.debug("[handleBatchMessage] 消息转换成功: deliveryTag={}", message.getMessageProperties().getDeliveryTag());
            } catch (Exception e) {
                log.error("[handleBatchMessage] 消息转换失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                failedMessages.add(message);
            }
        }

        // 处理成功转换的消息
        if (CollUtil.isNotEmpty(successMessages)) {
            try {
                log.info("[handleBatchMessage] 开始处理 {} 条成功转换的消息", successMessages.size());
                // 模拟处理时间 - 实际应用中这里应该是真正的业务逻辑
                processMessages(successMessages);

                // 批量确认所有成功处理的消息
                for (Message message : successMessages) {
                    try {
                        long deliveryTag = message.getMessageProperties().getDeliveryTag();
                        channel.basicAck(deliveryTag, false);
                        log.debug("[handleBatchMessage] 消息确认成功: deliveryTag={}", deliveryTag);
                    } catch (IOException e) {
                        log.error("[handleBatchMessage] 消息确认失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                    }
                }
                log.info("[handleBatchMessage] 成功处理并确认 {} 条消息", successMessages.size());

            } catch (Exception e) {
                log.error("[handleBatchMessage] 处理消息时发生异常", e);
                // 如果处理过程中出现异常,将所有成功转换的消息标记为失败
                failedMessages.addAll(successMessages);
            }
        }

        // 处理转换失败的消息
        if (!failedMessages.isEmpty()) {
            log.warn("[handleBatchMessage] 共 {} 条消息处理失败,尝试重新入队", failedMessages.size());
            for (Message message : failedMessages) {
                try {
                    long deliveryTag = message.getMessageProperties().getDeliveryTag();
                    // 第三个参数设为true,表示将消息重新放回队列
                    channel.basicNack(deliveryTag, false, true);
                    log.debug("[handleBatchMessage] 消息重新入队: deliveryTag={}", deliveryTag);
                } catch (IOException e) {
                    log.error("[handleBatchMessage] 消息重新入队失败: deliveryTag={}", message.getMessageProperties().getDeliveryTag(), e);
                }
            }
        }

        log.info("[handleBatchMessage] 批量消息处理完成: 成功={}条, 失败={}条",
                successMessages.size(), failedMessages.size());
    }

    /**
     * 实际处理消息的方法
     * 在实际应用中,这里应该包含真正的业务逻辑
     *
     * @param messages 待处理的消息列表
     * @throws Exception 处理异常
     */
    private void processMessages(List<Message> messages) throws Exception {
        // 模拟处理时间
        Thread.sleep(50L);

        // 实际应用中,这里应该是真正的业务逻辑处理
        log.info("[processMessages] 处理了 {} 条消息", messages.size());
    }

3.5 关键技术点

3.5.1 消息确认机制

在批量处理消息时,使用手动确认模式(MANUAL)来确保消息处理的可靠性:

@RabbitListener(..., ackMode = "MANUAL")
public void handleBatchMessage(List<Message> messages, Channel channel) {
    // 处理消息
    // 手动确认消息
    channel.basicAck(deliveryTag, false);
}

3.5.2 批量消息处理

@Bean("batchListenerContainerFactory")
public SimpleRabbitListenerContainerFactory batchListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setBatchListener(true); // 启用批量监听
    factory.setConsumerBatchEnabled(true); // 启用消费者端批量处理
    factory.setBatchSize(10); // 设置批次大小
    return factory;
}

/**
     * 批量处理消息
     *
     * @param messages
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("batch.queue"),
            exchange = @Exchange(value = "batch.exchange", type = "direct"),
            key = "batch"
    ), batch = "true", containerFactory = "batchListenerContainerFactory", ackMode = "MANUAL")
    public void handleBatchMessage(List<Message> messages, Channel channel) {
    }

3.5.3 延迟消息

public void delaySend(DemoMessage message, long delay) {
    rabbitTemplate.convertAndSend(delayExchange, "delay", message, msg -> {
        if (delay > 0) {
            msg.getMessageProperties().setExpiration(String.valueOf(delay));
        }
        return msg;
    });
}


/**
     * 监听延迟消息处理队列
     * 当延迟队列中的消息过期后,会被自动转发到此队列进行处理
     *
     * @param message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("delay.process.queue"),
            exchange = @Exchange(value = "delay.process.exchange", type = "direct"),
            key = "delay.process"
    ))
    public void handleDelayMessage(DemoMessage message) {
    ...
    }

4.最佳实践

  1. 使用 JSON 序列化:通过 Jackson2JsonMessageConverter 实现消息的 JSON 序列化,提高可读性和兼容性。
  2. 手动确认消息:在关键业务场景中使用手动确认模式,确保消息被正确处理。
  3. 异常处理:合理处理消息转换和业务处理过程中的异常,避免消息丢失。
  4. 批量处理:对于高吞吐量场景,使用批量处理提高处理效率。
  5. 配置优化:根据业务需求合理配置消费者数量、预取数量等参数。

5.总结

本文档介绍了 Spring Boot 与 RabbitMQ 的整合方案,包括基础配置、消息发送、消息消费、批量处理、延迟消息等核心功能。通过合理使用这些功能,可以构建高可用、高性能的异步消息处理系统。在实际应用中,需要根据具体业务场景进行相应的调整和优化。

源码地址:https://gitee.com/zheji/spring-rabbitmq-demo

到此这篇关于springboot3.0整合rabbitmq3.13的实现示例的文章就介绍到这了,更多相关springboot3.0整合rabbitmq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文