java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > 微服务架构使用RabbitMQ进行异步处理

微服务架构之使用RabbitMQ进行异步处理方式

作者:记得开心一点嘛

本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在Spring Boot项目中使用RabbitMQ解决高并发问题,RabbitMQ是一种流行的消息队列实现,支持异步通信,可以有效解耦应用程序的不同部分,并将任务分发给多个消费者

一.什么是RabbitMQ?

RabbitMQ 是一种流行的消息队列(Message Queue)实现,基于 AMQP 协议(Advanced Message Queuing Protocol)。它支持异步通信,使多个系统之间以非阻塞的方式交换数据。

在我们使用微服务的时候,微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于 OpenFeign 的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通

如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

二.异步调用处理逻辑:

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

除此之外还有:

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了。

异步调用的优势包括:

当然,异步通信也并非完美无缺,它存在下列缺点:

三.RabbitMQ的基本使用

下面是RabbitMQ的官网:https://www.rabbitmq.com/

1.安装

首先将RabbitMQ的镜像拉取下来,然后运行下面命令:

docker run \
 -e RABBITMQ_DEFAULT_USER=hmall \
 -e RABBITMQ_DEFAULT_PASS=123 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hm-net\
 -d \
 rabbitmq:3.8-management

docker run 命令中,您将容器的两个端口暴露到主机上:

随后我们访问http://虚拟机IP地址:15672来打开RabbitMQ的控制台,默认账号密码是 guest / guest。

在控制台上主要可以关注三个信息:Exchanges(交换机),Queues(队列),Admin(用户管理)。

2.架构图

其中包含几个概念:

RabbitMQ 使用 AMQP 协议,核心的消息模型包括:

  1. Producer 将消息发送到 Exchange
  2. Exchange 将消息路由到 Queue
  3. ConsumerQueue 获取消息进行处理。

RabbitMQ 消息生命周期

  1. 消息发布:Producer 将消息发送到 RabbitMQ。
  2. 消息存储:消息通过交换机路由到一个或多个队列,队列暂存这些消息。
  3. 消息消费:Consumer 从队列中获取并处理消息。

3.RabbitMQ控制台的使用

RabbitMQ 中,交换机(Exchange)队列(Queue) 是核心概念。它们之间的关系决定了消息的路由和存储方式。

(1)Exchanges 交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

RabbitMQ 提供了四种常用的交换机类型,每种类型的路由规则不同:

我们可以再这里创建交换机,Name表示创建的交换机的名字,Type表示可以选择交换机的四种类型。创建成功后就可以在上面看到创建的交换机名字:

比如我们点击amq.fanout查看交换机数据并且可以发送消息给消费者。

注意!!!如果我们不将交换机指定队列的话,由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力

所以下面我们要先创建队列,然后让生产者推送的消息经过交换机的传递后,到达消息队列,然后再给消费者。所以生产者无需知道队列的存在以此来达到解耦的效果

(2)Queues 队列

交换机与队列的关系:

交换机(Exchange)是消息的路由器,它决定了将消息发送到哪个队列。队列(Queue)是消息的存储和处理地方。交换机本身并不知道具体的队列,它只是通过绑定(Binding)决定消息应该被路由到哪些队列。

交换机将消息通过路由键(Routing Key)发送到绑定的队列,但交换机和队列之间的连接并不是自动的,需要显式地设置绑定。绑定指定了 交换机队列 之间的关系,以及 路由规则(例如,路由键匹配的规则)。

在这里我们填写队列名字即可,其他暂时可以不用填写。

随后我们向交换机进行绑定(bind)队列,随后通过队列传输给消费者。

这里的Routing key的出现是为了让 Direct (交换机的类型)能够选择队列而存在的。

我们在绑定队列完成后会出现下面这样,这样证明我们成功为交换机绑定好两个队列:

随后我们在下面窗口推送消息:

(3)Admin

①Users(用户管理):

管理 RabbitMQ 中的用户账号,在这里 添加、删除用户,并设置每个用户的权限。

