java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Boot 消息队列

Spring Boot 消息队列与异步处理的应用小结

作者:星辰徐哥

这篇文章给大家介绍了Spring Boot 消息队列与异步处理的应用小结,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

Spring Boot 消息队列与异步处理

36.1 学习目标与重点提示

学习目标:掌握Spring Boot消息队列与异步处理的核心概念与使用方法,包括消息队列的定义与特点、异步处理的定义与特点、Spring Boot与消息队列的集成、Spring Boot与异步处理的集成、Spring Boot的实际应用场景,学会在实际开发中处理消息队列与异步处理问题。
重点:消息队列的定义与特点异步处理的定义与特点Spring Boot与消息队列的集成Spring Boot与异步处理的集成Spring Boot的实际应用场景

36.2 消息队列与异步处理概述

消息队列与异步处理是Java开发中的重要组件,用于实现系统的异步处理和消息传递。

36.2.1 消息队列的定义

定义:消息队列是一种用于存储和传递消息的中间件,支持异步通信和消息处理。
作用

常见的消息队列

✅ 结论:消息队列是一种用于存储和传递消息的中间件,作用是实现系统的异步处理、提高系统的响应速度、实现系统的解耦。

36.2.2 异步处理的定义

定义:异步处理是指系统在处理请求时,不需要等待请求完成就可以继续处理其他请求。
作用

常见的异步处理

✅ 结论:异步处理是指系统在处理请求时,不需要等待请求完成就可以继续处理其他请求,作用是提高系统的响应速度、提高系统的吞吐量、实现系统的解耦。

36.3 Spring Boot与消息队列的集成

Spring Boot与消息队列的集成是Java开发中的重要内容。

36.3.1 集成RabbitMQ的步骤

定义:集成RabbitMQ的步骤是指使用Spring Boot与RabbitMQ集成的方法。
步骤

  1. 创建Spring Boot项目。
  2. 添加所需的依赖。
  3. 配置RabbitMQ。
  4. 创建消息发送类。
  5. 创建消息接收类。
  6. 创建控制器类。
  7. 测试应用。

示例
pom.xml文件中的依赖:

<dependencies>
    <!-- Web依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 消息队列依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- 数据验证依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- 测试依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties文件中的配置:

# 服务器端口
server.port=8080
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

消息发送类:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ProductMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendProductMessage(String message) {
        rabbitTemplate.convertAndSend("product-exchange", "product-routing-key", message);
        System.out.println("发送产品消息:" + message);
    }
}

消息接收类:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ProductMessageReceiver {
    @RabbitListener(queues = "product-queue")
    public void receiveProductMessage(String message) {
        System.out.println("接收产品消息:" + message);
        // 处理消息
        processProductMessage(message);
    }
    private void processProductMessage(String message) {
        // 模拟处理消息的耗时操作
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("处理产品消息完成:" + message);
    }
}

配置类:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue productQueue() {
        return new Queue("product-queue", true);
    }
    @Bean
    public DirectExchange productExchange() {
        return new DirectExchange("product-exchange");
    }
    @Bean
    public Binding productBinding(Queue productQueue, DirectExchange productExchange) {
        return BindingBuilder.bind(productQueue).to(productExchange).with("product-routing-key");
    }
}

控制器类:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.BindingResult;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/products")
public class ProductController {
    @Autowired
    private ProductMessageSender productMessageSender;
    @PostMapping("/send")
    public Map<String, Object> sendProductMessage(@RequestParam String message) {
        Map<String, Object> result = new HashMap<>();
        productMessageSender.sendProductMessage(message);
        result.put("success", true);
        result.put("message", "产品消息发送成功");
        return result;
    }
}

应用启动类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQApplication.class, args);
    }
}

测试类:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.*;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class RabbitMQApplicationTests {
    @LocalServerPort
    private int port;
    @Autowired
    private TestRestTemplate restTemplate;
    @Test
    void contextLoads() {
    }
    @Test
    void testSendProductMessage() {
        ResponseEntity<Map> response = restTemplate.getForEntity("http://localhost:" + port + "/api/products/send?message=测试产品消息", Map.class);
        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response.getBody().get("success")).isEqualTo(true);
    }
}

✅ 结论:集成RabbitMQ的步骤包括创建Spring Boot项目、添加所需的依赖、配置RabbitMQ、创建消息发送类、创建消息接收类、创建控制器类、测试应用。

36.4 Spring Boot与异步处理的集成

Spring Boot与异步处理的集成是Java开发中的重要内容。

36.4.1 集成Spring Boot异步处理的步骤

定义:集成Spring Boot异步处理的步骤是指使用Spring Boot与异步处理集成的方法。
步骤

  1. 创建Spring Boot项目。
  2. 添加所需的依赖。
  3. 配置异步处理。
  4. 创建异步任务类。
  5. 创建控制器类。
  6. 测试应用。

示例
pom.xml文件中的依赖:

<dependencies>
    <!-- Web依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 异步处理依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-task</artifactId>
    </dependency>
    <!-- 数据验证依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <!-- 测试依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties文件中的配置:

