java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Reactor定制生产WebClient

Reactor定制一个生产的WebClient实现示例

作者:六七十三

这篇文章主要为大家介绍了Reactor定制一个生产的WebClient实现示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1 为什么要用 WebClient

刚开始尝试使用 Spring WebFlux 的时候,很多人都会使用 Mono.fromFuture() 将异步请求转成 Mono 对象,或者 Mono.fromSupplier() 将请求转成 MOno 对象,这两种方式在响应式编程中都是不建议的,都会阻塞当前线程。

1.1 Mono.fromFuture() VS WebClient

Mono.fromFuture()方法和使用 WebClient 调用第三方接口之间存在以下区别:

Mono.fromFuture()方法适用于接收一个 java.util.concurrent.Future 对象,并将其转换为响应式的 Mono。这是一个阻塞操作,因为它会等待 Future 对象完成。而使用 WebClient 调用第三方接口是异步和非阻塞的,它不会直接阻塞应用程序的执行,而是使用事件驱动的方式处理响应。

可扩展性和灵活性:使用 WebClient 可以更灵活地进行配置和处理,例如设置超时时间、请求头、重试机制等。WebClient 还可以与许多其他 Spring WebFlux 组件集成,如 WebSockets、Server-Sent Events 等。而 Mono.fromFuture() 是适用于单个 Future 对象转化为 Mono 的情况,可扩展性较差。

WebClient 提供了更丰富的错误处理机制,可以通过 onStatus、onError 等方法来处理不同的 HTTP 状态码或异常。同时,WebClient 还提供了更灵活的重试和回退策略。Mono.fromFuture() 方法只能将 Future 对象的结果包装在 Mono 中,不提供特定的错误处理机制。

Mono.fromFuture() 会阻塞。当调用 Mono.fromFuture() 方法将 Future 转换为 Mono 时,它会等待 Future 对象的结果返回。在这个等待的过程中,Mono.fromFuture()方法会阻塞当前的线程。这意味着,如果 Future 的结果在运行过程中没有返回,则当前线程会一直阻塞,直到 Future 对象返回结果或者超时。因此,在使用 Mono.fromFuture() 时需要注意潜在的阻塞风险。另外,需要确保F uture 的任务在后台线程中执行,以免阻塞应用程序的主线程。

1.2 Mono.fromFuture VS Mono.fromSupplier

Mono.fromSupplier() 和 Mono.fromFuture() 都是用于将异步执行的操作转换为响应式的 Mono 对象,但它们的区别在于:

Mono.fromSupplier() 适用于一个提供者/生产者,可以用来表示某个操作的结果,该操作是一些纯计算并且没有阻塞的方法。也就是说,Mono.fromSupplier() 将其参数 (Supplier) 所提供的操作异步执行,并将其结果打包成一个 Mono 对象。

Mono.fromFuture() 适用于一个 java.util.concurrent.Future 对象,将其封装成 Mono 对象。这意味着调用 Mono.fromFuture() 方法将阻塞当前线程,直到异步操作完成返回一个 Future 对象。

因此,Mono.fromSupplier() 与 Mono.fromFuture() 的主要区别在于:

Mono.fromSupplier() 是一个非阻塞的操作,不会阻塞当前线程。这个方法用于执行计算型的任务,返回一个封装了计算结果的 Mono 对象。
Mono.fromFuture() 是阻塞操作,会阻塞当前线程,直到异步操作完毕并返回看,它适用于处理 java.util.concurrent.Future 对象。

需要注意的是,如果 Supplier 提供的操作是阻塞的,则 Mono.fromSupplier() 方法本身也会阻塞线程。但通常情况下,Supplier 提供的操作是纯计算型的,不会阻塞线程。

因此,可以使用 Mono.fromSupplier() 方法将一个纯计算型的操作转换为 Mono 对象,而将一个异步返回结果的操作转换为 Mono 对象时,可以使用 Mono.fromFuture() 方法。

2 定制化自己的 WebClient

2.1 初始化 WebClient

WebClient 支持建造者模式,使用 WebClient 建造者模式支持开发自己的个性化 WebClient,比如支持设置接口调用统一耗时、自定义底层 Http 客户端、调用链路、打印接口返回日志、监控接口耗时等等。

WebClient builder 支持以下方法

