java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Project Reactor 响应式编程

Project Reactor 响应式范式编程

作者:一只小小的Bug

这篇文章主要为大家介绍了Project Reactor 响应式范式编程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

什么是响应式编程?

响应式编程是一种编程范式,它关注数据的变化和传播,而不是控制流。响应式编程可以提高程序的性能、弹性和可伸缩性,使程序能够及时响应用户的需求和环境的变化。在本文中,我们将介绍Java中的响应式编程的基本概念、原理和实践。

响应式编程的核心思想是将数据和行为抽象为流(Stream),流可以表示任何异步的事件或值,比如用户输入、网络请求、数据库查询等。流可以被观察(Observable),也就是说,可以有一个或多个观察者(Observer)订阅流,并在流发生变化时接收通知。流还可以被操作(Operator),也就是说,可以对流进行各种转换、过滤、组合等操作,从而生成新的流。

响应式编程的优势:

响应式编程的缺点:

Java中的响应式编程

Java中有多种框架和库可以实现响应式编程,比如RxJava、Spring Reactor、Vert.x等。这些框架和库都遵循了Reactive Streams规范,这是一套定义了非阻塞背压的异步流处理标准的接口。

Reactive Streams规范主要包括四个接口:

Project Reactor

Project Reactor是一个完全非阻塞的包含背压支持的响应式编程基石。它是Spring生态系统中Spring Reactive的基础,被用于如Spring WebFlux, Spring Data和Spring Cloud Gateway等项目中。

基本概念

Project Reactor的核心思想是将数据和事件看作是流(stream),流可以被创建,转换,过滤,合并,分组,缓冲,错误处理等等。流是惰性的,只有当有订阅者(subscriber)订阅时才会开始发射数据或事件。流可以是有限的,也可以是无限的,可以是同步的,也可以是异步的,可以是单线程的,也可以是多线程的。流还可以支持背压(backpressure),即订阅者可以控制流的速度,避免被过多的数据或事件淹没。

核心组件

Project Reactor提供了两个主要的接口来表示流:

它们都是Publisher<T>的实现,可以发出0-N个元素的异步序列,并根据订阅者的需求推送元素。

Flux表示的是包含0到N个元素的异步序列,可以被onComplete信号或者onError信号所终止。

Mono表示的是包含0或1个元素的异步序列,也可以被onComplete信号或者onError信号所终止。

Flux和Mono之间可以进行转换。

代码示例

Mono

// 创建一个Mono对象,包含一个字符串元素
Mono<String> mono = Mono.just("Hello World");

// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println);

使用Mono.just方法创建了一个包含一个字符串元素的Mono对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Mono对象发出元素值时,回调函数就会被调用。

Mono to Flux

把Mono转换成Flux的一种方法是使用flux()方法,它会返回一个包含Mono发出的元素的Flux,或者如果Mono为空,则返回一个空的Flux。例如:

// 创建一个Mono对象,包含一个整数元素
Mono<Integer> mono = Mono.just(1);

// 使用flux()方法把Mono转换成Flux
Flux<Integer> flux = mono.flux();

// 订阅这个Flux对象,并打印元素值
flux.subscribe(System.out::println); // 输出1

另一种方法是使用concatWith()方法,它会将Mono与另一个Publisher连接起来,形成一个Flux。例如:

// 创建一个Mono对象,包含一个整数元素
Mono<Integer> mono = Mono.just(1);

// 创建一个Flux对象,包含两个整数元素
Flux<Integer> flux = Flux.just(2, 3);

// 使用concatWith()方法把Mono和Flux连接起来
Flux<Integer> result = mono.concatWith(flux);

// 订阅这个Flux对象,并打印元素值
result.subscribe(System.out::println); // 输出1, 2, 3

Mono常用的操作

// 从一个固定的值创建一个Mono
Mono.just("Hello").subscribe(System.out::println); // 输出Hello

// 从一个Callable对象创建一个Mono
Callable<String> callable = () -> "World";
Mono.fromCallable(callable).subscribe(System.out::println); // 输出World

// 从一个Supplier对象创建一个Mono
Supplier<String> supplier = () -> "Supplier!";
Mono.fromSupplier(supplier).subscribe(System.out::println); // 输出Supplier!

// 对Mono发出的元素进行映射操作
Mono.just("Hello").map(s -> s + " World").subscribe(System.out::println); // 输出Hello World

// 对Mono发出的元素进行扁平化操作
Mono.just("Hello")
    .flatMap(s -> Mono.just(s + " World"))
    .subscribe(System.out::println); // 输出Hello World

// 对Mono发出的元素进行过滤操作
Mono.just(1).filter(i -> i > 0).subscribe(System.out::println); // 输出1

// 将多个Mono合并为一个Mono
Mono.zip(Mono.just("Hello"), Mono.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); // 输出Hello World

// 将多个Mono合并为一个Flux
Mono.just("Hello")
    .mergeWith(Mono.just("World"))
    .subscribe(System.out::println); // 输出Hello World

// 在这个Mono完成后,继续处理另一个发布者
Mono.just("Hello").then(Mono.just("World")).subscribe(System.out::println); // 输出World

// 在这个Mono发出元素时,执行一个副作用操作
Mono.just("Hello")
    .doOnNext(s -> System.out.println("Before: " + s))
    .map(s -> s + " World")
    .doOnNext(s -> System.out.println("After: " + s))
    .subscribe(); 
// 输出
// Before: Hello 
// After: Hello World

Flux

// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 订阅这个Flux对象,并打印元素值
flux.subscribe(System.out::println);

使用Flux.just方法创建了一个包含三个整数元素的Flux对象,然后使用subscribe方法订阅了这个对象,并提供了一个回调函数来打印元素值。当Flux对象发出元素值时,回调函数就会被调用。

