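A Detailed Example of Integrating rocketmq-v5-client-spring-boot with Spring Boot
Author: 弄个昵称
- Installing the RocketMQ server
Apache RocketMQ official website
5.x documentation
Official rocketmq-v5-client-spring-boot examples
rocketmq-v5-client-spring-boot-starter in the Maven repository
The RocketMQ server is distributed in two forms: a binary package and a source package. This article uses version 5.3.2 as the example. Click here to download the Apache RocketMQ 5.3.2 source package; the binary package is available here. The binary package is already compiled and can be run directly, whereas the source package must be compiled before it can run.
This article integrates rocketmq-v5-client-spring-boot 2.3.3, which depends internally on rocketmq-client-java 5.0.7 and communicates over gRPC. I recommend reading the official documentation and examples before starting. The Java environment used here is openjdk-17.
JDK 11 or later is recommended.
Installing RocketMQ on Ubuntu
Check the JDK version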
java -version
If no JDK is installed, run the following commands as root
apt-get update
apt-get upgrade
apt install openjdk-17-jdk
Create the installation directory
cd /usr/local
mkdir rocketmq
cd rocketmq
Download the RocketMQ binary package (I use version 5.3.2 here; other versions can be downloaded from the official site)
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip
Unzip the archive. If the unzip command is missing, Ubuntu prints a hint; follow it to install unzip.
unzip rocketmq-all-5.3.2-bin-release.zip
Go to the bin directory
Adjust the memory settings in runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
Adjust the memory settings in runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m"
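Edit conf/broker.conf. To make testing easier, enable automatic topic creation by adding the settings below (in my tests, however, topics were not created automatically on 5.0 and later).
# Enable automatic topic creation (optional)
autoCreateTopicEnable=true
# NameServer address. Use the public IP for public access or the private IP for internal access; every IP below must be the same
namesrvAddr=192.168.3.86:9876
# brokerIP1: the broker also needs an IP, private or public
brokerIP1=192.168.3.86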
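Since the IPs in broker.conf must agree, a quick check can catch a mismatch before the broker is started. The sketch below is only an illustration: BrokerConfCheck is a hypothetical helper (not part of RocketMQ), and it assumes namesrvAddr holds a single host:port entry as in this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper, not part of RocketMQ: sanity-checks broker.conf text. */
final class BrokerConfCheck {

    /** Returns the host part of a "host:port" address. */
    static String hostOf(String address) {
        int colon = address.lastIndexOf(':');
        return colon < 0 ? address.trim() : address.substring(0, colon).trim();
    }

    /** True when namesrvAddr and brokerIP1 are both set and name the same host. */
    static boolean sameHost(String confText) {
        Properties props = new Properties();
        try {
            // broker.conf uses key=value lines with # comments, which Properties can read
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String namesrv = props.getProperty("namesrvAddr");
        String brokerIp = props.getProperty("brokerIP1");
        return namesrv != null && brokerIp != null
                && hostOf(namesrv).equals(brokerIp.trim());
    }
}
```

Calling BrokerConfCheck.sameHost with the contents of conf/broker.conf returns false when the two settings point at different hosts or when either one is missing.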
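Configure the NameServer environment variables
Edit the profile
vim /etc/profile
Add the following settings
# RocketMQ installation directory
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-5.3.2
# NameServer address: use the public IP for public access or the private IP for internal access
export NAMESRV_ADDR=192.168.3.86:9876
Reload the file so the changes take effect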
source /etc/profile
With that in place, we can start the RocketMQ NameServer
Start namesrv
nohup sh bin/mqnamesrv &
Verify
# Check that namesrv started successfully
tail -f -n 500 mqnamesrv.log
...
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876
# Or
tail -f ~/logs/rocketmqlogs/namesrv.log
Start the Broker (message storage) and the Proxy
# Start without the Proxy
nohup sh bin/mqbroker -n 192.168.3.86:9876 >mqbroker.log 2>&1 &
# Start Broker + Proxy
nohup sh bin/mqbroker -n 192.168.3.86:9876 --enable-proxy &
# Recommended: start with an explicit configuration file (the broker listens on port 10911 by default; the port can be changed in the configuration file)
nohup sh bin/mqbroker -n 192.168.3.86:9876 -c conf/broker.conf --enable-proxy &
# Check that startup succeeded
tail -n 500 nohup.out
tail -f ~/logs/rocketmqlogs/broker.log
tail -f ~/logs/rocketmqlogs/proxy.log
Wed May 14 12:41:41 CST 2025 rocketmq-proxy startup successfully
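Besides reading the logs, it can help to confirm from the client machine that the ports are actually reachable (9876 for the NameServer, 10911 for the Broker, 8081 for the Proxy gRPC endpoint used later in this article). The following is a minimal sketch using only the JDK; PortProbe is a hypothetical name, and a successful TCP connect says nothing about whether the service behind the port is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable: treat all of these as "not open"
            return false;
        }
    }
}
```

For example, PortProbe.isOpen("192.168.3.86", 8081, 2000) should return true once the Proxy is up and the firewall allows the port.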
If tail -f ~/logs/rocketmqlogs/broker.log shows the following message
The default acl dir /usr/local/rocketmq/rocketmq-all-5.3.2/conf/acl is not exist
switch to the conf directory and create an acl folder there
mkdir acl
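Documentation for v5 is sparse, and I could not find out why the acl folder is missing from the package. If you know, please leave a comment.
Test sending and receiving messages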
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Topics can be created with the mqadmin command
Admin Tool: the official command-line tool
Note: TestTopic is the topic name
sh bin/mqadmin updatetopic -n 192.168.3.86:9876 -t TestTopic -c DefaultCluster
Output
create topic to 192.168.3.86:10911 success. TopicConfig [topicName=TestTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
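Installing RocketMQ Dashboard (web console)
Official introduction
Follow the official instructions to install and run it.
Points to note: the IP must match the one configured above, and the firewall must allow ports 8080, 8081, 10911 and 9876.
Note: the Dashboard's default port is 8080. Left unchanged, it conflicts with the Proxy, which also defaults to 8080.
I changed the RocketMQ Dashboard port to 8082.
It can be run locally or packaged and deployed. I packaged it; once it is running, open http://192.168.3.86:8082
Add the topics first. (I spent several days on this and did not find a way to have topics created automatically. If anyone has worked it out, please share.)
Click Submit. Judging from the source code, adding and updating a topic go through the same endpoint. Once the topic is submitted you can start writing code.
If startup fails with the following error, the topic needs to be added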
CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
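When several topics are missing, generating the mqadmin commands in one go saves some typing. The sketch below only builds command strings in the same form as the updatetopic command shown earlier. TopicCommands is a hypothetical helper, and the `-a +message.type=...` attribute for FIFO, DELAY and TRANSACTION topics is how I understand the 5.x documentation; please verify it against your server version before relying on it.

```java
/** Hypothetical helper: builds mqadmin updatetopic command lines as plain strings. */
final class TopicCommands {

    /**
     * Builds one command line. messageType is expected to be NORMAL, FIFO,
     * DELAY or TRANSACTION; NORMAL is treated as the default and adds no attribute.
     */
    static String updateTopic(String namesrv, String cluster, String topic, String messageType) {
        String base = "sh bin/mqadmin updatetopic -n " + namesrv
                + " -t " + topic + " -c " + cluster;
        if ("NORMAL".equals(messageType)) {
            return base;
        }
        // Assumption: typed topics are declared through the message.type attribute
        return base + " -a +message.type=" + messageType;
    }
}
```

For the error above, TopicCommands.updateTopic("192.168.3.86:9876", "DefaultCluster", "delay-topic", "DELAY") yields a command that can be pasted into the shell on the server.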
Integrating rocketmq-client-java
Start with the official native rocketmq-client-java SDK
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>
The official example code is worth a look
Sending a normal message
package com.example.mq.producer;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

    public static void main(String[] args) throws ClientException {
        // Endpoint: the Proxy address and port list, typically xxx:8080;xxx:8081.
        String endpoint = "192.168.3.86:8081";
        // Target topic for the message; it must be created in advance.
        String topic = "TestTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // The Producer needs the client configuration and the topics it is pre-bound to.
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Build a normal message.
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // Message key, used to look up a specific message by keyword.
                .setKeys("messageKey")
                // Message tag, used by consumers to filter messages.
                .setTag("messageTag")
                // Message body, encoded explicitly as UTF-8.
                .setBody("Hello, MQ".getBytes(StandardCharsets.UTF_8))
                .build();
        try {
            // Send the message; check the result and handle failures.
            SendReceipt sendReceipt = producer.send(message);
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            logger.error("Failed to send message", e);
        }
        // producer.close();
    }
}
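The endpoint string in the example is a semicolon-separated list of host:port pairs. A typo there only shows up when the client tries to connect, so a small pre-check can be handy. This is just a sketch; Endpoints is a hypothetical helper, and the client library performs its own parsing that may accept more formats than this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: validates an endpoint list such as "host1:8081;host2:8081". */
final class Endpoints {

    /** Splits the list on ';' and returns the trimmed "host:port" entries. */
    static List<String> parse(String endpoints) {
        List<String> result = new ArrayList<>();
        for (String part : endpoints.split(";")) {
            String entry = part.trim();
            if (entry.isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Missing host or port: " + entry);
            }
            int port = Integer.parseInt(entry.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + entry);
            }
            result.add(entry);
        }
        return result;
    }
}
```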
Subscribing
package com.example.mq.consumer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Endpoint: the Proxy address and port list, typically xxx:8081;xxx:8081.
        String endpoints = "192.168.3.86:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // Filter rule for the subscription; "*" subscribes to messages of every tag.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Consumer group of this consumer; the group must be created in advance.
        String consumerGroup = "YourConsumerGroup";
        // Topic to subscribe to; the topic must be created in advance.
        String topic = "TestTopic";
        // The PushConsumer needs the consumer group, the client configuration and the subscription.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Set the consumer group.
                .setConsumerGroup(consumerGroup)
                // Set the pre-bound subscription.
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Set the message listener.
                .setMessageListener(messageView -> {
                    // Handle the message and return the consume result.
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    ByteBuffer body = messageView.getBody();
                    String message = StandardCharsets.UTF_8.decode(body).toString();
                    logger.info("Consume message successfully, body={}", message);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // Close the PushConsumer when it is no longer needed.
        // pushConsumer.close();
    }
}
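One detail of the listener above is easy to trip over: Charset.decode reads from the buffer's current position and advances it, so decoding the same ByteBuffer a second time gives an empty string. A small sketch of a decode helper that leaves the caller's buffer untouched (BodyDecoder is a hypothetical name, not part of the client library):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes a message body without consuming the buffer. */
final class BodyDecoder {

    /** Decodes the remaining bytes as UTF-8; the caller's position is not moved. */
    static String decodeUtf8(ByteBuffer body) {
        // duplicate() shares the bytes but has its own position and limit
        return StandardCharsets.UTF_8.decode(body.duplicate()).toString();
    }
}
```

With this, the body can be logged in one place and parsed in another without worrying about which call came first.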
Integrating rocketmq-v5-client-spring-boot with Spring Boot
Note: comment out the native rocketmq-client-java dependency. rocketmq-v5-client-spring-boot already pulls it in, and keeping both causes a JAR conflict.
Add the Maven dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-v5-client-spring-boot</artifactId>
    <version>2.3.3</version>
</dependency>
First, define the constants
public class RocketMQVariable {
    /**
     * Normal messages
     */
    public static final String NORMAL_TOPIC = "normal-topic";
    // For load-balanced consumption, consumers must share the same group name
    public static final String NORMAL_GROUP = "normal-group";
    // Second group, used for broadcast consumption
    public static final String NORMAL1_GROUP = "normal1-group";

    /**
     * Asynchronous normal messages
     */
    public static final String ASYNC_NORMAL_TOPIC = "async-normal-topic";
    public static final String ASYNC_NORMAL_GROUP = "async-normal-group";

    /**
     * Ordered (FIFO) messages
     */
    public static final String FIFO_TOPIC = "fifo-topic";
    public static final String FIFO_GROUP = "fifo-group";

    /**
     * Scheduled/delayed messages
     */
    public static final String DELAY_TOPIC = "delay-topic";
    public static final String DELAY_GROUP = "delay-group";

    /**
     * Transactional messages
     */
    public static final String TRANSACTION_TOPIC = "transaction-topic";
    public static final String TRANSACTION_GROUP = "transaction-group";
}
Note:
For load-balanced consumption, use the same topic and the same consumer group.
For broadcast consumption, use the same topic and different consumer groups.
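The two rules above can be pictured with a toy model: every consumer group receives each message once, and inside a group the messages are shared among its instances. The sketch below is a simplification for illustration only. GroupDeliveryModel is a hypothetical class, and the round-robin split is an assumption; the real broker decides the distribution and it will not look this regular.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of group-based delivery; not how the broker is implemented. */
final class GroupDeliveryModel {

    /**
     * groups maps a consumer group name to its number of instances (at least 1).
     * Returns, per "group#instance", the messages that instance would see when
     * each group gets every message and spreads them round-robin over its instances.
     */
    static Map<String, List<String>> deliver(Map<String, Integer> groups, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> group : groups.entrySet()) {
            String name = group.getKey();
            int instances = group.getValue();
            for (int i = 0; i < instances; i++) {
                received.put(name + "#" + i, new ArrayList<>());
            }
            for (int m = 0; m < messages.size(); m++) {
                // Within one group, each message goes to exactly one instance
                received.get(name + "#" + (m % instances)).add(messages.get(m));
            }
        }
        return received;
    }
}
```

Two instances in normal-group split the messages between them, while one instance each in normal-group and normal1-group both see every message, which is the broadcast effect described above.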
Write a utility class
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.Resource;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.apache.rocketmq.client.core.RocketMQClientTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class RocketMQV2Service {

    private static final Logger log = LoggerFactory.getLogger(RocketMQV2Service.class);

    // Shared by all asynchronous send callbacks, so that a new pool is not created on every call
    private final ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();

    @Resource
    private RocketMQClientTemplate template;

    /**
     * Sends a normal message synchronously.
     */
    public void syncSendNormalMessage(String topic, Object message) {
        SendReceipt sendReceipt = template.syncSendNormalMessage(topic, message);
        log.info("Normal message sent: topic={}, message={}, sendReceipt={}", topic, message, sendReceipt);
    }

    /**
     * Sends a normal message asynchronously.
     */
    public void asyncSendNormalMessage(String topic, Object message) {
        CompletableFuture<SendReceipt> future = new CompletableFuture<>();
        // The callback runs once the send has completed or failed
        future.whenCompleteAsync((sendReceipt, throwable) -> {
            if (null != throwable) {
                log.error("Failed to send message", throwable);
                return;
            }
            log.info("Async message send completed, messageId={}", sendReceipt.getMessageId());
        }, sendCallbackExecutor);
        CompletableFuture<SendReceipt> completableFuture = template.asyncSendNormalMessage(topic, message, future);
        log.info("Async message submitted: topic={}, message={}, future={}", topic, message, completableFuture);
    }

    /**
     * Sends an ordered (FIFO) message.
     *
     * @param messageGroup messages in the same group keep their order
     */
    public void syncSendFifoMessage(String topic, Object message, String messageGroup) {
        SendReceipt sendReceipt = template.syncSendFifoMessage(topic, message, messageGroup);
        log.info("FIFO message sent: topic={}, message={}, sendReceipt={}", topic, message, sendReceipt);
    }

    /**
     * Sends a delayed message.
     *
     * @param delay delay in seconds
     */
    public void syncSendDelayMessage(String topic, Object message, Long delay) {
        SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, Duration.ofSeconds(delay));
        log.info("Delayed message sent: topic={}, message={}, sendReceipt={}", topic, message, sendReceipt);
    }

    /**
     * Sends a delayed message.
     *
     * @param duration for example Duration.ofSeconds(n), Duration.ofMinutes(n) or Duration.ofHours(n)
     */
    public void syncSendDelayMessage(String topic, Object message, Duration duration) {
        SendReceipt sendReceipt = template.syncSendDelayMessage(topic, message, duration);
        log.info("Delayed message sent: topic={}, message={}, sendReceipt={}", topic, message, sendReceipt);
    }

    /**
     * Sends a transactional message and resolves it based on the local transaction.
     */
    public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
        try {
            Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
            SendReceipt sendReceipt = pair.getSendReceipt();
            Transaction transaction = pair.getTransaction();
            log.info("Transactional message sent: transaction={}, sendReceipt={}",
                    JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
            log.info("Message id: {}", sendReceipt.getMessageId());
            // Commit here if the local transaction succeeds.
            // As written, doLocalTransaction(1) returns false, so this call rolls back;
            // pass a value greater than 5 to exercise the commit branch.
            if (doLocalTransaction(1)) {
                log.info("Local transaction succeeded");
                transaction.commit();
            } else {
                log.info("Local transaction failed");
                transaction.rollback();
            }
            return pair;
        } catch (ClientException e) {
            throw new RuntimeException(e);
        }
    }

    boolean doLocalTransaction(int number) {
        // Local transaction logic, for example database operations
        log.info("Executing local transaction: {}", number);
        return number > 5;
    }
}
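The asynchronous send in the utility class relies on a plain CompletableFuture callback, which can be tried out without a broker. The sketch below uses only the JDK: AsyncCallbackSketch is a hypothetical class, the String result stands in for a SendReceipt, and the single shared daemon pool is my own choice rather than something the starter requires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** JDK-only sketch of the send-callback pattern; no RocketMQ classes involved. */
final class AsyncCallbackSketch {

    // One shared pool for all callbacks; daemon threads do not block JVM shutdown
    private static final ExecutorService CALLBACK_POOL =
            Executors.newFixedThreadPool(2, runnable -> {
                Thread thread = new Thread(runnable, "send-callback");
                thread.setDaemon(true);
                return thread;
            });

    /** Registers a callback that records either the result or the error message. */
    static CompletableFuture<Void> watch(CompletableFuture<String> send,
                                         AtomicReference<String> outcome) {
        return send.handleAsync((receipt, error) -> {
            outcome.set(error == null ? "ok:" + receipt : "failed:" + error.getMessage());
            return (Void) null;
        }, CALLBACK_POOL);
    }
}
```

Completing the future with a value triggers the success branch, and completeExceptionally triggers the failure branch, which mirrors what happens when the template completes the future passed to asyncSendNormalMessage.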
Test code
YAML configuration
rocketmq:
  producer:
    endpoints: 192.168.3.86:8081
    topic:
  push-consumer:
    endpoints: 192.168.3.86:8081
    access-key:
    secret-key:
    topic:
    tag: "*"
Sending messages
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.util.RocketMQV2Service;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.common.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;

@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {

    @Autowired
    private RocketMQV2Service rocketMQV2Service;

    /**
     * Normal message
     */
    @GetMapping("/normal.message")
    public String normalMessage() {
        rocketMQV2Service.syncSendNormalMessage(RocketMQVariable.NORMAL_TOPIC, "hello RocketMQ, this is a normal message");
        return "Sent";
    }

    /**
     * Asynchronous normal message
     */
    @GetMapping("/async.normal.message")
    public String asyncSendNormalMessageNormalMessage() {
        rocketMQV2Service.asyncSendNormalMessage(RocketMQVariable.ASYNC_NORMAL_TOPIC, "hello RocketMQ, this is an async normal message");
        return "Sent";
    }

    /**
     * Ordered (FIFO) messages
     */
    @GetMapping("/fifo.message")
    public String fifoMessage() {
        for (int i = 0; i < 20; i++) {
            rocketMQV2Service.syncSendFifoMessage(RocketMQVariable.FIFO_TOPIC, "hello RocketMQ, this is FIFO message " + i, RocketMQVariable.FIFO_GROUP);
        }
        return "Sent";
    }

    /**
     * Scheduled/delayed messages
     */
    @GetMapping("/delay.message")
    public String delayMessage() {
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ, this message is delayed by 30 seconds", Duration.ofSeconds(30));
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ, this message is delayed by 10 seconds", 10L);
        rocketMQV2Service.syncSendDelayMessage(RocketMQVariable.DELAY_TOPIC, "hello RocketMQ, this message is delayed by 1 minute", Duration.ofMinutes(1));
        return "Sent";
    }

    /**
     * Transactional message
     */
    @GetMapping("/transaction.message")
    public String transactionMessage() throws ClientException {
        Pair<SendReceipt, Transaction> sendReceiptTransactionPair =
                rocketMQV2Service.sendMessageInTransaction(RocketMQVariable.TRANSACTION_TOPIC, "hello RocketMQ, this is a transactional message");
        Transaction transaction = sendReceiptTransactionPair.getTransaction();
        SendReceipt sendReceipt = sendReceiptTransactionPair.getSendReceipt();
        MessageId messageId = sendReceipt.getMessageId();
        log.info("Transactional message sent, messageId={}", messageId);
        log.info("Transactional message sent, transaction={}, sendReceipt={}",
                JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
        return "Sent";
    }
}
Normal messages (broadcast mode)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC, consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormalService implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("Normal message, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Normal message, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("Normal message, properties={}", JSONObject.toJSONString(properties));
        // Report failure when the message carries an OrderId property
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("Normal message, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}

import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC, consumerGroup = RocketMQVariable.NORMAL1_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("Normal message 1, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Normal message 1, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("Normal message 1, properties={}", JSONObject.toJSONString(properties));
        // Report failure when the message carries an OrderId property
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("Normal message 1, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}
[Screenshot: log output; note that the two consumerGroup values differ]
Load-balanced mode: change consumerGroup to RocketMQVariable.NORMAL_GROUP
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.NORMAL_TOPIC, consumerGroup = RocketMQVariable.NORMAL_GROUP)
public class PushConsumerNormal1Service implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("Normal message 1, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Normal message 1, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("Normal message 1, properties={}", JSONObject.toJSONString(properties));
        // Report failure when the message carries an OrderId property
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("Normal message 1, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        return ConsumeResult.SUCCESS;
    }
}
[Screenshot: log output; one consumer is chosen automatically for each message]
Ordered (FIFO) consumption
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.FIFO_TOPIC, consumerGroup = RocketMQVariable.FIFO_GROUP)
public class PushConsumerFifoService implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("FIFO message, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("FIFO message, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("FIFO message, properties={}", JSONObject.toJSONString(properties));
        // Report failure when the message carries an OrderId property
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("FIFO message, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        log.info("FIFO message consumed");
        return ConsumeResult.SUCCESS;
    }
}
[Screenshot: log output]
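The ordering guarantee of FIFO messages applies per message group: messages sent with the same messageGroup keep their send order, while different groups are independent of each other. A toy illustration of that rule follows; FifoModel is a hypothetical class that only regroups a list in memory and does not reflect how the broker stores messages.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: order is preserved within each message group. */
final class FifoModel {

    /** Each entry is {messageGroup, body}; returns the bodies per group in send order. */
    static Map<String, List<String>> byGroup(List<String[]> sent) {
        Map<String, List<String>> perGroup = new LinkedHashMap<>();
        for (String[] message : sent) {
            perGroup.computeIfAbsent(message[0], key -> new ArrayList<>()).add(message[1]);
        }
        return perGroup;
    }
}
```

In the controller of this article all 20 messages use the same message group, so they form a single ordered sequence.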
Scheduled/delayed messages (the delay can be set freely)
import com.alibaba.fastjson2.JSONObject;
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.DELAY_TOPIC, consumerGroup = RocketMQVariable.DELAY_GROUP)
public class PushConsumerDelayService implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("Delayed message, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Delayed message, message={}", message);
        Map<String, String> properties = messageView.getProperties();
        log.info("Delayed message, properties={}", JSONObject.toJSONString(properties));
        // Report failure when the message carries an OrderId property
        if (Objects.nonNull(messageView.getProperties().get("OrderId"))) {
            log.info("Delayed message, message={}", messageView);
            return ConsumeResult.FAILURE;
        }
        log.info("Delayed message consumed");
        return ConsumeResult.SUCCESS;
    }
}
[Screenshot: log output]
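A delay given as a Duration ultimately corresponds to an absolute delivery time: the send time plus the delay. The arithmetic is shown below as a small sketch. DelayMath is a hypothetical helper, and the assumption that the 5.x client works with a delivery timestamp in epoch milliseconds is mine; check the client documentation for the exact semantics.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: converts a relative delay into an absolute delivery time. */
final class DelayMath {

    /** Delivery time in epoch milliseconds for a message sent at 'now' with the given delay. */
    static long deliveryTimestampMillis(Instant now, Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return now.plus(delay).toEpochMilli();
    }
}
```

Comparing this value with the time at which the consumer logs the message is a simple way to check that the 10 second, 30 second and 1 minute messages from the controller arrive roughly when expected.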
Transaction handling, case 1
/**
 * Sends a transactional message and resolves it based on the local transaction.
 */
public Pair<SendReceipt, Transaction> sendMessageInTransaction(String topic, Object message) {
    try {
        Pair<SendReceipt, Transaction> pair = template.sendMessageInTransaction(topic, message);
        SendReceipt sendReceipt = pair.getSendReceipt();
        Transaction transaction = pair.getTransaction();
        log.info("Transactional message sent: transaction={}, sendReceipt={}",
                JSONObject.toJSONString(transaction), JSONObject.toJSONString(sendReceipt));
        log.info("Message id: {}", sendReceipt.getMessageId());
        // Commit here if the local transaction succeeds.
        // As written, doLocalTransaction(1) returns false, so this call rolls back;
        // pass a value greater than 5 to exercise the commit branch.
        if (doLocalTransaction(1)) {
            log.info("Local transaction succeeded");
            transaction.commit();
        } else {
            log.info("Local transaction failed");
            transaction.rollback();
        }
        return pair;
    } catch (ClientException e) {
        throw new RuntimeException(e);
    }
}

boolean doLocalTransaction(int number) {
    // Local transaction logic, for example database operations
    log.info("Executing local transaction: {}", number);
    return number > 5;
}
If the utility class has already called transaction.commit(), the checker below is not invoked
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.core.RocketMQTransactionChecker;

@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {

    @Override
    public TransactionResolution check(MessageView messageView) {
        log.info("Receive transactional message check, message={}", messageView);
        return null;
    }
}
Instead, the message is consumed directly
import com.example.mq.variable.RocketMQVariable;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

@Slf4j
@Service
@RocketMQMessageListener(topic = RocketMQVariable.TRANSACTION_TOPIC, consumerGroup = RocketMQVariable.TRANSACTION_GROUP)
public class PushConsumerTransactionService implements RocketMQListener {

    @Override
    public ConsumeResult consume(MessageView messageView) {
        log.info("Transactional message consume, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Transactional message consume, message={}", message);
        if (Objects.isNull(message)) {
            log.info("Transactional message consume failed");
            return ConsumeResult.FAILURE;
        }
        log.info("Transactional message consumed");
        return ConsumeResult.SUCCESS;
    }
}
[Screenshot: log output]
Transaction handling, case 2
/**
 * Sends a transactional message. This method only sends; it does not commit or roll back.
 */
public void sendMessageInTransaction(String topic, Object message) {
    try {
        template.sendMessageInTransaction(topic, message);
    } catch (ClientException e) {
        throw new RuntimeException(e);
    }
}
Let the official transaction check mechanism resolve the transaction
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.core.RocketMQTransactionChecker;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

@Slf4j
@RocketMQTransactionListener
public class PushConsumerTransactionTemplate implements RocketMQTransactionChecker {

    @Override
    public TransactionResolution check(MessageView messageView) {
        log.info("Transaction check, messageView={}", messageView);
        ByteBuffer body = messageView.getBody();
        String message = StandardCharsets.UTF_8.decode(body).toString();
        log.info("Transaction check, message={}", message);
        String messageId = messageView.getMessageId().toString();
        if (Objects.nonNull(messageId)) {
            log.info("Transaction check, messageId={}", messageId);
            return TransactionResolution.COMMIT;
        }
        log.info("Transaction check failed");
        return TransactionResolution.ROLLBACK;
    }
}
The message is consumed only after the transaction has been committed
The consumer is the same PushConsumerTransactionService class shown under transaction handling, case 1.
[Screenshot: log output]
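The two cases differ only in who resolves the half message: in case 1 the producer commits or rolls back itself, and in case 2 the decision is left to the checker. A conceptual model of that rule is sketched below. TransactionModel is a hypothetical class with its own enum, not the client's TransactionResolution, and it deliberately ignores details such as repeated checks.

```java
import java.util.function.Supplier;

/** Conceptual model of transactional message resolution; not RocketMQ API. */
final class TransactionModel {

    enum Resolution { COMMIT, ROLLBACK, UNKNOWN }

    /**
     * Decides whether a half message becomes visible to consumers.
     * An explicit producer decision wins; only UNKNOWN falls back to the checker.
     */
    static boolean visibleToConsumers(Resolution producerDecision, Supplier<Resolution> checker) {
        Resolution result = producerDecision == Resolution.UNKNOWN
                ? checker.get()
                : producerDecision;
        return result == Resolution.COMMIT;
    }
}
```

This matches what the logs show: after an explicit commit the checker is never consulted, and when the producer leaves the transaction open the message is delivered only once the checker returns COMMIT.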