Spring WebFlux使用函数式编程模型构建异步非阻塞服务
作者:JavaEdge.
1 前言
上文引入了 Spring 框架中专门用于构建响应式 Web 服务的 WebFlux 框架,同时我也给出了两种创建 RESTful 风格 HTTP 端点实现方法中的一种,即注解编程模型。
本文介绍另一种实现方法——如何使用函数式编程模型创建响应式 RESTful 服务,这种编程模型与传统的基于 Spring MVC 构建 RESTful 服务的方法有较大差别。
2 WebFlux 函数式编程模型
回顾Spring WebFlux系统架构图:
图后半部分,Spring WebFlux 中,函数式编程模型的核心概念Router Functions,对标 Spring MVC 的 @Controller、@RequestMapping 等标准注解。而 Router Functions 则提供一套函数式风格API,最重要的就是 Router、Handler 接口。可简单将:
- Router 对应成 RequestMapping
- Controller 对应为 Handler
当我发起一个远程调用,传入的 HTTP 请求由 HandlerFunction 处理, HandlerFunction 本质上是一个接收 ServerRequest 并返回 Mono 的函数。ServerRequest 和 ServerResponse 是一对不可变接口,用来提供对底层 HTTP 消息的友好访问。
3 ServerRequest
代表请求对象,可访问各种 HTTP 请求元素,包括请求方法、URI 和参数,以及通过单独的 ServerRequest.Headers 获取 HTTP 请求头信息。
ServerRequest 通过一系列 bodyToMono() 和 bodyToFlux() 方法提供对请求消息体进行访问的途径。例如,如果我们希望将请求消息体提取为 Mono 类型的对象,可以使用如下方法。
Mono<String> string = request.bodyToMono(String.class);
而如果我们希望将请求消息体提取为 Flux 类型的对象,可以使用如下方法,其中 Order 是可以从请求消息体反序列化的实体类。
Flux<Order> order = request.bodyToFlux(Order.class);
上述的 bodyToMono() 和 bodyToFlux() 两个方法实际上是通用的 ServerRequest.body(BodyExtractor) 工具方法的快捷方式,该方法如下所示。
<T> T body(BodyExtractor<T, ? super ServerHttpRequest> extractor);
BodyExtractor 是一种请求消息体的提取器,允许我们编写自己的提取逻辑。请注意 BodyExtractor 提取的对象是一个 ServerHttpRequest 类型的实例,而这个 ServerHttpRequest 是非阻塞的,与之对应的还有一个 ServerHttpResponse 对象。
响应式 Web 操作的正是这组非阻塞的:
- ServerHttpRequest
- ServerHttpResponse
而不再是 Spring MVC 里的传统:
- HttpServletRequest
- HttpServletResponse
若不需要实现定制化的提取逻辑,可使用框架提供的常用的 BodyExtractors 实例。通过 BodyExtractors,上例可替换为。
Mono<String> string = request.body(BodyExtractors.toMono(String.class); Flux<Person> Order= request.body(BodyExtractors.toFlux(Order.class);
ServerResponse
与ServerRequest 对应,ServerResponse 提供对 HTTP 响应的访问。由于不可变,因此可用构建器创建一个新 ServerResponse。
构建器允许设置响应状态、添加响应标题并提供响应的具体内容。如下示例演示如何通过 ok() 方法创建代表 200 状态码的响应,其中我将响应体的类型设置为 JSON 格式,响应具体内容是 Mono 对象。
Mono<Order> order = …; ServerResponse.ok().contentType(MediaType.APPLICATION_JSON) .body(order);
通过 body() 方法来加载响应内容是构建 ServerResponse 最常见的方法,这里我们将 Order 对象作为返回值。如果想要返回各种类型的对象,我们也可以使用 BodyInserters 工具类所提供的构建方法,如常见的 fromObject() 和 fromPublisher() 方法等。以下示例代码中,我们通过 fromObject() 方法直接返回一个 “Hello World”。
ServerResponse.ok().body(BodyInserters.fromObject("Hello World"));
上述方法的背后实际上是利用 BodyBuilder 接口中的一组 body() 方法,来构建一个 ServerResponse 对象,典型的 body() 方法如下所示。
Mono<ServerResponse> body(BodyInserter<?, ? super ServerHttpResponse> inserter);
这里我们同样看到了非阻塞式的 ServerHttpResponse 对象。这种 body() 方法比较常见的用法是返回新增和更新操作的结果,你在本讲后续的内容中将会看到这种使用方法。
HandlerFunction
将 ServerRequest 和 ServerResponse 组合在一起就可创建 HandlerFunction。
public interface HandlerFunction<T extends ServerResponse> { Mono<T> handle(ServerRequest request); }
可通过实现 HandlerFunction#handle() 创建定制化的请求响应处理机制。
简单的“Hello World”处理函数。
public class HelloWorldHandlerFunction implements HandlerFunction<ServerResponse> { @Override public Mono<ServerResponse> handle(ServerRequest request) { return ServerResponse.ok().body( BodyInserters.fromObject("Hello World")); } };
这里使用了前面介绍的 ServerResponse 所提供的 body() 方法返回一个 String 类型的消息体。
通常,针对某个领域实体都存在 CRUD 等常见的操作,所以需要编写多个类似的处理函数,烦琐。推荐将多个处理函数分组到一个专门 Handler 类。
RouterFunction
已可通过 HandlerFunction 创建请求的处理逻辑,接下来需要把具体请求与这种处理逻辑关联起来,RouterFunction 可以帮助我们实现这一目标。RouterFunction 与传统 SpringMVC 中的 @RequestMapping 注解功能类似。
创建 RouterFunction 最常见做法是使用 route 方法,该方法通过使用请求谓词和处理函数创建一个 ServerResponse 对象。
public static <T extends ServerResponse> RouterFunction<T> route( RequestPredicate predicate, HandlerFunction<T> handlerFunction) { return new DefaultRouterFunction<>(predicate, handlerFunction); }
RouterFunction 的核心逻辑位于这里的 DefaultRouterFunction 类中,该类的 route() 方法如下所示。
public Mono<HandlerFunction<T>> route(ServerRequest request) { if (this.predicate.test(request)) { if (logger.isTraceEnabled()) { String logPrefix = request.exchange().getLogPrefix(); logger.trace(logPrefix + String.format("Matched %s", this.predicate)); } return Mono.just(this.handlerFunction); } else { return Mono.empty(); } }
该方法将传入的 ServerRequest 路由到具体的处理函数 HandlerFunction。如果请求与特定路由匹配,则返回处理函数的结果,否则就返回空Mono。
RequestPredicates 工具类提供了常用的谓词,能够实现包括基于路径、HTTP 方法、内容类型等条件的自动匹配。
简单 RouterFunction 示例,实现对 "/hello-world"请求路径的自动路由:
RouterFunction<ServerResponse> helloWorldRoute = RouterFunctions.route(RequestPredicates.path("/hello-world"), new HelloWorldHandlerFunction());
类似的,我们应该把 RouterFunction 和各种 HandlerFunction 按照需求结合起来一起使用,常见的做法也是根据领域对象来设计对应的 RouterFunction。
路由机制的优势在于它的组合型。两个路由功能可以组合成一个新的路由功能,并通过一定的评估方法路由到其中任何一个处理函数。如果第一个路由的谓词不匹配,则第二个谓词会被评估。请注意组合的路由器功能会按照顺序进行评估,因此在通用功能之前放置一些特定功能是一项最佳实践。在 RouterFunction 中,同样提供了对应的组合方法来实现这一目标,请看下面的代码。
default RouterFunction<T> and(RouterFunction<T> other) { return new RouterFunctions.SameComposedRouterFunction<>(this, other); } default RouterFunction<T> andRoute(RequestPredicate predicate, HandlerFunction<T> handlerFunction) { return and(RouterFunctions.route(predicate, handlerFunction)); }
我们可以通过调用上述两个方法中的任意一个来组合两个路由功能,其中后者相当于 RouterFunction.and() 方法与 RouterFunctions.route() 方法的集成。以下代码演示了 RouterFunctions 的组合特性。
RouterFunction<ServerResponse> personRoute = route(GET("/orders/{id}").and(accept(APPLICATION_JSON)), personHandler::getOrderById) .andRoute(GET("/orders").and(accept(APPLICATION_JSON)), personHandler::getOrders) .andRoute(POST("/orders").and(contentType(APPLICATION_JSON)), personHandler::createOrder);
RequestPredicates 工具类所提供的大多数谓词也具备组合特性。例如, RequestPredicates.GET(String) 方法的实现如下所示。
public static RequestPredicate GET(String pattern) { return method(HttpMethod.GET).and(path(pattern)); }
可以看到,该方法是 RequestPredicates.method(HttpMethod.GET) 和 RequestPredicates.path(String) 的组合。我们可以通过调用 RequestPredicate.and(RequestPredicate) 方法或 RequestPredicate.or(RequestPredicate) 方法来构建复杂的请求谓词。
案例集成:ReactiveSpringCSS 中的 Web 服务
customer-service 分别需要访问 account-service 和 order-service 服务中的 Web 服务。
已基于注解编程模型实现了 account-service 中的 AccountController。今天演示 order-service 中 Web 服务实现过程。
基于函数式编程模型,在 order-service 中,编写 OrderHandler 专门实现根据 OrderNumber 获取 Order 领域实体的处理函数
@Configuration public class OrderHandler { @Autowired private OrderService orderService; public Mono<ServerResponse> getOrderByOrderNumber(ServerRequest request) { String orderNumber = request.pathVariable("orderNumber"); return ServerResponse.ok().body(this.orderService.getOrderByOrderNumber(orderNumber), Order.class); } }
上述代码示例中,我们创建了一个 OrderHandler 类,然后注入 OrderService 并实现了一个 getOrderByOrderNumber() 处理函数。
现在我们已经具备了 OrderHandler,就可以创建对应的 OrderRouter 了,示例代码如下。
@Configuration public class OrderRouter { @Bean public RouterFunction<ServerResponse> routeOrder(OrderHandler orderHandler) { return RouterFunctions.route( RequestPredicates.GET("/orders/{orderNumber}") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), orderHandler::getOrderByOrderNumber); } }
在这个示例中,我们通过访问“/orders/{orderNumber}”端点就会自动触发 orderHandler 中的 getOrderByOrderNumber() 方法并返回相应的 ServerResponse。
接下来,假设我们已经分别通过远程调用获取了目标 Account 对象和 Order 对象,那么 generateCustomerTicket 方法的执行流程就可以明确了。基于响应式编程的实现方法,我们可以得到如下所示的示例代码。
public Mono<CustomerTicket> generateCustomerTicket(String accountId, String orderNumber) { // 创建 CustomerTicket 对象 CustomerTicket customerTicket = new CustomerTicket(); customerTicket.setId("C_" + UUID.randomUUID().toString()); // 从远程 account-service 获取 Account 对象 Mono<AccountMapper> accountMapper = getRemoteAccountByAccountId(accountId); // 从远程 order-service 中获取 Order 对象 Mono<OrderMapper> orderMapper = getRemoteOrderByOrderNumber(orderNumber); Mono<CustomerTicket> monoCustomerTicket = Mono.zip(accountMapper, orderMapper).flatMap(tuple -> { AccountMapper account = tuple.getT1(); OrderMapper order = tuple.getT2(); if(account == null || order == null) { return Mono.just(customerTicket); } // 设置 CustomerTicket 对象属性 customerTicket.setAccountId(account.getId()); customerTicket.setOrderNumber(order.getOrderNumber()); customerTicket.setCreateTime(new Date()); customerTicket.setDescription("TestCustomerTicket"); return Mono.just(customerTicket); }); // 保存 CustomerTicket 对象并返回 return monoCustomerTicket.flatMap(customerTicketRepository::save); }
显然,这里的 getRemoteAccountById 和 getRemoteOrderByOrderNumber 方法都涉及了非阻塞式的远程 Web 服务的调用,这一过程我们将放在下一讲中详细介绍。
请注意,到这里时使用了 Reactor 框架中的 zip 操作符,将 accountMapper 流中的元素与 orderMapper 流中的元素按照一对一的方式进行合并,合并之后得到一个 Tuple2 对象。然后,我们再分别从这个 Tuple2 对象中获取 AccountMapper 和 OrderMapper 对象,并将它们的属性填充到所生成的 CustomerTicket 对象中。最后,我们通过 flatMap 操作符调用了 customerTicketRepository 的 save 方法完成了数据的持久化。这是 zip 和 flatMap 这两个操作符非常经典的一种应用场景,你需要熟练掌握。
总结
好了,那么本讲内容就说到这。延续上一讲,我们接着讨论了 Spring WebFlux 的使用方法,并给出了基于函数式编程模型的 RESTful 端点创建方法。在这种开发模型中,重点把握:
- ServerRequest
- ServerResponse
- HandlerFunction
- RouterFunction
核心对象的使用方法。
FAQ
WebFlux 函数式编程模型中包含哪些核心编程对象吗?
现在,我们已经通过 WebFlux 构建了响应式 Web 服务,下一步就是如何来消费它们了。Spring 也专门提供了一个非阻塞式的 WebClient 工具类来完成这一目标,下一讲我就来和你系统地讨论这个工具类的使用方法,到时见。
到此这篇关于Spring WebFlux使用函数式编程模型构建异步非阻塞服务的文章就介绍到这了,更多相关Spring WebFlux构建异步非阻塞服务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!