java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring WebFlux之响应式编程

Spring WebFlux之响应式编程详解

作者:春哥的魔法书

这篇文章主要介绍了Spring WebFlux之响应式编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

在软件开发领域,随着互联网应用的规模和复杂性不断增加,传统的编程模型逐渐暴露出一些局限性,尤其是在面对高并发、大规模数据流处理等场景时。

为了应对这些挑战,响应式编程(Reactive Programming)应运而生,它提供了一种更为高效、灵活的编程范式,以适应不断变化的系统需求。

1.Spring WebFlux 简介

WebFlux提供了一个非阻塞、异步的Web框架,允许开发者构建高性能、可伸缩的 Web 应用程序,特别适合处理大量并发连接,如在微服务架构和云环境中。

WebFlux是Spring Framework 5引入的一个重要组件,它代表了Spring对于响应式编程(Reactive Programming)的支持。

1.1.异步非阻塞

异步非阻塞是一种编程模式,它允许程序在等待某个操作完成时继续执行其他任务。这种模式是基于事件循环,可以在单个线程上处理多个I/O操作,大幅提高了系统的吞吐量和伸缩性。

1.2.响应式流(Reactive Streams)

响应式流是一个规范,它定义了异步流处理的接口和行为。

1.2.1.响应式编程

响应式编程是一种编程范式,是一种异步的、事件驱动的编程范式,它特别适用于构建能够处理实时数据流的应用程序。在这种模型中,数据和事件作为流进行处理,允许开发者以声明式的方式构建复杂的异步逻辑。

Spring WebFlux 遵循这一规范,使用 Publisher 作为响应式流的源头,Subscriber 作为流的消费者。这种模型允许开发者以声明式的方式处理异步数据流,同时保持了流的控制和背压管理。

1.2.2.背压管理

1.2.3.Project Reactor

响应式类型:Mono 和 Flux

操作符与响应式数据流

Project Reactor 提供了大量操作符,用于处理响应式流中的元素。这些操作符包括:

1.2.4.与传统Spring MVC的比较

Spring MVC是一个基于Servlet API的Web框架,它采用阻塞I/O模型,每个请求/响应对都与一个线程绑定。这在并发量较低时表现良好,但在高并发场景下,线程资源的消耗会急剧增加。

相比之下,Spring WebFlux基于响应式流,它不依赖于Servlet API,可以在如Netty、Undertow等非Servlet服务器上运行。这种模型使得WebFlux能够以非阻塞的方式处理并发请求,有效利用资源,提高性能。

1.3.函数式编程

函数式编程是一种编程范式 , 它强调的是将任务看作一系列可组合的函数调用。通过声明式的方式定义处理流程,让代码更简洁、易读,也更适合处理复杂的异步逻辑。WebFlux采用函数式编程范式,利用Lambda表达式简化了编程模型,路由和请求处理采用函数式编程的方式进行定义,这与传统的基于注解的控制器方法截然不同。

1.3.1.请求路由

使用 RouterFunction 定义请求路由

RouterFunction 是Spring WebFlux中用于定义请求路由的函数接口。通过实现 RouterFunction,可以精确控制请求的匹配和处理。

路由谓语和处理器

1.3.2.函数式端点

Spring WebFlux 还引入了函数式端点的概念,允许开发者以简单的函数形式处理请求和生成响应,这些函数通常返回 ServerResponse

1.3.3.函数式编程与响应式流

函数式编程是响应式编程模型的一个重要组成部分。它提倡使用无副作用的函数、不可变数据结构,并且推崇声明式编程。

这些原则与响应式流的概念相契合,响应式流强调数据流的声明式处理,以及在数据流中应用各种操作符来转换、过滤和组合数据。

2.Spring WebFlux 应用搭建

2.1 环境准备

项目依赖配置

基于Maven的Springboot项目,pom.xml文件中的依赖配置可能如下所示:

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

2.2 定义路由与处理器

2.2.1.创建 RouterFunction Bean

在Spring WebFlux中,使用RouterFunction来定义请求的路由。

首先,创建一个配置类,并在其中定义路由:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class WebFluxConfig {

    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        
        return RouterFunctions.route(GET("/hello"), handler::hello);
    }
}
2.2.2.使用 HandlerFunction 处理请求

创建一个处理器类,它将包含处理请求的方法。

