java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot RocketMQ批量发送消息

SpringBoot整合RocketMQ批量发送消息的实现代码

作者:嘉禾嘉宁papa

这篇文章主要介绍了SpringBoot整合RocketMQ批量发送消息的实现,文中通过代码示例讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下

一、简介

今天我们讲讲如何批量发送消息,主要还是使用方法RocketMQTemplatesyncSend方法。

1.1、特点

批量发送和单条发送消息的主要区别有以下几点:

二、Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq</artifactId>
        <groupId>com.alian</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>06-send-batched-message</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common-rocketmq-dto</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

父工程已经在我上一篇文章里,通用公共包也在我上一篇文章里有说明,包括消费者。具体参考:SpringBoot整合RocketMQ实现发送同步消息_java_脚本之家 (jb51.net)

三、application配置

application.properties

server.port=8005

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=batched_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0

四、批量发送

在 RocketMQ 中,RocketMQTemplatesyncSend方法,它允许你批量发送同步消息,主要参数:

测试类都引入依赖

	@Autowired
    private RocketMQTemplate rocketMQTemplate;

4.1、同步消息

    @Test
    public void syncSendBatchStringMessagesWithBuilder() {
        String topic = "string_message_topic";
        String message = "超级喜欢Golang语言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 设置消息类型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用syncSend发送批量消息
        SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);
        log.info("同步批量发送普通消息结果:{}",sendResult);
    }

运行结果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:0
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:1
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:3
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:4
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:2
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:5
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:6
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:7
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:8
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 同步批量发送普通消息:9

4.2、异步消息

    @Test
    public void asyncSendBatchStringMessageWithBuilder() {
        String topic = "string_message_topic";
        String message = "Alian超级喜欢Golang语言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 设置消息类型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用asyncSend发送批量消息
        rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 异步发送成功的回调逻辑
                log.info("异步批量发送普通消息成功: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                // 异步发送失败的回调逻辑
                log.info("异步批量发送普通消息失败: " + e.getMessage());
            }
        });
    }

运行结果:

[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:0
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:1
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:7
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:4
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:2
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:6
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:3
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:8
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:9
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer     : 字符串消费者接收到的消息: 异步批量发送普通消息:5

4.3、顺序消息

在 RocketMQ 中,RocketMQTemplatesyncSendOrderly方法,它允许你批量发送同步消息,主要参数:

    @Test
    public void syncSendBatchOrderlyStringMessagesWithBuilder() {
        String topic = "ordered_string_message_topic";
        String message = "同步批量发送顺序消息,超级喜欢Go语言:";
        List<Message<String>> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Message<String> rocketMessage = MessageBuilder.withPayload(message + i)
                    // 设置消息类型
                    .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                    .build();
            // 加入到列表
            messageList.add(rocketMessage);
        }
        // 使用syncSendOrderly发送批量顺序消息,消费者线程设置为1
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic, messageList, "alian_sync_ordered");
        log.info("批量发送顺序消息发送结果:{}",sendResult);
    }

运行结果:

[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:0
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:1
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:2
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:3
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:4
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:5
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:6
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:7
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:8
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: 同步批量发送顺序消息,超级喜欢Go语言:9

所以我之前说批量发送消息的topic不一样,因为

@Slf4j
@Service
@RocketMQMessageListener(topic = "ordered_string_message_topic", 
						consumerGroup = "ORDERED_GROUP_STRING", 
						consumeMode = ConsumeMode.ORDERLY)
public class StringMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("字符串消费者接收到的消息: {}", message);
        // 处理消息的业务逻辑
    }
}

顺序消息要顺序消费,也就是每次是一个线程去消费,相当于单线程,也就有序了。关键就是配置了:consumeMode = ConsumeMode.ORDERLY

当然,我们也可以把消费者线程数设置为 consumeThreadNumber = 1,也就是单线程消费了,从而确保了消息的顺序消费(指单实例):

@RocketMQMessageListener(topic = "ordered_string_message_topic", 
						consumerGroup = "CONCURRENT_GROUP_STRING",
						consumeThreadNumber = 1)
public class StringMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("字符串消费者接收到的消息: {}", message);
        // 处理消息的业务逻辑
    }
}

4.4、关于异步批量发送

有可能你会写下面的异步批量发送顺序消息

	@Test
    public void asyncSendBatchOrderlyStringMessageWithBuilder2() {
        String topic = "ordered_string_message_topic";
        String message = "Alian超级喜欢Golang语言:";
        List<String> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // 加入到列表
            messageList.add(message + i);
        }
        // 使用 asyncSendOrderly 发送批量消息
        rocketMQTemplate.asyncSendOrderly(topic, messageList, "alian_async_ordered", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 异步发送成功的回调逻辑
                log.info("异步消息发送字符串消息成功: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                // 异步发送失败的回调逻辑
                log.info("异步消息发送字符串消息失败: " + e.getMessage());
            }
        });
    }

其实这个是不对的,最终的结果是一个把你这里的messageList,当做了一个消息列表接收了,如下结果:

[GROUP_STRING_18] com.alian.ordered.StringMessageConsumer  : 字符串消费者接收到的消息: ["Alian超级喜欢Golang语言:0","Alian超级喜欢Golang语言:1","Alian超级喜欢Golang语言:2","Alian超级喜欢Golang语言:3","Alian超级喜欢Golang语言:4","Alian超级喜欢Golang语言:5","Alian超级喜欢Golang语言:6","Alian超级喜欢Golang语言:7","Alian超级喜欢Golang语言:8","Alian超级喜欢Golang语言:9"]

RocketMQ对于单条消息和批量消息在队列中是如何被处理的?

4.5、结论

为此我测试了多次,得到结论:

五、其他

既然知道批量消息是作为一个整体的,那么肯定就会对消息大小有限制,在 Apache RocketMQ 中,批量消息的大小默认限制是4MB。这意味着,你不能发送总大小超过4MB的批量消息。

如果你想修改这个限制,你需要修改RocketMQ的配置。具体的修改方法如下:

虽然你可以通过修改配置来增加批量消息的最大大小,但是你应该谨慎地考虑这个决定。增加批量消息的最大大小可能会增加Broker的内存使用量,并可能影响到消息的发送和接收性能。因此,在修改这个配置之前,你应该先考虑你的应用的需求和Broker的性能。

因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。

以上就是SpringBoot整合RocketMQ批量发送消息的实现的详细内容,更多关于SpringBoot RocketMQ批量发送消息的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文