java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot发布/订阅广播消息

SpringBoot实现发布/订阅广播消息的示例代码

作者:希望永不加班

在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式,本文给大家介绍了SpringBoot实现发布/订阅广播消息的示例代码,需要的朋友可以参考下

引言

在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式。

我们日常使用的普通点对点队列,一条消息只会被一个消费者消费(竞争消费);而广播模式可以实现一条消息、多服务、多消费者同时接收,完美实现一对多通知。

像 缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播 等场景,全部依赖 Fanout 广播模式。

一、什么是 MQ 广播(发布/订阅)模式?

1. 核心定义

广播模式基于 FanoutExchange(扇形交换机) 实现,核心逻辑:

生产者发送一条消息到 Fanout 交换机,所有绑定该交换机的队列,都会完整收到这条消息。

不管路由键是什么、不管队列名称,只要完成绑定,就会无条件广播投递。

2. 核心特性

3. 适用业务场景

二、四大交换机模式核心对比

交换机类型匹配规则消费模式核心场景
Direct(直连)完全匹配 routingKey点对点竞争消费订单、支付、任务处理
Topic(主题)通配符模糊匹配选择性多消费日志分级、消息订阅
Fanout(广播)无视路由键,全部投递全员订阅消费缓存刷新、全局通知
Headers匹配消息头参数自定义匹配极少使用

三、关键认知误区

1:同一个队列多消费者可以实现广播

绝对错误!

同一个队列下的多个消费者,默认是竞争消费,一条消息只会被一个消费者消费。

广播必备条件:每个消费者对应一个独立队列,全部绑定同一个 Fanout 交换机。

2:Fanout 交换机需要配置路由键

Fanout 交换机底层逻辑直接忽略 routingKey,无论发送时传什么值,都不会影响广播效果。

3:广播消息天然可靠、不会丢失

默认非持久化、自动ACK 场景下,广播消息极易丢失,生产必须做持久化+手动ACK

四、SpringBoot 完整实现

1. 基础依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 生产级配置文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 手动ACK 保证广播消息不丢
        acknowledge-mode: manual
        # 限制预取数,防止单节点消息堆积
        prefetch: 5
        # 开启消费重试
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

3. 广播交换机、队列、绑定配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutBroadcastConfig {
    // 广播交换机名称
    public static final String FANOUT_EXCHANGE = "system.fanout.broadcast.exchange";
    // 三个独立消费者队列
    public static final String QUEUE_CACHE_REFRESH = "queue.cache.refresh";
    public static final String QUEUE_NOTICE = "queue.system.notice";
    public static final String QUEUE_LOG = "queue.log.collect";
    // 声明 Fanout 广播交换机:持久化、不自动删除
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE, true, false);
    }
    // 队列1:缓存刷新队列
    @Bean
    public Queue cacheRefreshQueue() {
        return new Queue(QUEUE_CACHE_REFRESH, true);
    }
    // 队列2:系统通知队列
    @Bean
    public Queue noticeQueue() {
        return new Queue(QUEUE_NOTICE, true);
    }
    // 队列3:日志采集队列
    @Bean
    public Queue logQueue() {
        return new Queue(QUEUE_LOG, true);
    }
    // 全部绑定到广播交换机
    @Bean
    public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(noticeQueue).to(fanoutExchange);
    }
    @Bean
    public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(logQueue).to(fanoutExchange);
    }
}

4. 广播消息生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class BroadcastProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send/broadcast")
    public String sendBroadcastMsg(@RequestParam String content) {
        // Fanout广播:路由键传空字符串
        rabbitTemplate.convertAndSend(
                FanoutBroadcastConfig.FANOUT_EXCHANGE,
                "",
                content
        );
        return "✅ 广播消息发送成功:" + content;
    }
}

5. 多消费者实现(全员接收)

消费者1:缓存刷新消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class CacheRefreshConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_CACHE_REFRESH)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【缓存服务】接收广播消息:" + msg);
            // 执行缓存刷新业务逻辑
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 消费失败,重回队列重试
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

消费者2:系统通知消费者

@Component
public class SystemNoticeConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_NOTICE)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【通知服务】接收广播消息:" + msg);
            // 执行消息推送业务
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

消费者3:日志采集消费者

@Component
public class LogCollectConsumer {
    @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_LOG)
    public void consume(String msg, Message message, Channel channel) throws IOException {
        try {
            System.out.println("【日志服务】接收广播消息:" + msg);
            // 执行日志采集业务
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

五、测试效果

访问接口:

http://localhost:8080/send/broadcast?content=全局缓存刷新通知

控制台输出:

【缓存服务】接收广播消息:全局缓存刷新通知
【通知服务】接收广播消息:全局缓存刷新通知
【日志服务】接收广播消息:全局缓存刷新通知

一条消息,多服务同时消费,广播生效!

六、总结

1. 必须开启持久化

交换机、队列全部设置持久化,防止重启丢失广播配置。

2. 强制手动ACK

广播场景多为重要通知、缓存同步,自动ACK会导致业务未执行完成消息丢失。

3. 每个服务独立队列

不同微服务必须使用独立队列,避免竞争消费,保证广播全覆盖。

4. 广播消息建议做幂等

MQ 重试、网络抖动会导致广播消息重复推送,核心业务必须基于消息ID做幂等防重。

5. 禁止设置复杂路由键

Fanout 无视路由键,统一传空字符串,保持代码规范。

写在最后

广播发布订阅模式是微服务分布式通信的重要基石,区别于传统的点对点任务消费,它主打全局通知、多节点同步、状态广播,是缓存刷新、配置热更新、系统公告等场景的最优解。

很多开发者一直混淆“竞争消费”和“广播消费”的本质,导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范,能帮你彻底解决分布式多节点同步难题。

以上就是SpringBoot实现发布/订阅广播消息的示例代码的详细内容,更多关于SpringBoot发布/订阅广播消息的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文