# 服务器端口
server.port=8080
# 异步处理配置
spring.task.execution.pool.core-size=5
spring.task.execution.pool.max-size=10
spring.task.execution.pool.queue-capacity=100

异步任务类:

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class ProductAsyncTask {
    @Async
    public void processProduct(String productId) {
        System.out.println("开始处理产品:" + productId);
        // 模拟处理产品的耗时操作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("产品处理完成:" + productId);
    }
}

控制器类:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.BindingResult;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/products")
public class ProductController {
    @Autowired
    private ProductAsyncTask productAsyncTask;
    @PostMapping("/process")
    public Map<String, Object> processProduct(@RequestParam String productId) {
        Map<String, Object> result = new HashMap<>();
        productAsyncTask.processProduct(productId);
        result.put("success", true);
        result.put("message", "产品处理任务已提交");
        return result;
    }
}

应用启动类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class AsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }
}

测试类:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.*;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class AsyncApplicationTests {
    @LocalServerPort
    private int port;
    @Autowired
    private TestRestTemplate restTemplate;
    @Test
    void contextLoads() {
    }
    @Test
    void testProcessProduct() {
        ResponseEntity<Map> response = restTemplate.getForEntity("http://localhost:" + port + "/api/products/process?productId=P001", Map.class);
        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response.getBody().get("success")).isEqualTo(true);
    }
}

✅ 结论:集成Spring Boot异步处理的步骤包括创建Spring Boot项目、添加所需的依赖、配置异步处理、创建异步任务类、创建控制器类、测试应用。

36.5 Spring Boot的实际应用场景

在实际开发中,Spring Boot消息队列与异步处理的应用场景非常广泛,如:

示例

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
class ProductMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendProductMessage(String message) {
        rabbitTemplate.convertAndSend("product-exchange", "product-routing-key", message);
        System.out.println("发送产品消息:" + message);
    }
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
class ProductMessageReceiver {
    @RabbitListener(queues = "product-queue")
    public void receiveProductMessage(String message) {
        System.out.println("接收产品消息:" + message);
        processProductMessage(message);
    }
    private void processProductMessage(String message) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("处理产品消息完成:" + message);
    }
}
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
class RabbitMQConfig {
    @Bean
    public Queue productQueue() {
        return new Queue("product-queue", true);
    }
    @Bean
    public DirectExchange productExchange() {
        return new DirectExchange("product-exchange");
    }
    @Bean
    public Binding productBinding(Queue productQueue, DirectExchange productExchange) {
        return BindingBuilder.bind(productQueue).to(productExchange).with("product-routing-key");
    }
}
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
class ProductAsyncTask {
    @Async
    public void processProduct(String productId) {
        System.out.println("开始处理产品:" + productId);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("产品处理完成:" + productId);
    }
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.BindingResult;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/products")
class ProductController {
    @Autowired
    private ProductMessageSender productMessageSender;
    @Autowired
    private ProductAsyncTask productAsyncTask;
    @PostMapping("/send")
    public Map<String, Object> sendProductMessage(@RequestParam String message) {
        Map<String, Object> result = new HashMap<>();
        productMessageSender.sendProductMessage(message);
        result.put("success", true);
        result.put("message", "产品消息发送成功");
        return result;
    }
    @PostMapping("/process")
    public Map<String, Object> processProduct(@RequestParam String productId) {
        Map<String, Object> result = new HashMap<>();
        productAsyncTask.processProduct(productId);
        result.put("success", true);
        result.put("message", "产品处理任务已提交");
        return result;
    }
}
@SpringBootApplication
@EnableAsync
public class AsyncAndRabbitMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncAndRabbitMQApplication.class, args);
    }
}
// 测试类
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class AsyncAndRabbitMQApplicationTests {
    @LocalServerPort
    private int port;
    @Autowired
    private TestRestTemplate restTemplate;
    @Test
    void contextLoads() {
    }
    @Test
    void testSendProductMessage() {
        ResponseEntity<Map> response = restTemplate.getForEntity("http://localhost:" + port + "/api/products/send?message=测试产品消息", Map.class);
        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response.getBody().get("success")).isEqualTo(true);
    }
    @Test
    void testProcessProduct() {
        ResponseEntity<Map> response = restTemplate.getForEntity("http://localhost:" + port + "/api/products/process?productId=P001", Map.class);
        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
        assertThat(response.getBody().get("success")).isEqualTo(true);
    }
}

输出结果

✅ 结论:在实际开发中,Spring Boot消息队列与异步处理的应用场景非常广泛,需要根据实际问题选择合适的消息队列和异步处理方法。

总结

本章我们学习了Spring Boot消息队列与异步处理,包括消息队列的定义与特点、异步处理的定义与特点、Spring Boot与消息队列的集成、Spring Boot与异步处理的集成、Spring Boot的实际应用场景,学会了在实际开发中处理消息队列与异步处理问题。其中,消息队列的定义与特点、异步处理的定义与特点、Spring Boot与消息队列的集成、Spring Boot与异步处理的集成、Spring Boot的实际应用场景是本章的重点内容。从下一章开始,我们将学习Spring Boot的其他组件、微服务等内容。

到此这篇关于Spring Boot 消息队列与异步处理的文章就介绍到这了,更多相关Spring Boot 消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文