java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Reactive反应式编程使用

Reactive反应式编程及使用介绍

作者:kl

这篇文章主要介绍了为什使用Reactive反应式编程的原因分析,有需要的朋友可以借鉴参考下,希望能够有所帮助祝大家多多进步,早日升职加薪

前言

前一篇分析了Spring WebFlux的设计及实现原理后,反应式编程又来了,Spring WebFlux其底层还是基于Reactive编程模型的,在java领域中,关于Reactive,有一个框架规范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已经实现了这个规范。其他的优秀实现还有Reactor和Rxjava。在Spring WebFlux中依赖的就是Reactor。虽然你可能没用过Reactive开发过应用,但是或多会少你接触过异步Servlet,同时又有这么一种论调:异步化非阻塞io并不能增强太多的系统性能,但是也不可否认异步化后并发性能上去了。听到这种结论后在面对是否选择Reactive编程后,是不是非常模棱两可。因为我们不是很了解反应式编程,所以会有这种感觉。没关系,下面看看反应式编程集大者Reactor是怎么阐述反应式编程的。

反应式编程简介

Reactor是Reactive Programming范例的一个实现,可以概括为:
反应式编程是一种涉及数据流和变化传播的异步编程范例。这意味着可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。
作为反应式编程方向的第一步,Microsoft在.NET生态系统中创建了Reactive Extensions(Rx)库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过Reactive Streams工作出现了Java的标准化 ,这一规范定义了JVM上的反应库的一组接口和交互规则。它的接口已经集成到父Flow类下的Java 9中。

反应式编程范例通常以面向对象的语言呈现,作为Observer设计模式的扩展。人们还可以将主要的反应流模式与熟悉的迭代器设计模式进行比较,因为在所有这些库中对Iterable- Iterator对存在双重性 。一个主要的区别是,虽然迭代器是基于拉的,但是反应流是基于推的。

使用迭代器是一种命令式编程模式,即使访问值的方法完全由其负责Iterable。实际上,开发人员可以选择何时访问next()序列中的项目。在反应流中,相当于上述对Publisher-Subscriber。但是, 当它们出现时,Publisher它会通知订阅者新的可用值,而这一推动方面是被动反应的关键。此外,应用于推送值的操作以声明方式而非命令方式表示:程序员表达计算的逻辑而不是描述其精确的控制流。

除了推送值之外,还以明确定义的方式涵盖错误处理和完成方面。A Publisher可以将新值推送到Subscriber(通过调用onNext),但也可以发出错误(通过调用onError)或完成(通过调用onComplete)。错误和完成都会终止序列。这可以概括为:

onNext x 0..N [onError | onComplete]

这种方法非常灵活。该模式支持没有值,一个值或n值的用例(包括无限的值序列,例如时钟的连续滴答)。

但是我们首先考虑一下,为什么我们首先需要这样的异步反应库?

阻塞可能会浪费资源

现代应用程序可以覆盖大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是一个关键问题。
人们可以通过两种方式来提高计划的绩效:

通常,Java开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈,此时需要引入额外的线程,运行类似的阻塞代码。但是,资源利用率的这种扩展会很快引入争用和并发问题。

更糟糕的是,阻止浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是I / O,例如数据库请求或网络调用),资源就会被浪费,因为线程(或许多线程)现在处于空闲状态,等待数据。

所以并行化方法不是灵丹妙药。为了获得硬件的全部功能是必要的,但是理由也很复杂并且易受资源浪费的影响。

使用异步来解决?

第二种方法(前面提到过),寻求更高的效率,可以解决资源浪费问题。通过编写异步,非阻塞代码,您可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前进程。

但是如何在JVM上生成异步代码?Java提供了两种异步编程模型:

回调:异步方法没有返回值,但需要额外的 callback参数(lambda或匿名类),在结果可用时调用它们。一个众所周知的例子是Swing的EventListener层次结构。

期货:异步方法Future立即返回。异步进程计算一个T值,但该Future对象包含对它的访问。该值不会立即可用,并且可以轮询对象,直到该值可用。例如,ExecutorService运行Callable任务使用Future对象。

这些技术是否足够好?不适用于所有用例,两种方法都有局限性。

回调难以组合在一起,很快导致难以阅读和维护的代码(称为“Callback Hell”)。

考虑一个示例:在用户界面上显示用户的前五个收藏夹,或者如果她没有收藏夹则提出建议。这通过三个服务(一个提供喜欢的ID,第二个提取喜欢的详细信息,第三个提供详细建议):

回调地狱的例子

userService.getFavorites(userId, new Callback() { 
  public void onSuccess(Listlist) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback() {
        public void onSuccess(Listlist) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }
        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }
              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

这是很多代码,它有点难以遵循并且具有重复的部分。考虑它在Reactor中的等价物:

与回调代码等效的Reactor代码示例

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup);

如果您想确保在不到800毫秒内检索到喜欢的ID,或者如果需要更长时间从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在Reactor中,它变得像timeout在链中添加运算符一样简单:

具有超时和回退的Reactor代码示例

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

