SpringBoot整合RocketMQ实现发送同步消息
作者:嘉禾嘉宁papa
一、简介
RocketMQ 是一款开源的分布式消息中间件,由阿里巴巴开源。由阿里巴巴集团开发并开源,目前被捐赠给Apache基金会,并入选孵化器项目,2017年从Apache基金会毕业后,RocketMQ被指定为顶级项目(TLP)。它具有高可用性、高性能、低延迟等特点,广泛应用于阿里巴巴集团内部以及众多外部企业的业务系统中。
1.1、RocketMQ 主要特点
- 分布式架构:RocketMQ 是基于分布式架构设计的,支持水平扩展,可以灵活地部署和扩展。
- 高可用性:RocketMQ 支持主从架构,消息存储采用主从复制的方式,保证了消息的高可用性和可靠性。
- 高性能:RocketMQ 在消息存储、消息传输等方面进行了优化,具有较高的吞吐量和较低的延迟。
- 丰富的特性:RocketMQ 提供了丰富的特性,包括顺序消息、延迟消息、事务消息、消息过滤、消息轨迹等,满足了不同场景下的需求。
- 监控和管理:RocketMQ 提供了丰富的监控和管理功能,可以通过控制台或者监控工具实时监控消息的状态和性能指标。
- 开源社区支持:RocketMQ 是一款开源的消息中间件,拥有活跃的开源社区,提供了丰富的文档和示例,为用户提供了便利的支持和帮助。
1.2、RocketMQ 核心组件
RocketMQ 的架构主要包括以下核心组件:
- NameServer:NameServer 是 RocketMQ 的路由管理组件,负责管理 Broker 的路由信息。客户端在发送消息或者消费消息时,需要先从 NameServer 获取相应的 Broker 地址,然后再与 Broker 建立连接。
- Broker:Broker 是 RocketMQ 的消息存储和传输组件,负责存储消息以及向消费者传递消息。一个 RocketMQ 系统可以包含多个 Broker,每个 Broker 负责存储一部分消息数据,并提供相应的消息服务。
- Producer:Producer 是消息的生产者,负责产生消息并将消息发送到 Broker 中。Producer 将消息发送到指定的 Topic,然后由 Broker 存储并传递给相应的消费者。
- Consumer:Consumer 是消息的消费者,负责订阅并消费 Broker 中的消息。Consumer 通过订阅指定的 Topic 来接收消息,并进行相应的业务处理。
- Topic:Topic 是消息的主题,用于对消息进行分类和管理。Producer 将消息发送到指定的 Topic,而 Consumer 则订阅相应的 Topic 来接收消息。
- Message Queue:Message Queue 是消息队列,用于存储消息。每个 Topic 可以分为多个 Message Queue,每个 Message Queue 保存了一部分消息,多个 Message Queue 组成了一个 Topic 的完整消息存储。
总的来说,RocketMQ是阿里推出的优秀开源分布式消息中间件,具有高性能、高可靠、高并发等优点,是构建分布式系统不可或缺的基础组件之一。
1.3、概念
同步发送指的是生产者在发送消息后,会阻塞当前线程,直到收到Broker的发送响应后才返回,响应中包含消息是否发送成功的状态。同步发送的优缺点如下:
优点:
- 发送可靠性高,能够立即得知发送结果
- 发送反馈直接返回,不需要通过专门的监听通道接收结果
缺点:
- 当Broker发生故障或者网络延迟时,生产者线程会被阻塞,影响发送效率
- 对于发送端吞吐量较高的场景,同步发送存在性能瓶颈
1.4、场景
同步发送对消息可靠性传输有较高要求的场景,如通知消息等,发送端对发送吞吐量要求不是特别高的场景
二、父工程
因为这个系统会有很多RocketMQ的知识,我准备拆开写,避免频繁的导入依赖和包,我这里采用分模块开发。
2.1、父工程依赖
pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.0</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.12.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.26</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </dependencyManagement>
2.2、公共模块
这里需要注意的是下面这个依赖,其实就是一个会员类。如果你传输的不是对象也可以不要,我这里演示就创建了。
<dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency>
Member.java
package com.alian.common; import lombok.Data; import java.util.Date; @Data public class Member { private Long id; private String memberName; private int age; private Date birthday; }
后续的项目都会用到这个父工程和公共模块,后面就不再过多说明了。
三、生产者
我们在父工程下新建一个模块用于发送同步消息。
3.1 Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq</artifactId> <groupId>com.alian</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>01-send-sync-message</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
3.2 application配置
application.properties
server.port=8001 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默认的生产者组 rocketmq.producer.group=sync_group # 发送同步消息超时时间 rocketmq.producer.send-message-timeout=3000 # 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器 rocketmq.producer.retry-next-server=true # 用于指定消息发送失败时的重试次数 rocketmq.producer.retry-times-when-send-failed=3 # 设置消息压缩的阈值,为0表示禁用消息体的压缩 rocketmq.producer.compress-message-body-threshold=0
3.3 发送字符串消息
消息的发送比较简单,我们直接引用
@Autowired private RocketMQTemplate rocketMQTemplate;
此对象封装了一系列的消息发送方法。
SendStrMessageTest.java
package com.alian.sync; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @Slf4j @SpringBootTest public class SendStrMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void syncSendStringMessage() { String topic = "string_message_topic"; String message = "我是一条同步文本消息:syncSendStringMessage"; SendResult sendResult = rocketMQTemplate.syncSend(topic, message); log.info("同步发送返回的结果:{}", sendResult); } @Test public void syncSendStringMessageWithBuilder() { String topic = "string_message_topic"; String message = "我是一条同步的文本消息:syncSendStringMessageWithBuilder"; Message<String> msg = MessageBuilder.withPayload(message) // 消息类型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步发送返回的结果:{}", sendResult); } @Test public void syncSendStringMessageWithBuilderTimeOut() { String topic = "string_message_topic"; String message = "我是一条同步的文本消息:syncSendStringMessageWithBuilderTimeOut"; Message<String> msg = MessageBuilder.withPayload(message) // 消息类型 .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") .build(); // 3秒发送超时 SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000); log.info("同步发送返回的结果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里需要注意的是,我发送字符串消息的时候,topic都是 string_message_topic,因为我这里是本地开发环境,并且我在配置文件中配置了
autoCreateTopicEnable=true
当该参数设置为true时,如果生产者在发送消息时使用了一个在Broker端不存在的Topic,则Broker会自动创建该Topic,允许消息正常发送和存储。
当该参数设置为false时,如果生产者使用了不存在的Topic,则Broker会直接拒绝发送请求,不会自动创建Topic。
官方对该参数的解释是:自动创建Topic的特性主要是为了方便,但也可能带来一些风险,比如有的应用程序由于编码上的低级错误导致无意中创建了大量的Topic。因此,生产环境建议将该参数设置为false,只有手工创建所需的Topic。
3.4 发送JSON消息
SendJsonMessageTest.java
package com.alian.sync; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.HashMap; import java.util.Map; @Slf4j @SpringBootTest public class SendJsonMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendJsonMessage() { String topic = "json_message_topic"; JSONObject json = new JSONObject(); json.put("name", "Alian"); json.put("age", "28"); json.put("hobby", "java"); SendResult sendResult = rocketMQTemplate.syncSend(topic, json); log.info("同步发送返回的结果:{}", sendResult); } @Test public void sendJsonMessageWithBuilder() { String topic = "json_message_topic"; JSONObject json = new JSONObject(); json.put("name", "Alian"); json.put("age", "28"); json.put("hobby", "java"); Message<JSONObject> msg = MessageBuilder.withPayload(json) // 消息类型 .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步发送返回的结果:{}", sendResult); } @Test public void sendMapMessage() { String topic = "json_message_topic"; Map<String, String> map = new HashMap<>(); map.put("1", "java"); map.put("2", "go"); map.put("3", "c"); map.put("4", "vue"); map.put("5", "react"); SendResult sendResult = rocketMQTemplate.syncSend(topic, map); log.info("同步发送返回的结果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里需要注意的是,我发送JSON消息的时候,topic都是 json_message_topic,这里Map消息也能被JSONObject消费。
3.5 发送Java对象消息
SendJavaObjectMessageTest.java
package com.alian.sync; import com.alian.common.Member; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.Date; import java.util.HashMap; import java.util.Map; @Slf4j @SpringBootTest public class SendJavaObjectMessageTest { @Autowired private RocketMQTemplate rocketMQTemplate; @Test public void sendJavaObjectMessage() { String topic = "java_object_message_topic"; Member member = new Member(); member.setId(10086L); member.setMemberName("Alian"); member.setAge(28); member.setBirthday(new Date()); SendResult sendResult = rocketMQTemplate.syncSend(topic, member); log.info("同步发送返回的结果:{}", sendResult); } @Test public void sendJavaObjectMessageWithBuilder() { String topic = "java_object_message_topic"; Member member = new Member(); member.setId(10086L); member.setMemberName("Alian"); member.setAge(28); member.setBirthday(new Date()); Message<Member> msg = MessageBuilder.withPayload(member) // 设置消息类型 .setHeader(MessageHeaders.CONTENT_TYPE, "application/json") .build(); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("同步发送返回的结果:{}", sendResult); } @AfterEach public void waiting() { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里需要注意的是,我发送java对象消息的时候,topic都是 java_object_message_topic
四、消费者
我们在父工程下新建一个模块用于发送同步消息。
4.1 Maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq</artifactId> <groupId>com.alian</groupId> <version>1.0.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>08-comsume-concurrent</artifactId> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.alian</groupId> <artifactId>common-rocketmq-dto</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> </project>
4.2 application配置
application.properties
server.port=8008 # rocketmq地址 rocketmq.name-server=192.168.0.234:9876 # 默认的消费者组 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 批量拉取消息的数量 rocketmq.consumer.pull-batch-size=10 # 集群消费模式 rocketmq.consumer.message-model=CLUSTERING
实际上对于本文来说,下面两个配置不用配置,也不会生效。
# 默认的消费者组 rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP # 集群消费模式 rocketmq.consumer.message-model=CLUSTERING
因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
4.3 消费字符串消息
StringMessageConsumer.java
package com.alian.concurrent; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "string_message_topic", consumerGroup = "CONCURRENT_GROUP_STRING", consumeThreadNumber = 1) public class StringMessageConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("字符串消费者接收到的消息: {}", message); // 处理消息的业务逻辑 } }
如果要消费消息,我们需要实现RocketMQListener<T>,实现onMessage方法。发送的是什么对象,我们接收就是什么对象,也就是T是什么对象。生产者发送的字符串消息,我们这里就用String接收,也就是RocketMQListener<String>。
package org.apache.rocketmq.spring.core; public interface RocketMQListener<T> { void onMessage(T var1); }
同时加上@RocketMQMessageListener注解,主要用到三个注解
- topic:指定该消费者订阅的Topic名称,可以是单个Topic,也可以是用||分隔的多个Topic
- consumerGroup:指定该消费者所属的消费组名称,消费组用于组织同一类消费实例,相同消费组的消费实例可以消费完全相同的消息,RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。 每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。
需要注意的是:如果Topic不存在,只有在生产者发送消息时,并且autoCreateTopicEnable设置为true的情况下,Broker端才会自动创建该Topic。消费者启动时,即使autoCreateTopicEnable=true,也不会自动创建不存在的Topic。
具体来说:
- 生产者启动并发送消息到一个不存在的Topic时:
- 如果autoCreateTopicEnable=true,Broker会自动创建该Topic,允许消息发送成功
- 如果autoCreateTopicEnable=false,Broker会拒绝消息发送,报错该Topic不存在
- 消费者启动订阅一个不存在的Topic时:
- 无论autoCreateTopicEnable的值是true还是false,Broker都不会自动创建该Topic
- 消费者会启动成功,但获取不到消息,处于持续等待状态
所以生产者发送消息时才可能自动创建Topic,而消费者启动时是不会自动创建Topic的。
4.4 消费JSON消息
JsonMessageConsumer.java
package com.alian.concurrent; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "json_message_topic", consumerGroup = "CONCURRENT_GROUP_JSON") public class JsonMessageConsumer implements RocketMQListener<JSONObject> { @Override public void onMessage(JSONObject json) { log.info("json消费者接收到的消息: {}", json); // 处理消息的业务逻辑 } }
生产者发送的JSONObject消息,我们这里就用JSONObject接收,也就是RocketMQListener<JSONObject>
4.5 消费Java对象消息
JavaObjectMessageConsumer.java
package com.alian.concurrent; import com.alian.common.Member; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "java_object_message_topic", consumerGroup = "CONCURRENT_GROUP_JAVA_OBJECT") public class JavaObjectMessageConsumer implements RocketMQListener<Member> { @Override public void onMessage(Member member) { // 因为发序列化的原因Member必须是同一个包 log.info("java对象消费者接收到的消息: {}", member); // 处理消息的业务逻辑 } }
生产者发送的Member消息,我们这里就用Member接收,也就是RocketMQListener<Member>。
这里需要再次说明下发送java对象消息时,因为反序列的原因,所以生产者和消费者使用的是公共包里同一个对象,也就是发送和接收的对象的包路径要一致。
五、部分运行结果
5.1、字符串消息
同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F000001339418B4AAC22DB848C70000, offsetMsgId=C0A800EA00002A9F0000000000000000, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0] 字符串消费者接收到的消息: 我是一条同步文本消息:syncSendStringMessage
5.2、json消息
同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F00000137FC18B4AAC22DB9D42F0000, offsetMsgId=C0A800EA00002A9F0000000000000146, messageQueue=MessageQueue [topic=json_message_topic, brokerName=broker-a, queueId=1], queueOffset=0] json消费者接收到的消息: {"name":"Alian","age":"28","hobby":"java"}
5.3、java对象消息
同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F000001098C18B4AAC22DBACEB50000, offsetMsgId=C0A800EA00002A9F0000000000000270, messageQueue=MessageQueue [topic=java_object_message_topic, brokerName=broker-a, queueId=0], queueOffset=0] java对象消费者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Sat Mar 09 21:06:57 CST 2024)
5.4、现象说明
为什么在rocketmq中当autoCreateTopicEnable=true,先启动消费者,然后生产者第一次向一个未创建的topic中发送消息时,消息发送成功了(马上返回成功了),但是消费者要等一段时间才能收到?这种现象的原因是RocketMQ在创建Topic时,存在一个延迟同步的过程。具体来说:
- 当生产者发送消息到一个不存在的Topic时,Broker会自动创建该Topic。
- 但是Broker创建Topic后,并不会立即将其元数据信息同步给所有的消费者,而是异步延迟同步。
- 生产者发送消息时,Broker会立即返回发送成功响应,因为它只需将消息持久化存储即可。
- 消费者在启动时已订阅了该Topic,但由于元数据未同步,暂时无法获知该Topic的路由信息,所以无法立即开始消费。
- 过了一段时间后,Broker将新创建的Topic元数据同步给了消费者,消费者才开始从该Topic拉取并消费消息。
这种延迟同步机制是RocketMQ的一个设计,目的是为了减小创建Topic时对消费者的影响,避免大量消费者同时更新元数据造成系统抖动。
我们可以通过调整Broker的配置项来控制这个延迟同步时间,比如:
- brokerTopicPutEnable=true开启自动创建Topic功能
- topicDelayEnable=true开启延时创建Topic功能
- topicDelayOffsetEnabled=true开启延时创建Topic偏移量同步功能
通过调小topicDelayOffsetInterval可以缩短元数据同步延迟时间,但也会增加系统开销。
所以这种现象实际上是RocketMQ的一个正常设计行为,目的是为了系统整体的健壮性和可用性。如果应用对延迟时间不太敏感,保持默认配置即可;如果对延迟敏感,可以适当调小延迟同步时间。
以上就是SpringBoot整合RocketMQ实现发送同步消息的详细内容,更多关于SpringBoot RocketMQ发送同步消息的资料请关注脚本之家其它相关文章!