每个用户可分配不同的 角色

②Virtual Hosts(虚拟主机):

将 RabbitMQ 服务器划分为多个 虚拟主机(vhost),类似于一个独立的命名空间。

四.SpringAMOP的使用

Spring AMQPSpring for Advanced Message Queuing Protocol)是 Spring 提供的一个消息队列集成模块,主要用于简化与 RabbitMQ 的集成。它通过 AMQP 协议来实现消息的生产和消费。

1.导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.添加配置

publisher以及consumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
    listener:
      simple:
        prefetch: 1 # (能者多劳)每次只能获取一条消息,处理完成才能获取下一个消息

5672 端口用于 AMQP 协议,通常用于客户端与 RabbitMQ 进行消息传输。,所以这里port绑定的是5672。

在 RabbitMQ 中,guest 用户默认只能在 本地 连接到 RabbitMQ 实例。如果你希望使用 guest 用户进行远程连接(即从非本地机器连接 RabbitMQ),RabbitMQ 默认是 不允许 的。

3.在publisher服务中利用RabbitTemplate实现消息发送

RabbitTemplate 解释:

RabbitTemplate 是 Spring AMQP 的核心实现类,它实现了 AmqpTemplate 接口,并且对 AmqpTemplate 提供了一些更高层次的封装,简化了消息发送和接收的操作。

RabbitTemplate 是大多数 Spring AMQP 用户的首选,它提供了很多便捷的方法和默认的行为,使得消息交互变得更加简单。

主要方法:

RabbitTemplate 为发送消息提供了更丰富的功能,如消息转换器、默认交换机支持等,通常适用于大多数使用 Spring 的场景。

(1) 发送消息(convertAndSend)

rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message);

此方法的作用是将消息发送到指定交换机,并且根据给定的路由键路由到相关队列。

实例:

rabbitTemplate.convertAndSend("pay.topic", "pay.success", "支付成功的消息");

这里消息 "支付成功的消息" 被发送到 pay.topic 交换机,使用 pay.success 路由键路由到对应的队列。

(2)接收消息(receiveAndConvert)

public <T> T receiveAndConvert(String queueName);

receiveAndConvert 会从指定的队列接收消息,并自动将消息转换成目标对象类型。如果消息体不是可转换的,方法将抛出异常。

String message = (String) rabbitTemplate.receiveAndConvert("pay.success.queue");
System.out.println("收到的消息: " + message);

这里,pay.success.queue 队列中的消息将被接收,并自动转换为 String 类型。

还有一种就是 receive 方法:

public Message receive(String queueName);

实例:

Message message = rabbitTemplate.receive("pay.success.queue");
if (message != null) {
    System.out.println("收到的消息: " + new String(message.getBody()));
}

(3)发送消息并获取响应(convertSendAndReceive)

public <T> T convertSendAndReceive(String exchange, String routingKey, Object message);

convertSendAndReceive 方法既发送消息到交换机,也等待从队列返回响应。它会根据指定的路由键将消息发送到交换机,并等待响应消息,然后将响应转换为返回类型。

实例:

String response = (String) rabbitTemplate.convertSendAndReceive("pay.topic", "pay.success", "请求处理");
System.out.println("收到响应: " + response);

(4)发送消息到特定队列

public void send(String exchange, String routingKey, Message message);

send 方法允许你将一个 Message 对象发送到指定的交换机,并根据路由键进行路由。

Message message = new Message("支付成功".getBytes());
rabbitTemplate.send("pay.topic", "pay.success", message);

(5)设置消息监听器

RabbitTemplate 本身并不直接处理消息监听(接收消息),但是可以通过设置 RabbitListener 来监听消息,并将其与 RabbitTemplate 配合使用。一般来说,消息接收和处理是在 @RabbitListener 注解的监听器方法中完成的。

@RabbitListener(queues = "pay.success.queue")
public void receiveMessage(String message) {
    System.out.println("接收到消息: " + message);
}

下面看一个代码实例:

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

