java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot整合RocketMQ

SpringBoot整合RocketMQ极速实战教程

作者:cfm_2914

本文详细介绍了SpringBoot集成RocketMQ的全过程,涵盖普通、广播、顺序、延迟、批量、过滤、事务等多种消息模式,通过自动装配、全局配置、核心组件等介绍RocketMQ与SpringBoot的深入集成原理与实战应用,感兴趣的朋友一起看看吧

一、前期准备

1.1 环境依赖

1.2 核心依赖引入(Maven)

Apache 官方适配 SpringBoot Starter,无需手动适配版本,自动兼容。

<!-- SpringBoot 整合 RocketMQ 官方依赖 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

二、全局配置文件(application.yml)

配置服务地址、生产组、消费组,后续所有功能统一复用该配置。

# RocketMQ 配置
rocketmq:
  # NameServer 集群地址
  name-server: 127.0.0.1:9876
  # 生产者配置
  producer:
    # 生产者组名
    group: demo-producer-group
    # 消息发送超时时间
    send-message-timeout: 3000
    # 最大重试次数
    retry-times-when-send-failed: 2
  # 消费者默认配置
  consumer:
    group: demo-consumer-group

三、SpringBoot 集成 RocketMQ 核心原理

在进行各类消息实战之前,先深度讲解 SpringBoot 与 RocketMQ 的底层集成原理,搞懂自动装配、核心组件、通信机制,知其然更知其所以然,解决开发中组件失效、连接失败、消息发送异常等底层问题。

3.1 自动装配原理(核心)

RocketMQ 官方提供的 rocketmq-spring-boot-starter 遵循 SpringBoot 自动装配机制,无需手动创建生产者、消费者实例,框架自动初始化托管Bean。

3.2 核心集成组件说明

3.3 完整通信执行流程

3.4 SpringBoot集成优势

四、基础消息实战:普通生产与消费

三、基础消息实战:普通生产与消费

3.1 普通消息实现原理

原理说明:普通消息是RocketMQ最基础的消息模型,采用生产者主动推送、消费者主动拉取模式。生产者通过NameServer获取Broker路由信息,基于负载均衡策略选择Broker节点投递消息,消息落地Broker磁盘持久化;消费者定时拉取消息,业务执行成功自动提交消费位点,异常触发重试,保障At-Least-Once至少一次投递语义。支持同步、异步、单向三种发送模式,适配不同吞吐与可靠性需求。

3.2 普通消息生产者

支持三种发送模式:同步、异步、单向,覆盖绝大多数业务场景。

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqProducer {
    @Autowired
    private RocketMQTemplate rocketMqTemplate;
    // 定义全局Topic
    private static final String TOPIC = "demo-normal-topic";
    /**
     * 同步发送消息(可靠,适合核心业务)
     */
    public void sendSyncMsg(String msg) {
        SendResult sendResult = rocketMqTemplate.syncSend(TOPIC, msg);
        System.out.println("同步发送结果:" + sendResult.getSendStatus());
    }
    /**
     * 异步发送消息(高吞吐,适合非核心业务)
     */
    public void sendAsyncMsg(String msg) {
        rocketMqTemplate.asyncSend(TOPIC, msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步发送成功");
            }
            @Override
            public void onException(Throwable e) {
                System.err.println("异步发送失败:" + e.getMessage());
            }
        });
    }
    /**
     * 单向发送(极致高性能,无需响应)
     */
    public void sendOneWayMsg(String msg) {
        rocketMqTemplate.sendOneWay(TOPIC, msg);
    }
}

3.3 普通消息消费者

默认集群消费模式,自动负载均衡,异常自动重试。

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
        topic = "demo-normal-topic",
        consumerGroup = "demo-consumer-group"
)
public class NormalMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        // 执行业务逻辑
        System.out.println("收到普通消息:" + message);
        // 异常自动进入重试队列,无需手动处理
        // int a = 1 / 0;
    }
}

五、广播消息实战

实现原理:广播消息核心是消费组级别的全量投递。集群消费是组内分摊消息,而广播消费会让Broker将同一条消息推送给当前消费组内所有在线消费者实例,每个实例独立消费、独立维护位点。为避免集群异常刷屏,广播消费关闭重试机制,消息消费失败不会进入重试队列。适用于全服务缓存刷新、全局配置更新等全员同步场景。

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
        topic = "demo-broadcast-topic",
        consumerGroup = "demo-broadcast-group",
        consumeMode = ConsumeMode.BROADCASTING // 开启广播消费
)
public class BroadcastConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("广播消费消息:" + message);
    }
}

注意:广播消费不支持重试,适合全局缓存刷新、配置更新场景。

六、顺序消息实战(分区有序)

