java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot RocketMQ发送同步消息

SpringBoot整合RocketMQ实现发送同步消息

作者:嘉禾嘉宁papa

RocketMQ 是一款开源的分布式消息中间件,由阿里巴巴开源,它具有高可用性、高性能、低延迟等特点,广泛应用于阿里巴巴集团内部以及众多外部企业的业务系统中,本文给大家介绍了SpringBoot整合RocketMQ实现发送同步消息,需要的朋友可以参考下

一、简介

RocketMQ 是一款开源的分布式消息中间件,由阿里巴巴开源。由阿里巴巴集团开发并开源,目前被捐赠给Apache基金会,并入选孵化器项目,2017年从Apache基金会毕业后,RocketMQ被指定为顶级项目(TLP)。它具有高可用性、高性能、低延迟等特点,广泛应用于阿里巴巴集团内部以及众多外部企业的业务系统中。

1.1、RocketMQ 主要特点

1.2、RocketMQ 核心组件

RocketMQ 的架构主要包括以下核心组件:

总的来说,RocketMQ是阿里推出的优秀开源分布式消息中间件,具有高性能、高可靠、高并发等优点,是构建分布式系统不可或缺的基础组件之一。

1.3、概念

同步发送指的是生产者在发送消息后,会阻塞当前线程,直到收到Broker的发送响应后才返回,响应中包含消息是否发送成功的状态。同步发送的优缺点如下:

优点:

缺点:

1.4、场景

同步发送对消息可靠性传输有较高要求的场景,如通知消息等,发送端对发送吞吐量要求不是特别高的场景

二、父工程

因为这个系统会有很多RocketMQ的知识,我准备拆开写,避免频繁的导入依赖和包,我这里采用分模块开发。

2.1、父工程依赖

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
    </dependency>

</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common-rocketmq-dto</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</dependencyManagement>

2.2、公共模块

这里需要注意的是下面这个依赖,其实就是一个会员类。如果你传输的不是对象也可以不要,我这里演示就创建了。

<dependency>
    <groupId>com.alian</groupId>
    <artifactId>common-rocketmq-dto</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>

Member.java

package com.alian.common;

import lombok.Data;

import java.util.Date;

@Data
public class Member {

    private Long id;

    private String memberName;

    private int age;

    private Date birthday;
}

后续的项目都会用到这个父工程和公共模块,后面就不再过多说明了。

三、生产者

我们在父工程下新建一个模块用于发送同步消息。

3.1 Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq</artifactId>
        <groupId>com.alian</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>01-send-sync-message</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common-rocketmq-dto</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

3.2 application配置

application.properties

server.port=8001

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.group=sync_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout=3000
# 用于设置在消息发送失败后,生产者是否尝试切换到下一个服务器。设置为 true 表示启用,在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-server=true
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed=3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold=0

3.3 发送字符串消息

消息的发送比较简单,我们直接引用

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

此对象封装了一系列的消息发送方法。

SendStrMessageTest.java