interface Builder {
        /**
         * 配置请求基础的url,如:baseUrl = "https://abc.go.com/v1";和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 baseUrl
         */
        Builder baseUrl(String baseUrl);
        /**
         * URI 请求的默认变量。也和 uriBuilderFactory 冲突,如果有 uriBuilderFactory ,则忽略 defaultUriVariables
         */
        Builder defaultUriVariables(Map<String, ?> defaultUriVariables);
        /**
         * 提供一个预配置的UriBuilderFactory实例
         */
        Builder uriBuilderFactory(UriBuilderFactory uriBuilderFactory);
        /**
         * 默认 header
         */
        Builder defaultHeader(String header, String... values);
        /**
         * 默认cookie
         */
        Builder defaultCookie(String cookie, String... values);
        /**
         * 提供一个 consumer 来定制每个请求
         */
        Builder defaultRequest(Consumer<RequestHeadersSpec<?>> defaultRequest);
        /**
         * 添加一个filter,可以添加多个
         */
        Builder filter(ExchangeFilterFunction filter);
        /**
         * 配置要使用的 ClientHttpConnector。这对于插入或自定义底层HTTP 客户端库(例如SSL)的选项非常有用。
         */
        Builder clientConnector(ClientHttpConnector connector);
        /**
         * Configure the codecs for the {@code WebClient} in the
         * {@link #exchangeStrategies(ExchangeStrategies) underlying}
         * {@code ExchangeStrategies}.
         * @param configurer the configurer to apply
         * @since 5.1.13
         */
        Builder codecs(Consumer<ClientCodecConfigurer> configurer);
        /**
         * 提供一个预先配置了ClientHttpConnector和ExchangeStrategies的ExchangeFunction。
这是对 clientConnector 的一种替代,并且有效地覆盖了它们。
         */
        Builder exchangeFunction(ExchangeFunction exchangeFunction);
        /**
         * Builder the {@link WebClient} instance.
         */
        WebClient build();
  // 其他方法
    }

2.2 日志打印及监控

.doOnSuccess(response-> {
    log.info("get.success, url={}, response={}, param={}", url, response);
})
.doOnError(error-> {
    log.info("get.error, url={}", url, error);
    // 监控
})
.doFinally(res-> {
  //监控
})

2.3 返回处理

retrieve() // 声明如何提取响应。例如,提取一个ResponseEntity的状态,头部和身体:

.bodyToMono(clazz) 将返回body内容转成clazz对象,clazz 对象可以自己指定类型。如果碰到有问题的无法转化的,也可以先转成String,然后自己实现一个工具类,将String转成 class 对象。

2.3.1 get

public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
return webClient.get()
        .uri(url)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.success, url={}, response={}, param={}", url, response);
        })
        .doOnError(error-> {
            log.info("get.param.error, url={}", url, error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);
}

2.3.2 get param 请求

public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
long start = System.currentTimeMillis();
URI uri = UriComponentsBuilder.fromUriString(url)
        .queryParams(param)
        .build()
        .toUri();
return webClient.get()
        .uri(uri)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        // 监控 or 打印日志 or 耗时
        })
        .publishOn(customScheduler);
}

2.3.3 post json 请求

public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
final long start = System.currentTimeMillis();
return webClient.post()
        .uri(url)
        .contentType(MediaType.APPLICATION_JSON)
        .cookies(cookies -> cookies.setAll(parameter.getCookies()))
        .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
        .headers(headers -> headers.setAll(parameter.getHeaders()))
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .bodyToMono(clazz)
        .doOnSuccess(response-> {
            log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
        })
        .doOnError(error-> {
            log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
        })
        .onErrorReturn(defaultClass)
        .doFinally(res-> {
        })
        .publishOn(customScheduler);
}

2.3.4 post form Data 请求

public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
    final long start = System.currentTimeMillis();
    return webClient.post()
            .uri(url)
            .contentType(MediaType.APPLICATION_FORM_URLENCODED)
            .cookies(cookies -> cookies.setAll(parameter.getCookies()))
            .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
            .headers(headers -> headers.setAll(parameter.getMapHeaders()))
            .accept(MediaType.APPLICATION_JSON)
            .retrieve()
            .bodyToMono(clazz)
            .doOnSuccess(response-> {
                log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
            })
            .doOnError(error-> {
                log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
            })
            .onErrorReturn(defaultClass)
            .doFinally(res-> {
            })
            .publishOn(customScheduler);
}

2.4 异常处理

异常返回兜底

onErrorReturn 发现异常返回兜底数据

异常处理

状态码转成异常抛出

.onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))

监控异常

