Spring响应式编程之Reactor操作符详解
作者:minh_coo
文章介绍了Reactor库中常用的响应式流操作符,分为创建、转换、组合、条件和错误处理五类,详细列举了每类操作符的功能和用途,这些操作符旨在提高响应式流的可读性和开发效率,帮助开发者更高效地处理数据流
操作符Processo<T,R>
操作符并不是响应式流规范的一部分,但为了改进响应式代码的可读性并降低开发成本,Reactor 库中的 API 提供了一组丰富的操作符,这些操作符为响应式流规范提供了最大的附加值。
下面介绍一些常用的操作符。
(1)创建操作符
just
:创建一个包含单个元素的Mono或多个元素的Flux;empty
:创建一个空的Flux或Mono;defer
:在订阅时动态创建一个新的Flux或Mono;fromArray
:从数组创建Flux;fromIterable
:从Iterable对象创建Flux;range
:创建一个从start到end的整数序列Flux;interval
:创建一个按时间间隔发布数据的Flux;
import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; public class CreationExample { public static void main(String[] args) { // 示例 1: 使用 Mono 创建操作符 Mono<String> monoJust = Mono.just("Hello, Mono"); Mono<String> monoEmpty = Mono.empty(); Mono<String> monoDefer = Mono.defer(() -> Mono.just("Deferred Mono")); // 订阅 Mono 并打印结果 monoJust.subscribe(System.out::println); monoEmpty.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed")); monoDefer.subscribe(System.out::println); // 示例 2: 使用 Flux 创建操作符 Flux<String> fluxJust = Flux.just("A", "B", "C"); Flux<String> fluxFromArray = Flux.fromArray(new String[]{"A", "B", "C"}); List<String> list = Arrays.asList("A", "B", "C"); Flux<String> fluxFromIterable = Flux.fromIterable(list); Flux<String> fluxFromStream = Flux.fromStream(Stream.of("A", "B", "C")); Flux<Integer> fluxRange = Flux.range(1, 5); Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1)); Flux<String> fluxDefer = Flux.defer(() -> Flux.just("Deferred Flux")); // 订阅 Flux 并打印结果 fluxJust.subscribe(System.out::println); fluxFromArray.subscribe(System.out::println); fluxFromIterable.subscribe(System.out::println); fluxFromStream.subscribe(System.out::println); fluxRange.subscribe(System.out::println); fluxInterval.take(5).subscribe(System.out::println); fluxDefer.subscribe(System.out::println); } }
(2)转换操作符
map
:将Mono中的值或Flux中的每个元素转换为另一种类型;flatmap
:将Mono中的值或Flux中的每个元素转换为另一个Mono或另一个Publisher,并展平结果;flatMapSequential
:类似于flatMap,但保持顺序并并行处理;flatMapMany
:将Mono中的值转换为Flux;collectList
: 将Flux中的所有元素收集到一个List中,返回Mono<List<T>>;collectMap
:将Flux中的元素收集到一个Map中,返回Mono<Map<K,V>>;reduce
:聚合Flux中的元素,返回Mono;buffer
:将Flux中的元素收集到List中,按指定大小进行分组;window
:将Flux中的元素分组到Flux中,每组包含指定数量的元素;
import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; public class ConversionExample { public static void main(String[] args) { // 示例 1: 使用 Mono 转换操作符 Mono<Integer> mono = Mono.just("123") .map(Integer::parseInt) .flatMap(i -> Mono.just(i * 2)) .doOnNext(System.out::println); mono.subscribe(); // 示例 2: 使用 Flux 转换操作符 Flux<Integer> flux = Flux.just("1", "2", "3", "4", "5") .map(Integer::parseInt) .filter(i -> i % 2 == 0) .flatMap(i -> Flux.just(i * 2)) .concatMap(i -> Flux.just(i + 1)) .buffer(2) .doOnNext(System.out::println); flux.subscribe(); } }
(3)组合操作符
zipWith
:将两个Mono的值组合成一个新的Mono;zip
:将多个Flux的元素组合成一个Flux;then
:在当前Mono或Flux完成后执行另一个Mono或Flux;thenReturn
:在当前Mono或Flux完成后返回一个指定的值;thenMany
:在当前Mono完成后返回一个Flux;when
:等待多个Publisher完成
import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CombinationExample { public static void main(String[] args) { // 示例 1: 使用 Mono 组合操作符 Mono<String> mono1 = Mono.just("Hello"); Mono<String> mono2 = Mono.just("World"); Mono<String> combined = mono1.zipWith(mono2, (a, b) -> a + " " + b); combined.subscribe(System.out::println); // 输出: Hello World Mono<Void> when = Mono.when(mono1, mono2); when.subscribe(null, Throwable::printStackTrace, () -> System.out.println("Completed")); // 输出: Completed // 示例 2: 使用 Flux 组合操作符 Flux<String> flux1 = Flux.just("A", "B", "C"); Flux<String> flux2 = Flux.just("1", "2", "3"); Flux<String> merged = Flux.merge(flux1, flux2); merged.subscribe(System.out::println); // 输出: A 1 B 2 C 3 Flux<String> concatenated = Flux.concat(flux1, flux2); concatenated.subscribe(System.out::println); // 输出: A B C 1 2 3 Flux<String> zipped = Flux.zip(flux1, flux2, (a, b) -> a + b); zipped.subscribe(System.out::println); // 输出: A1 B2 C3 Flux<String> combinedLatest = Flux.combineLatest(flux1, flux2, (a, b) -> a + b); combinedLatest.subscribe(System.out::println); // 输出: C3 Flux<String> started = flux1.startWith("Start"); started.subscribe(System.out::println); // 输出: Start A B C } }
(4)条件操作符
hasElement
:判断Mono是否包含元素;hasElements
:判断Flux是否包含元素;hasElementWith
:判断Mono是否包含与给定Predicate匹配的元素;all
:判断Flux中的所有元素是否都满足给定的条件;any
:判断Flux中是否有任意一个元素满足给定的条件;isEmpty
:判断Flux是否为空;switchIfEmpty
:如果Mono或Flux为空,则切换到另一个Mono或Flux;defaultIfEmpty
:如果Mono或Flux为空,则返回默认值;
import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class ConditionalExample { public static void main(String[] args) { // 示例 1: 使用 Mono 条件操作符 Mono<String> mono = Mono.just("Hello"); Mono<Boolean> hasElement = mono.hasElement(); hasElement.subscribe(System.out::println); // 输出: true Mono<String> emptyMono = Mono.<String>empty(); Mono<String> switchIfEmptyMono = emptyMono.switchIfEmpty(Mono.just("Default")); switchIfEmptyMono.subscribe(System.out::println); // 输出: Default Mono<String> defaultIfEmptyMono = emptyMono.defaultIfEmpty("Default"); defaultIfEmptyMono.subscribe(System.out::println); // 输出: Default // 示例 2: 使用 Flux 条件操作符 Flux<Integer> flux = Flux.just(1, 2, 3, 4); Mono<Boolean> allMatch = flux.all(i -> i > 0); allMatch.subscribe(System.out::println); // 输出: true Mono<Boolean> anyMatch = flux.any(i -> i > 3); anyMatch.subscribe(System.out::println); // 输出: true Mono<Boolean> hasElements = flux.hasElements(); hasElements.subscribe(System.out::println); // 输出: true Mono<Boolean> isEmpty = flux.isEmpty(); isEmpty.subscribe(System.out::println); // 输出: false Flux<Integer> emptyFlux = Flux.<Integer>empty(); Flux<Integer> switchIfEmptyFlux = emptyFlux.switchIfEmpty(Flux.just(10, 20, 30)); switchIfEmptyFlux.subscribe(System.out::println); // 输出: 10 20 30 Flux<Integer> defaultIfEmptyFlux = emptyFlux.defaultIfEmpty(999); defaultIfEmptyFlux.subscribe(System.out::println); // 输出: 999 } }
(5)错误处理操作符
onErrorResume
:当发生错误时,切换到另一个数据流;onErrorReturn
:当发生错误时,返回一个默认值;onErrorMap
:将错误映射为另一个错误;retry
重试操作一定次数;retryWhen
:当错误发生时,根据提供的Publisher逻辑重试;doOnError
:当发生错误时执行一些额外的逻辑;
import reactor.core.publisher.Mono; import reactor.core.publisher.Flux; public class ErrorHandlingExample { public static void main(String[] args) { // 示例 1: 使用 Mono 错误处理操作符 Mono<String> mono1 = Mono.error(new RuntimeException("Error")) .onErrorResume(e -> Mono.just("Recovered")); mono1.subscribe(System.out::println); // 输出: Recovered Mono<String> mono2 = Mono.error(new RuntimeException("Error")) .onErrorReturn("Default"); mono2.subscribe(System.out::println); // 输出: Default Mono<String> mono3 = Mono.error(new RuntimeException("Error")) .onErrorMap(e -> new IllegalArgumentException("Mapped Error", e)); mono3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped Error Mono<String> mono4 = Mono.error(new RuntimeException("Error")) .retry(3); mono4.subscribe(System.out::println, Throwable::printStackTrace); Mono<String> mono5 = Mono.error(new RuntimeException("Error")) .retryWhen(companion -> companion.take(3)); mono5.subscribe(System.out::println, Throwable::printStackTrace); Mono<String> mono6 = Mono.error(new RuntimeException("Error")) .doOnError(e -> System.out.println("Error occurred: " + e.getMessage())); mono6.subscribe(System.out::println, Throwable::printStackTrace); // 示例 2: 使用 Flux 错误处理操作符 Flux<String> flux1 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .onErrorResume(e -> Flux.just("Recovered")); flux1.subscribe(System.out::println); // 输出: A B Recovered Flux<String> flux2 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .onErrorReturn("Default"); flux2.subscribe(System.out::println); // 输出: A B Default Flux<String> flux3 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .onErrorMap(e -> new IllegalArgumentException("Mapped Error", e)); flux3.subscribe(System.out::println, Throwable::printStackTrace); // 输出: Mapped Error Flux<String> flux4 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .retry(3); flux4.subscribe(System.out::println, Throwable::printStackTrace); Flux<String> flux5 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .retryWhen(companion -> companion.take(3)); flux5.subscribe(System.out::println, Throwable::printStackTrace); Flux<String> flux6 = Flux.just("A", "B") .concatWith(Mono.error(new RuntimeException("Error"))) .doOnError(e -> System.out.println("Error occurred: " + e.getMessage())); flux6.subscribe(System.out::println, Throwable::printStackTrace); } }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。