Flux to Mono

把Flux转换成Mono的一种方法是使用next()方法,它会返回Flux发出的第一个元素,或者如果Flux为空,则返回一个空的Mono。

例如:

// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用next()方法把Flux转换成Mono
Mono<Integer> mono = flux.next();

// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println); // 输出1

另一种方法是使用collectList()方法,它会把Flux发出的所有元素收集到一个列表中,并返回一个包含这个列表的Mono。

例如:

// 创建一个Flux对象,包含三个整数元素
Flux<Integer> flux = Flux.just(1, 2, 3);

// 使用collectList()方法把Flux转换成Mono
Mono<List<Integer>> mono = flux.collectList();

// 订阅这个Mono对象,并打印元素值
mono.subscribe(System.out::println); // 输出[1, 2, 3]

Flux常用的操作

// 从多个固定的值创建一个Flux
Flux.just("Hello", "World").subscribe(System.out::println); // 输出Hello World

// 从一个数组对象创建一个Flux
String[] array = {"Hello", "World"};
Flux.fromArray(array).subscribe(System.out::println); // 输出Hello World

// 从一个Iterable对象创建一个Flux
List<String> list = Arrays.asList("Hello", "World");
Flux.fromIterable(list).subscribe(System.out::println); // 输出Hello World

// 从一个Stream对象创建一个Flux
Stream<String> stream = Stream.of("Hello", "World");
Flux.fromStream(stream).subscribe(System.out::println); // 输出Hello World

// 创建一个包含指定范围内整数的Flux
Flux.range(1, 5).subscribe(System.out::println); // 输出1 2 3 4 5

// 创建一个按照指定时间间隔从0整数递增的Flux
Duration duration = Duration.ofSeconds(1);
Flux<Long> interval = Flux.interval(duration);
interval.subscribe(System.out::println);
// 使用blockLast阻塞主线程,防止程序立即退出
interval.blockLast();
// 输出结果每秒打印一次
// 0
// 1
// 2
// 3
// 4
// ...

// 对Flux发出的每个元素进行映射操作
Flux.just("Hello", "World").map(s -> s + "!")
    .subscribe(System.out::println); // 输出Hello! World!

// 对Flux发出的每个元素进行扁平化操作
Flux.just("Hello", "World")
    .flatMap(s -> Flux.just(s + "!"))
    .subscribe(System.out::println); //输出Hello! World!

// 对Flux发出的每个元素进行过滤操作
Flux.range(1, 5).filter(i -> i % 2 == 0).subscribe(System.out::println); 
// 输出2 4

// 将多个Flux合并为一个Flux
Flux.zip(Flux.just("Hello"), Flux.just("World"))
    .subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
// 输出Hello World

// 将多个Flux合并为一个Flux
Flux.just("Hello").mergeWith(Flux.just("World")).subscribe(System.out::println); 
//  输出Hello World

// 将多个Flux合并为一个Flux
Flux.just("Hello").concatWith(Flux.just("World")).subscribe(System.out::println); 
// 输出Hello World

// 将所有元素收集到一个List中
Flux.just("Hello", "World").collectList().subscribe(list -> System.out.println(list)); // 输出[Hello, World]

Flux的zip、mergeWith、concatWith区别

zip、mergeWith和concatWith都是用来将多个Flux合并为一个Flux的操作,但是它们有一些区别:

zip会将多个Flux的元素按照一对一的方式进行合并,形成一个包含元组的Flux,每个元组中包含了每个源Flux的一个元素。如果源Flux的元素个数不一致,那么zip会以最短的Flux为基准,多余的元素会被丢弃。

  Flux<String> flux1 = Flux.just("A", "B", "C");
  Flux<Integer> flux2 = Flux.just(1, 2, 3, 4);
  Flux<Tuple2<String, Integer>> flux3 = Flux.zip(flux1, flux2);
  flux3.subscribe(tuple -> System.out.println(tuple.getT1() + " " + tuple.getT2())); 
  // 输出A 1 B 2 C 3
  // 4不会输出,因为最短的Flux是flux1,长度是3

mergeWith会将多个Flux的元素按照时间顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么mergeWith会保留所有的元素。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i);
  Flux<String> flux = flux1.mergeWith(flux2);
  flux.subscribe(System.out::println); 
  // 输出A0 B0 A1 B1 A2 A3 B2 A4 B3 A5 ...

concatWith会将多个Flux的元素按照订阅顺序进行合并,形成一个包含所有元素的Flux。如果源Flux的元素有重叠,那么concatWith会保留所有的元素。concatWith会等待上一个源Flux完成后才订阅下一个源Flux。

  Duration duration1 = Duration.ofMillis(100);
  Duration duration2 = Duration.ofMillis(200);
  // 每100ms递增1,打印5次结束
  Flux<String> flux1 = Flux.interval(duration1).map(i -> "A" + i).take(5);
  Flux<String> flux2 = Flux.interval(duration2).map(i -> "B" + i).take(5);
  Flux<String> flux3 = flux1.concatWith(flux2);
  flux3.subscribe(System.out::println);
  // 避免程序立即退出
  flux3.blockLast();
  // 输出 
  // A0
  // A1
  // A2
  // A3
  // A4
  // B0
  // B1
  // B2
  // B3
  // B4

可以看到,Project Reactor和Java 8 Stream的用法看起来很像,因为它们都提供了一些函数式编程的方法,用来对数据流进行操作,例如map、filter、reduce等。但是它们的本质是不同的,主要有以下几个区别:

本文只介绍了Project Reactor的基本概念和用法,包括Flux和Mono的创建、转换、过滤、合并等操作。如果你想深入学习Project Reactor,你可以参考官方文档或者其他相关的资料,更多关于Project Reactor 响应式编程的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文