4.定义消费者实现异步调用

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic"),
            key = "pay.success"
    ))
    public void listenPaySuccess(Long orderId){
        //调用方法
        orderService.markOrderPaySuccess(orderId);
    }
}

①@RequiredArgsConstructor

这是 Lombok 提供的注解,自动为类中所有 final 修饰的字段生成一个包含这些字段的构造函数。

使用这个注解可以免去手动编写构造函数的麻烦,尤其是在使用依赖注入时(例如注入 IOrderService)。

②@RabbitListener

@RabbitListener 注解用于监听来自 RabbitMQ 队列的消息。

它会自动监听指定的队列,当有消息到达时,会触发 listenPaySuccess 方法进行处理。

③QueueBinding(队列绑定)

通过 @QueueBinding 注解,绑定了 队列交换机,并指定了 路由键

@Queue

@Exchange

key = "pay.success"

5.总流程处理过程

五.使用配置类管理定义交换机,队列及两者关系

在 Spring AMQP 中,交换机(Exchange)、队列(Queue)、以及绑定(Binding)可以通过配置类来定义和管理。配置类可以帮助你灵活地创建和绑定交换机与队列,并且可以根据业务需求自定义各种参数。

创建配置类效果展示:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 创建队列
    @Bean
    public Queue queue() {
        return new Queue("trade.pay.success.queue", true); // durable=true 表示队列持久化
    }

    // 创建交换机
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("pay.topic"); // 创建主题交换机
    }

    // 创建绑定关系(队列与交换机通过 routing key 绑定)
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由键是 pay.success
    }
}

1.创建队列:Queue

@Bean
public Queue queue() {
    return new Queue("trade.pay.success.queue", true);
}

作用:通过 @Bean 注解定义了一个队列 Bean。Spring 容器会自动管理这个队列,并在 RabbitMQ 上创建该队列。

参数

2.创建交换机:Exchange

@Bean
public TopicExchange exchange() {
    return new TopicExchange("pay.topic");
}

Topic Exchange 是一种交换机类型,它允许使用通配符来进行路由。例如,路由键可以是 "pay.*",可以匹配 "pay.success""pay.failure"。在这里可以使用四种交换机类型来定义交换机,具体场景具体分析使用。

3.创建绑定关系:Binding

@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("pay.success");
}

作用:通过 @Bean 注解定义了队列和交换机的绑定关系。这个绑定决定了消息在何种条件下会从交换机路由到队列。

参数

4.多个队列绑定到同一个交换机

我们可以将多个队列绑定到同一个交换机,并使用不同的路由键。这样可以实现根据不同的路由键来发送不同类型的消息到各自的队列。

@Bean
public Queue paySuccessQueue() {
    return new Queue("pay.success.queue", true);
}

@Bean
public Queue payFailureQueue() {
    return new Queue("pay.failure.queue", true);
}

@Bean
public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
    return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
}

@Bean
public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
    return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
}

在 Spring AMQP 中,队列对象(如 paySuccessQueue)是由 @Bean 注解的 Queue 类型方法返回的。Spring 会自动将返回的队列对象放入到 Spring 容器中,并注入到需要它的地方。所以当我们在 Binding 中使用 paySuccessQueue 时,实际上是在引用之前构造并注册到 Spring 容器中的队列实例。

在 Spring 的上下文中,paySuccessQueuepayFailureQueue 是已经被创建并管理的队列对象,我们不需要手动创建队列,只需要在 Binding 中通过引用这些对象来建立队列与交换机之间的关系。

5.配置不同类型的交换机

除了 Topic Exchange,RabbitMQ 还支持其他几种常见的交换机类型。这里分别演示如何创建 Direct ExchangeFanout ExchangeHeaders Exchange

(1)Direct Exchange

@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange");
}

@Bean
public Binding directBinding(Queue queue, DirectExchange directExchange) {
    return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
}

Direct Exchange:直接交换机会根据 完全匹配的路由键 将消息发送到队列。只有当消息的路由键和绑定的路由键 完全一致 时,消息才会被路由到指定队列。