尽管Java 8中带来了改进,但期货比回调要好一些,但它们在构图方面仍然表现不佳CompletableFuture。一起编排多个未来是可行但不容易的。此外,Future还有其他问题:Future通过调用get() 方法很容易结束对象的另一个阻塞情况,它们不支持延迟计算,并且它们不支持多个值和高级错误处理。

考虑另一个例子:我们得到一个ID列表,我们要从中获取一个名称和一个统计信息,然后将它们成对地组合在一起,所有这些都是异步的。

CompletableFuture组合的例子

CompletableFutureids = ifhIds(); 
CompletableFutureresult = ids.thenComposeAsync(l -> { 
	Streamzip =
			l.stream().map(i -> { 
				CompletableFuturenameTask = ifhName(i); 
				CompletableFuturestatTask = ifhStat(i); 
				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
			});
	ListcombinationList = zip.collect(Collectors.toList()); 
	CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
	CompletableFutureallDone = CompletableFuture.allOf(combinationArray); 
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) 
			.collect(Collectors.toList()));
});
Listresults = result.join(); 
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");

由于Reactor具有更多开箱即用的组合运算符,因此可以简化此过程:

与未来代码等效的Reactor代码示例

Fluxids = ifhrIds(); 
Fluxcombinations =
		ids.flatMap(id -> { 
			MononameTask = ifhrName(id); 
			MonostatTask = ifhrStat(id); 
			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);
		});
Monoresult = combinations.collectList(); 
Listresults = result.block(); 
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

Callback和Future的这些风险是相似的,并且是反应式编程与该Publisher-Subscriber对的关系。

从命令式到反应式编程

诸如Reactor之类的反应库旨在解决JVM上“经典”异步方法的这些缺点,同时还关注一些其他方面:

可组合性和可读性

通过可组合性,我们指的是编排多个异步任务的能力,使用先前任务的结果将输入提供给后续任务或以fork-join方式执行多个任务,以及将异步任务重用为更高级别系统中的分立组件。

编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的进程,您需要从回调执行回调,本身嵌套在另一个回调中,依此类推。那个混乱被称为Callback Hell。正如你可以猜到的(或者从经验中得知),这样的代码很难回归并推理。

Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套最小化)。

类比装配线工作流程

您可以将响应式应用程序处理的数据视为在装配线中移动。反应器既是传送带又是工作站。原材料从原料(原始Publisher)中倒出,最终成为成品,准备推送给消费者(或Subscriber)。

原材料可以经历各种转换和其他中间步骤,或者是将中间件聚集在一起的较大装配线的一部分。如果在某一点出现毛刺或堵塞(也许装箱产品需要不成比例的长时间),受影响的工作站可向上游发出信号以限制原材料的流动。

操作符(运算符)

在Reactor中,运算符是我们的汇编类比中的工作站。每个操作符都将行为添加到a Publisher并将上一步骤包装Publisher到新实例中。因此,整个链被链接,使得数据源自第一Publisher链并且向下移动链,由每个链转换。最终,Subscriber完成了整个过程。请记住,在Subscriber订阅a 之前没有任何事情发生Publisher,下面就会提到。

了解操作员创建新实例可以帮助您避免一个常见错误,该错误会导致您认为您的链中使用的操作员未被应用。看到这个项目的常见问题。
虽然Reactive Streams规范根本没有指定运算符,但Reactor等反应库的最佳附加值之一是它们提供的丰富的运算符。这些涉及很多方面,从简单的转换和过滤到复杂的编排和错误处理。

在你订阅之前什么都不会发生

在Reactor中,当您编写Publisher链时,默认情况下数据不会启动。相反,您可以创建异步过程的抽象描述(这可以帮助重用和组合)。

通过订阅行为,您将Publishera 绑定到a Subscriber,从而触发整个链中的数据流。这是通过上游传播的单个request 信号在内部实现的Subscriber,一直传回源 Publisher。

背压

上游传播信号也用于实现背压,我们在装配线中将其描述为当工作站比上游工作站处理速度慢时向线路发送的反馈信号。

Reactive Streams规范定义的真实机制非常接近于类比:订阅者可以在无限制模式下工作,让源以最快的速度推送所有数据,或者可以使用该request机制向源发送信号表明它已准备就绪处理最多的n元素。

中间操作员也可以在途中更改请求。想象一个buffer 运算符,它将元素分组为10个。如果订阅者请求1个缓冲区,则源可以生成10个元素。一些操作员还实施 预取策略,这避免了request(1)往返,并且如果在请求之前生成元素并不太昂贵,则是有益的。

这将推模型转换为推拉式混合动力,如果它们随时可用,下游可以从上游拉出n个元素。但是如果元素没有准备好,它们就会在生成时被上游推动。

热与冷

在反应库的Rx家族中,人们可以区分两大类反应序列:热和冷。这种区别主要与反应流如何对订阅的用户做出反应有关:

冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。

而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

以上就是Reactive反应式编程及使用介绍的详细内容,更多关于Reactive反应式编程使用的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文