实现原理:RocketMQ顺序消息仅支持分区有序(局部有序),不支持全局有序。核心实现逻辑:Topic会被拆分为多个消息队列,生产者通过自定义业务Key哈希取模,将同一业务维度(同一订单/同一用户)的消息固定投递到同一个MessageQueue;消费者单线程消费单个队列,严格保证队列内消息FIFO先进先出,从而实现业务时序一致。不同队列消息并行消费,兼顾有序性与吞吐量。

5.1 有序消息生产者原理与代码

public void sendOrderMsg(String bizKey, String msg) {
    // bizKey相同 → 固定发送同一队列 → 保证顺序
    rocketMqTemplate.syncSendOrderly("demo-order-topic", msg, bizKey);
}

5.2 有序消息消费者原理与代码

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
        topic = "demo-order-topic",
        consumerGroup = "demo-order-group",
        messageModel = MessageModel.CLUSTERING
)
public class OrderMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("顺序消费:" + message);
    }
}

七、延迟消息实战

实现原理:延迟消息核心是系统队列中转延时机制。生产者发送消息时指定延迟等级,Broker接收消息后不会存入目标Topic队列,而是转入内置的SCHEDULE_TOPIC_XXXX延迟队列。Broker后台定时线程扫描延迟队列,倒计时结束后,将消息重新路由至用户指定的普通Topic队列,消费者方可正常拉取消费。RocketMQ不支持自定义任意时间,仅支持官方预设18个固定延迟等级,保证服务性能稳定。

/**
 * 发送延迟消息
 * 延迟等级:1~18级 → 1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
 */
public void sendDelayMsg(String msg) {
    // 3级延迟 = 10秒后消费
    rocketMqTemplate.syncSend("demo-delay-topic", msg, 3);
}

八、批量消息实战

实现原理:批量消息核心是合并网络IO、减少请求次数。多条同Topic、同配置的消息封装为一个消息列表,通过一次网络请求提交至Broker,Broker批量持久化存储。大幅降低频繁网络连接、请求握手带来的性能开销,极大提升高吞吐场景的并发能力。批量消息为整体事务机制,整批消息要么全部成功存储,要么全部失败,不支持部分成功。

import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public void sendBatchMsg() {
    List<Message> messageList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Message message = new Message("demo-batch-topic", ("批量消息" + i).getBytes());
        messageList.add(message);
    }
    // 批量发送
    rocketMqTemplate.syncSend(messageList);
}

注意:单批次总大小不超过4MB,整体成功/整体失败。

九、消息过滤实战(Tag过滤)

实现原理:消息过滤采用服务端预过滤机制,避免无效消息拉取浪费网络资源。Tag过滤是轻量级过滤方案,生产者为消息绑定业务标签,Broker存储时关联Tag标识;消费者订阅指定Tag表达式,Broker在服务端直接匹配过滤,仅推送符合条件的消息至消费者,无性能损耗。复杂场景可使用SQL过滤,基于消息自定义属性做多条件筛选。

8.1 Tag过滤实现原理与代码

// 发送订单消息,tag = ORDER
public void sendTagMsg() {
    rocketMqTemplate.syncSend("demo-tag-topic:ORDER", "订单创建消息");
}

8.2 消费者订阅指定Tag

@RocketMQMessageListener(
        topic = "demo-tag-topic",
        consumerGroup = "demo-tag-group",
        selectorExpression = "ORDER||PAY" // 只消费ORDER、PAY标签消息
)
@Component
public class TagFilterConsumer implements RocketMQListener<String>{
    @Override
    public void onMessage(String s) {
        System.out.println("过滤消费消息:" + s);
    }
}

十、事务消息实战(核心重点)

实现原理:事务消息基于两阶段提交+超时回查机制,解决本地数据库事务与消息发送的分布式一致性问题。第一阶段发送半消息预提交,消息持久化但对消费者不可见;第二阶段执行本地事务,根据事务结果提交或回滚消息。针对生产者宕机、网络超时等异常,Broker提供定时回查兜底,主动校验本地事务状态,彻底杜绝消息悬挂、数据不一致问题。

实现本地事务与消息发送最终一致性。

9.1 事务消息完整代码实现

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    @Autowired
    private RocketMQTemplate rocketMqTemplate;
    private static final String TRANS_TOPIC = "demo-trans-topic";
    // 发送事务消息入口
    public void sendTransMsg(String content) {
        Message<String> message = MessageBuilder.withPayload(content).build();
        rocketMqTemplate.sendMessageInTransaction(TRANS_TOPIC, message, content);
    }
    // 执行本地事务
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        try {
            // 模拟本地数据库事务
            System.out.println("执行本地事务:" + arg);
            // 成功则提交消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 异常回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    // 事务回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        // 查询数据库事务状态,此处模拟成功
        return RocketMQLocalTransactionState.COMMIT;
    }
}

到此这篇关于SpringBoot整合RocketMQ极速实战教程的文章就介绍到这了,更多相关SpringBoot整合RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文