.doOnError(error -> {
    // log and monitor
})

3 完整的 WebClient

package com.geniu.reactor.webclient;
import com.geniu.utils.JsonUtil;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.SslProvider;
import reactor.netty.tcp.TcpClient;
import java.net.URI;
import java.time.Duration;
import java.util.function.Function;
/**
 * @Author: prepared
 * @Date: 2023/8/15 11:05
 */
@Slf4j
public class CustomerWebClient {
    public static final CustomerWebClient instance = new CustomerWebClient();
    /**
     * 限制并发数 100
     */
    Scheduler customScheduler = Schedulers.newParallel("CustomScheduler", 100);
    private final WebClient webClient;
    private CustomerWebClient() {
        final SslContextBuilder sslBuilder = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE);
        final SslProvider ssl = SslProvider.builder().sslContext(sslBuilder)
                .defaultConfiguration(SslProvider.DefaultConfigurationType.TCP).build();
        final int cpuCores = Runtime.getRuntime().availableProcessors();
        final int selectorCount = Math.max(cpuCores / 2, 4);
        final int workerCount = Math.max(cpuCores * 2, 8);
        final LoopResources pool = LoopResources.create("HCofSWC", selectorCount, workerCount, true);
        final Function<? super TcpClient, ? extends TcpClient> tcpMapper = tcp -> tcp
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .option(ChannelOption.SO_TIMEOUT, 10000)
                .secure(ssl)
                .runOn(pool);
        ConnectionProvider.Builder httpClientOfSWC = ConnectionProvider
                .builder("HttpClientOfSWC")
                .maxConnections(100_000)
                .pendingAcquireTimeout(Duration.ofSeconds(6));
        final ConnectionProvider connectionProvider = httpClientOfSWC.build();
        final HttpClient hc = HttpClient.create(connectionProvider)
                .tcpConfiguration(tcpMapper);
        final Function<HttpClient, HttpClient> hcMapper = rhc -> rhc
                .compress(true);
        final WebClient.Builder wcb = WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(hcMapper.apply(hc)));
//                .filter(new TraceRequestFilter()); 可以通过Filter 增加trace追踪
        this.webClient = wcb.build();
    }
    public <T> Mono<T> get(String url, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        return webClient.get()
                .uri(url)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .onStatus(HttpStatus::isError, response -> Mono.error(new RuntimeException("Request failed with status code: " + response.statusCode())))
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("get.success, url={}, response={}, param={}", url, response);
                })
                .doOnError(error-> {
                    log.info("get.param.error, url={}", url, error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> getParam(String url, MultiValueMap<String, String> param, Class<T> clazz, T defaultClass) {
        long start = System.currentTimeMillis();
        URI uri = UriComponentsBuilder.fromUriString(url)
                .queryParams(param)
                .build()
                .toUri();
        return webClient.get()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("get.param.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(param));
                })
                .doOnError(error-> {
                    log.error("get.param.error, url={}, param={}", url, JsonUtil.toJson(param), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> postJson(String url, final HttpParameter4Json parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_JSON)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(Mono.just(parameter.getJsonBody()), ParameterizedTypeReference.forType(parameter.getBodyType()))
                .headers(headers -> headers.setAll(parameter.getHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("post.json.success, url={}, response={}, param={}", url, response, parameter.getJsonBody());
                })
                .doOnError(error-> {
                    log.error("get.param.error, url={}, param={}", url, parameter.getJsonBody(), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
    public <T> Mono<T> postFormData(String url, HttpParameter parameter, Class<T> clazz, T defaultClass) {
        final long start = System.currentTimeMillis();
        return webClient.post()
                .uri(url)
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .cookies(cookies -> cookies.setAll(parameter.getCookies()))
                .body(BodyInserters.fromFormData(parameter.getMultiValueMapParam()))
                .headers(headers -> headers.setAll(parameter.getMapHeaders()))
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(clazz)
                .doOnSuccess(response-> {
                    log.info("post.fromData.success, url={}, response={}, param={}", url, response, JsonUtil.toJson(parameter));
                })
                .doOnError(error-> {
                    log.info("get.param.error, url={}, param={}", url, JsonUtil.toJson(parameter), error);
                })
                .onErrorReturn(defaultClass)
                .doFinally(res-> {
                })
                .publishOn(customScheduler);
    }
}

以上就是Reactor定制一个生产的WebClient实现示例的详细内容,更多关于Reactor定制生产WebClient的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文