package com.alian.sync;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@SpringBootTest
public class SendStrMessageTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void syncSendStringMessage() {
        String topic = "string_message_topic";
        String message = "我是一条同步文本消息:syncSendStringMessage";
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @Test
    public void syncSendStringMessageWithBuilder() {
        String topic = "string_message_topic";
        String message = "我是一条同步的文本消息:syncSendStringMessageWithBuilder";
        Message<String> msg = MessageBuilder.withPayload(message)
                // 消息类型
                .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                .build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @Test
    public void syncSendStringMessageWithBuilderTimeOut() {
        String topic = "string_message_topic";
        String message = "我是一条同步的文本消息:syncSendStringMessageWithBuilderTimeOut";
        Message<String> msg = MessageBuilder.withPayload(message)
                // 消息类型
                .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
                .build();
        // 3秒发送超时
        SendResult sendResult = rocketMQTemplate.syncSend(topic, msg, 3000);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @AfterEach
    public void waiting() {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

这里需要注意的是,我发送字符串消息的时候,topic都是 string_message_topic,因为我这里是本地开发环境,并且我在配置文件中配置了

autoCreateTopicEnable=true

当该参数设置为true时,如果生产者在发送消息时使用了一个在Broker端不存在的Topic,则Broker会自动创建该Topic,允许消息正常发送和存储。

当该参数设置为false时,如果生产者使用了不存在的Topic,则Broker会直接拒绝发送请求,不会自动创建Topic。

官方对该参数的解释是:自动创建Topic的特性主要是为了方便,但也可能带来一些风险,比如有的应用程序由于编码上的低级错误导致无意中创建了大量的Topic。因此,生产环境建议将该参数设置为false,只有手工创建所需的Topic。

3.4 发送JSON消息

SendJsonMessageTest.java

package com.alian.sync;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@SpringBootTest
public class SendJsonMessageTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendJsonMessage() {
        String topic = "json_message_topic";
        JSONObject json = new JSONObject();
        json.put("name", "Alian");
        json.put("age", "28");
        json.put("hobby", "java");
        SendResult sendResult = rocketMQTemplate.syncSend(topic, json);
        log.info("同步发送返回的结果:{}", sendResult);
    }


    @Test
    public void sendJsonMessageWithBuilder() {
        String topic = "json_message_topic";
        JSONObject json = new JSONObject();
        json.put("name", "Alian");
        json.put("age", "28");
        json.put("hobby", "java");
        Message<JSONObject> msg = MessageBuilder.withPayload(json)
                // 消息类型
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @Test
    public void sendMapMessage() {
        String topic = "json_message_topic";
        Map<String, String> map = new HashMap<>();
        map.put("1", "java");
        map.put("2", "go");
        map.put("3", "c");
        map.put("4", "vue");
        map.put("5", "react");
        SendResult sendResult = rocketMQTemplate.syncSend(topic, map);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @AfterEach
    public void waiting() {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

这里需要注意的是,我发送JSON消息的时候,topic都是 json_message_topic这里Map消息也能被JSONObject消费。

3.5 发送Java对象消息

SendJavaObjectMessageTest.java

package com.alian.sync;

import com.alian.common.Member;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@SpringBootTest
public class SendJavaObjectMessageTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void sendJavaObjectMessage() {
        String topic = "java_object_message_topic";
        Member member = new Member();
        member.setId(10086L);
        member.setMemberName("Alian");
        member.setAge(28);
        member.setBirthday(new Date());
        SendResult sendResult = rocketMQTemplate.syncSend(topic, member);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @Test
    public void sendJavaObjectMessageWithBuilder() {
        String topic = "java_object_message_topic";
        Member member = new Member();
        member.setId(10086L);
        member.setMemberName("Alian");
        member.setAge(28);
        member.setBirthday(new Date());
        Message<Member> msg = MessageBuilder.withPayload(member)
                // 设置消息类型
                .setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
                .build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
        log.info("同步发送返回的结果:{}", sendResult);
    }

    @AfterEach
    public void waiting() {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

这里需要注意的是,我发送java对象消息的时候,topic都是 java_object_message_topic

四、消费者

我们在父工程下新建一个模块用于发送同步消息。

4.1 Maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq</artifactId>
        <groupId>com.alian</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>08-comsume-concurrent</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alian</groupId>
            <artifactId>common-rocketmq-dto</artifactId>
            <version>1.0.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

4.2 application配置

application.properties

server.port=8008

# rocketmq地址
rocketmq.name-server=192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size=10
# 集群消费模式
rocketmq.consumer.message-model=CLUSTERING

实际上对于本文来说,下面两个配置不用配置,也不会生效。

# 默认的消费者组
rocketmq.consumer.group=CONCURRENT_CONSUMER_GROUP
# 集群消费模式
rocketmq.consumer.message-model=CLUSTERING

因为优先的是@RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。

4.3 消费字符串消息

StringMessageConsumer.java

package com.alian.concurrent;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = "string_message_topic",  
						 consumerGroup = "CONCURRENT_GROUP_STRING",
						 consumeThreadNumber = 1)
public class StringMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("字符串消费者接收到的消息: {}", message);
        // 处理消息的业务逻辑
    }
}

如果要消费消息,我们需要实现RocketMQListener<T>,实现onMessage方法。发送的是什么对象,我们接收就是什么对象,也就是T是什么对象。生产者发送的字符串消息,我们这里就用String接收,也就是RocketMQListener<String>

package org.apache.rocketmq.spring.core;

public interface RocketMQListener<T> {
    void onMessage(T var1);
}

同时加上@RocketMQMessageListener注解,主要用到三个注解

需要注意的是:如果Topic不存在,只有在生产者发送消息时,并且autoCreateTopicEnable设置为true的情况下,Broker端才会自动创建该Topic。消费者启动时,即使autoCreateTopicEnable=true,也不会自动创建不存在的Topic。

具体来说:

  1. 生产者启动并发送消息到一个不存在的Topic时:
  1. 消费者启动订阅一个不存在的Topic时:

所以生产者发送消息时才可能自动创建Topic,而消费者启动时是不会自动创建Topic的。

4.4 消费JSON消息

JsonMessageConsumer.java

package com.alian.concurrent;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = "json_message_topic", consumerGroup = "CONCURRENT_GROUP_JSON")
public class JsonMessageConsumer implements RocketMQListener<JSONObject> {

    @Override
    public void onMessage(JSONObject json) {
        log.info("json消费者接收到的消息: {}", json);
        // 处理消息的业务逻辑
    }
}

生产者发送的JSONObject消息,我们这里就用JSONObject接收,也就是RocketMQListener<JSONObject>

4.5 消费Java对象消息

JavaObjectMessageConsumer.java

package com.alian.concurrent;

import com.alian.common.Member;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = "java_object_message_topic", consumerGroup = "CONCURRENT_GROUP_JAVA_OBJECT")
public class JavaObjectMessageConsumer implements RocketMQListener<Member> {

    @Override
    public void onMessage(Member member) {
        // 因为发序列化的原因Member必须是同一个包
        log.info("java对象消费者接收到的消息: {}", member);
        // 处理消息的业务逻辑
    }
}

生产者发送的Member消息,我们这里就用Member接收,也就是RocketMQListener<Member>。

这里需要再次说明下发送java对象消息时,因为反序列的原因,所以生产者和消费者使用的是公共包里同一个对象,也就是发送和接收的对象的包路径要一致。

五、部分运行结果

5.1、字符串消息

同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F000001339418B4AAC22DB848C70000, offsetMsgId=C0A800EA00002A9F0000000000000000, messageQueue=MessageQueue [topic=string_message_topic, brokerName=broker-a, queueId=1], queueOffset=0]

字符串消费者接收到的消息: 我是一条同步文本消息:syncSendStringMessage

5.2、json消息

同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F00000137FC18B4AAC22DB9D42F0000, offsetMsgId=C0A800EA00002A9F0000000000000146, messageQueue=MessageQueue [topic=json_message_topic, brokerName=broker-a, queueId=1], queueOffset=0]

json消费者接收到的消息: {"name":"Alian","age":"28","hobby":"java"}

5.3、java对象消息

同步发送返回的结果:SendResult [sendStatus=SEND_OK, msgId=7F000001098C18B4AAC22DBACEB50000, offsetMsgId=C0A800EA00002A9F0000000000000270, messageQueue=MessageQueue [topic=java_object_message_topic, brokerName=broker-a, queueId=0], queueOffset=0]

java对象消费者接收到的消息: Member(id=10086, memberName=Alian, age=28, birthday=Sat Mar 09 21:06:57 CST 2024)

5.4、现象说明

为什么在rocketmq中当autoCreateTopicEnable=true,先启动消费者,然后生产者第一次向一个未创建的topic中发送消息时,消息发送成功了(马上返回成功了),但是消费者要等一段时间才能收到?这种现象的原因是RocketMQ在创建Topic时,存在一个延迟同步的过程。具体来说:

这种延迟同步机制是RocketMQ的一个设计,目的是为了减小创建Topic时对消费者的影响,避免大量消费者同时更新元数据造成系统抖动。

我们可以通过调整Broker的配置项来控制这个延迟同步时间,比如:

通过调小topicDelayOffsetInterval可以缩短元数据同步延迟时间,但也会增加系统开销。

所以这种现象实际上是RocketMQ的一个正常设计行为,目的是为了系统整体的健壮性和可用性。如果应用对延迟时间不太敏感,保持默认配置即可;如果对延迟敏感,可以适当调小延迟同步时间。

以上就是SpringBoot整合RocketMQ实现发送同步消息的详细内容,更多关于SpringBoot RocketMQ发送同步消息的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文