这些方法可以返回Mono<ServerResponse>Flux<ServerResponse>,这取决于它们处理的是单个响应还是响应流:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class MyHandler {

    public Mono<ServerResponse> hello(ServerRequest request) {
        String name = request.queryParam("name").orElse("World");
        String message = "Hello, " + name + "!";
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just(message), String.class);
    }
}

hello的方法,用于处理HTTP请求并返回一个响应。

提取查询参数:

String name = request.queryParam("name").orElse("World");

构建HTTP响应的:

2.3 全局异常处理

全局异常处理是任何Web应用程序的重要部分

import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GlobalExceptionHandler {

    public Mono<Void> handleException(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        DataBuffer buffer = response.bufferFactory().wrap("{\"error\": \"Internal Server Error\"}".getBytes());
        return response.writeWith(Mono.just(buffer));
    }
}

这段代码定义了一个全局异常处理器,用于在Spring WebFlux应用中捕获未处理的异常,并向客户端返回统一的错误响应。下面是各部分的详细说明:

@Order(Ordered.HIGHEST_PRECEDENCE)

handleException 方法

参数

功能

3.应用细节

3.1.RouterFunction

RouterFunction 是 Spring WebFlux 提供的一种用于定义和处理 HTTP 路由的功能性接口,它是构建响应式 Web 应用的基础组件之一。与传统的基于注解(如 @GetMapping, @PostMapping 等)的控制器相比,RouterFunction 提供了一种更为灵活和强大的方式来定义路由逻辑,特别适合函数式编程风格。

下面详细介绍其用法:

3.1.1. 基本概念

3.1.2. 创建 RouterFunction

创建 RouterFunction 通常涉及定义路由规则和处理逻辑。

以下是一个简单的示例,展示如何定义一个路由来处理 GET 请求:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Configuration
public class GreetingRouter {
    @Bean
    public  RouterFunction<ServerResponse> routingFunction() {
        return route(GET("/hello2"), request -> ok().bodyValue("Hello, Spring WebFlux!"));
    }
}

这段代码定义了一个简单的RouterFunction<ServerResponse>实例,用于处理一个HTTP GET请求到/hello路径的路由逻辑。

return route(GET("/hello"), request -> ok().bodyValue("Hello, Spring WebFlux!"));

routeRouterFunctions类中的一个静态方法,用于创建一个基础的路由定义。它接受两个参数:

request -> ...:Lambda表达式定义了如何根据请求生成响应。

ok():这是ServerResponse的静态工厂方法,用于创建一个表示成功(HTTP状态码200 OK)的响应。

.bodyValue("Hello, Spring WebFlux!"):此方法设置了响应体的内容为给定的字符串,并且指定了内容类型(默认情况下,如果没有显式指定,会根据内容推断)。

3.1.3. 注册 RouterFunction

要在 Spring Boot 应用中注册 RouterFunction,通常在配置类中声明为 @Bean,以便 Spring 自动发现并配置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class WebConfig {

    @Bean
    public RouterFunction<ServerResponse> routes(GreetingHandler handler) {
        return RouterFunctions.route(GET("/hello"), handler::sayHello);
    }
}

其中,GreetingHandler 是一个包含业务逻辑的 HandlerFunction

3.1.4. 组合 RouterFunction

RouterFunction 可以组合起来形成复杂的路由结构。这使得路由配置更加模块化和可维护。

    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        return RouterFunctions.route(GET("/haha")
                .and(accept(MediaType.TEXT_PLAIN)), handler::haha)

                .andRoute(GET("/reactor")
                .and(accept(MediaType.APPLICATION_JSON)), handler::reactorExample);
    }
    // 处理简单的文本请求
    public Mono<ServerResponse> haha(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .bodyValue("haha!");
    }

    // 处理复杂的响应式请求
    public Mono<ServerResponse> reactorExample(ServerRequest request) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue("{'message': 'This is a reactive response!'}");
    }

3.1.5. 处理请求参数

RouterFunction 可以方便地处理请求参数,无论是路径参数、查询参数还是请求体。例如,处理带路径参数的请求:

public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
    return route(GET("/users/{id}"), handler::getUserById);
}

3.1.6. 错误处理

Spring WebFlux 支持全局或局部的错误处理,可以通过提供一个处理特定异常类型的 RouterFunction 实现:

3.1.6.1.自带异常处理

    @Bean
    public RouterFunction<ServerResponse> route(MyHandler handler) {
        return RouterFunctions.route()
                .GET("/greeting", handler::reactorExample)
                // 全局错误处理
                .onError(Exception.class, (exception, request) -> {
                    System.out.println("An exception occurred: " + exception.getMessage());
                    return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                            .contentType(MediaType.TEXT_PLAIN)
                            .bodyValue("Oops! Something went wrong.");
                })
     
                .build();
    }

3.1.6.2.全局异常处理

    // 错误处理路由
    private RouterFunction<ServerResponse> errorRoute() {
        return RouterFunctions.route(RequestPredicates.all(),
                request -> ServerResponse.status(HttpStatus.BAD_REQUEST)
                        .contentType(MediaType.TEXT_PLAIN)
                        .bodyValue("这是一条用于未匹配请求的回退处理。"));
    }

RequestPredicates.all(),意味着这个谓词将匹配所有的HTTP请求,无论方法(GET、POST等)或路径是什么。

3.1.7. 高级用法

3.2.请求接参

Spring WebFlux 完全支持接收 RESTful 风格的传参。RESTful 风格的接口通常通过URL路径、查询参数、请求头以及请求体来传递参数。

在Spring WebFlux中,你可以使用函数式和注解两种方式来定义端点以接收这些参数。下面是几种常见的接收参数方式:

3.2.1. 路径变量(Path Variables)

在路由定义中,你可以使用 {variableName} 来标记路径变量,然后在处理器方法中通过 @PathVariable 注解接收它们。

@Bean
public RouterFunction<ServerResponse> userRoute(UserHandler handler) {
    return RouterFunctions.route(GET("/users/{id}"), handler::getUserById);
}

@Component
public class UserHandler {

    public Mono<ServerResponse> getUserById(ServerRequest request) {
        String id = request.pathVariable("id");
        // 处理逻辑...
    }
}

3.2.2. 查询参数(Query Parameters)

查询参数可以直接从 ServerRequest 中获取,或者使用 @RequestParam 注解。

public Mono<ServerResponse> searchUsers(ServerRequest request) {
    String keyword = request.queryParam("keyword").orElse("");
    // 处理逻辑...
}

或者使用 @RequestParam

public Mono<ServerResponse> searchUsers(@RequestParam(name = "keyword", defaultValue = "") String keyword) {
    // 处理逻辑...
}

3.2.3. 请求体(Request Body)

对于POST、PUT等方法,你可能需要从请求体中读取数据。可以使用 @RequestBody 注解,并指定相应的对象类型来自动绑定JSON或表单数据。

public Mono<ServerResponse> createUser(@RequestBody UserDTO user) {
    // 处理逻辑...
}

3.2.4. 请求头(Request Headers)

请求头可以通过 ServerRequest.headers() 获取,或者使用 @RequestHeader 注解。

public Mono<ServerResponse> handleRequestWithHeader(@RequestHeader("Authorization") String authHeader) {
    // 处理逻辑...
}

3.3.响应内容

3.3.1.ServerResponse对象

在Spring WebFlux中,响应的返回内容通常是通过ServerResponse对象来构建和管理的,它代表了即将发送给客户端的HTTP响应。

3.3.1.1. 状态码(Status Code)

每个HTTP响应都会有一个状态码,用于表示请求的处理结果。在Spring WebFlux中,你可以通过如下方式设置状态码:

3.3.1.2. 响应体(Body)

响应体是响应的主要内容,可以是文本、JSON、XML等各种格式的数据。构建响应体的方式有多种:

3.3.1.3. 内容类型(Content-Type)

通过.contentType(MediaType)方法指定响应的内容类型,如MediaType.APPLICATION_JSON_UTF8表示JSON格式,MediaType.TEXT_PLAIN表示纯文本等。

3.3.1.4. 头部(Headers)

可以在响应中添加自定义头部信息,如.header("X-Custom-Header", "value")

3.3.1.5. 构建响应

一旦定义好状态码、响应体、内容类型和头部等信息,通过.build()方法完成ServerResponse的构建。这一步是必要的,它将之前设置的所有配置整合成一个待发送的响应对象。

3.3.1.6.示例

假设我们要构建一个返回JSON格式数据的响应:

import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