(2)Fanout Exchange

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}

@Bean
public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue).to(fanoutExchange);  // 不需要路由键
}

Fanout Exchange:扇出交换机会将消息发送到所有绑定的队列,不需要考虑路由键。这个交换机通常用于广播消息。

(3)Headers Exchange

@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

@Bean
public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
    return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
}

Headers Exchange:头交换机根据消息头的内容进行路由,而不是依赖路由键。适用于按消息的元数据进行路由的场景。

总结:

通过 Spring AMQP 的配置类,你可以非常灵活地定义 RabbitMQ 的 交换机队列绑定关系,并通过不同的路由键和交换机类型实现复杂的消息路由逻辑。

以下是一些关键要点:

  1. 队列(Queue):消息的临时存储地,可以是持久化的。
  2. 交换机(Exchange):控制消息如何分发到不同的队列。
    • Direct Exchange:严格匹配路由键。
    • Topic Exchange:支持通配符匹配路由键。
    • Fanout Exchange:广播消息到所有绑定的队列。
    • Headers Exchange:根据消息头的内容进行路由。
  3. 绑定(Binding):将队列与交换机连接起来,使用路由键来决定消息的流向。

通过配置类来定义这些组件,能够简化 RabbitMQ 与 Spring 应用的集成,并且通过灵活的路由规则支持复杂的消息传递需求。

六.在Springboot项目中使用RabbitMQ解决高并发问题

在 Spring Boot 项目中使用 RabbitMQ 来实现消息传输,处理并发问题是非常常见的一种方式。RabbitMQ 可以通过解耦应用程序的不同部分,并将任务分发给多个消费者,从而有效地解决并发和负载均衡问题。

1.引入依赖

首先,需要在 pom.xml 文件中引入 spring-boot-starter-amqp 依赖,这样 Spring Boot 就可以与 RabbitMQ 集成:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2.配置RabbitMQ

application.ymlapplication.properties 文件中,配置 RabbitMQ 的连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 15672
    username: hmall
    password: 123
    virtual-host: /
    listener:
      simple:
        concurrency: 3  # 设置消费者的最小数量
        max-concurrency: 10  # 设置消费者的最大数量

3.RabbitMQ配置类

创建一个配置类,用于声明队列、交换机、绑定等。

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 声明队列
    @Bean
    public Queue taskQueue() {
        return new Queue("taskQueue", true);  // true表示队列持久化
    }

    // 声明交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("taskExchange");
    }

    // 绑定队列和交换机
    @Bean
    public Binding binding(Queue taskQueue, TopicExchange topicExchange) {
        return new Binding("taskQueue",
                Binding.DestinationType.QUEUE,
                "taskExchange",
                "task.#", // 使用路由键
                null);
    }
}

4.创建消息生产者(Producer)

创建一个消息生产者(Producer),它将消息发送到 RabbitMQ 队列。

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 发送消息
    public void sendMessage(String message) {
        System.out.println("Sending message: " + message);
        rabbitTemplate.convertAndSend("taskExchange", "task.new", message);  // 发送到交换机,使用路由键
    }
}

5.创建消息消费者(Consumer)

消费者从 RabbitMQ 队列中异步接收消息,并进行并发处理。在消费者类中,使用 @RabbitListener 注解监听队列,确保多个消费者可以同时处理消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TaskConsumer {

    // 监听队列并异步处理消息
    @RabbitListener(queues = "taskQueue")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        
        // 模拟处理消息的业务逻辑
        try {
            Thread.sleep(1000);  // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Finished processing message: " + message);
    }
}

6.增加并发消费者(多个消费者)

为了处理高并发,我们可以通过设置 concurrencymax-concurrency 来控制消费者的最小和最大并发数。这样,多个消费者可以同时处理来自队列的消息。我们已经在 application.yml 中配置了 concurrency,现在的配置允许最多启动 20 个消费者来处理消息。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文