public Mono<ServerResponse> getUserInfo(ServerRequest request) {
    // 假设getUserDetails()是一个Mono<User>,User是自定义的Java对象
    Mono<User> userDetails = userService.getUserDetails(request.pathVariable("id"));

    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(userDetails, User.class) // 自动转换User对象为JSON
            .switchIfEmpty(ServerResponse.notFound().build()); // 如果找不到用户,返回404
}

在这个例子中,我们首先指定了响应的状态码为200 OK,内容类型为JSON,然后将从数据库查询到的用户详情(User对象)转换为JSON响应体。

如果查询结果为空(即用户不存在),则通过.switchIfEmpty()方法切换到返回404 Not Found的响应。

3.3.2.响应式类型

在响应式编程领域,尤其是在使用Spring WebFlux和Reactor框架时,MonoFlux是两个核心的响应式类型,它们都是Reactor库提供的对Reactive Streams规范的实现。这两种类型都实现了Publisher接口,这意味着它们可以作为异步数据流的生产者,在响应式系统中扮演着至关重要的角色。

3.3.2.1. Mono

1. map

2. flatMap

3. then

4. zipWith

Mono<String> monoStr = Mono.just("Hello");
Mono<Integer> monoLength = monoStr.map(String::length);
Mono<User> monoUser = ...; // 假设获取用户信息
Mono<Address> monoAddress = monoUser.flatMap(user -> getAddressForUser(user.getId()));
Mono<Void> saveUser = userRepository.save(newUser);
Mono<User> findUser = saveUser.then(userRepository.findById(newUser.getId()));
Mono<User> monoUser = ...;
Mono<Order> monoOrder = ...;
Mono<Tuple2<User, Order>> combined = monoUser.zipWith(monoOrder);

3.3.2.2. Flux

1. map

2. filter

3. flatMap

4. buffer

5. concatMap

Flux<String> names = Flux.fromIterable(Arrays.asList("Alice", "Bob", "Charlie"));
Flux<Integer> lengths = names.map(String::length);
Flux<String> names = ...;
Flux<String> longNames = names.filter(name -> name.length() > 5);
Flux<User> users = ...;
Flux<Order> orders = users.flatMap(user -> getOrderListForUser(user.getId()));
Flux<Integer> numbers = Flux.interval(Duration.ofMillis(100)).take(10);
Flux<List<Integer>> buffered = numbers.buffer(3); // 每3个元素打包一次

功能:类似于flatMap,但保证按源流的顺序依次处理每个元素产生的流。

Flux<User> users = ...;
Flux<Order> ordersSequential = users.concatMap(user -> getOrderListForUser(user.getId()));

3.3.3.3.共同特点

3.3.3.4.转换关系

MonoFlux之间可以通过操作符互相转换,例如,Mono可以通过flatMapMany转换为Flux,而多个MonoFlux可以通过concatmerge等操作合并。

4.WebClient

WebClient是Spring Framework 5引入的一个非阻塞、响应式的HTTP客户端,它属于Spring WebFlux模块的一部分。它设计用于构建高性能、异步的Web服务客户端,特别适合用于处理大量的并发请求和与响应式服务交互。下面是对WebClient的详细讲解:

核心特性

基本使用

// 最简单的创建方式
WebClient client = WebClient.create();

// 配置化创建
WebClient client = WebClient.builder()
        .baseUrl("http://example.com")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .build();
Mono<String> response = client.get()
        .uri("/api/data")
        .retrieve() // 获取响应体
        .bodyToMono(String.class); // 将响应体转换为String类型
Mono<String> response = client.post()
        .uri("/api/data")
        .body(BodyInserters.fromObject(someObject)) // 发送对象
        .retrieve()
        .bodyToMono(String.class);

处理响应

错误处理

可以使用.onErrorResume等操作符来优雅地处理错误情况,例如重试逻辑或返回默认值。

Mono<String> withErrorHandling = client.get()
        .uri("/api/data")
        .retrieve()
        .onErrorResume(WebClientResponseException.class, e -> {
            // 处理错误,比如返回默认值
            return Mono.just("Default Value");
        })
        .bodyToMono(String.class);

总结

WebClient是一个强大且灵活的工具,它在现代Web应用和服务间通信中扮演着重要角色,特别是当需要构建高性能、可扩展的系统时。通过充分利用响应式编程的优势,开发者可以构建出更加高效、易维护的客户端逻辑。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文