Kotlin协程之Flow的使用与原理解析
作者:淘淘养乐多
Flow的定义和特点
Flow是一种数据流,可以用于协程间的通信,具有冷、懒、响应式等特点。Flow是基于协程构建的,可以提供多个值。Flow在概念上类似于一个数据序列,但它可以使用挂起函数来异步地产生和消费值。这意味着,例如,Flow可以安全地发起网络请求来产生下一个值,而不会阻塞主线程。
Flow的特点主要有以下几点:
- 冷:Flow是冷的,也就是说,它不会在没有收集器的情况下开始执行。只有当有收集器订阅了Flow时,它才会开始发射值。这与热的数据流不同,热的数据流会在没有收集器的情况下也产生值。
- 懒:Flow是懒的,也就是说,它只会在需要时才计算值。每个收集器都会触发Flow的重新计算,而不是共享之前计算的结果。这与惰性序列类似,惰性序列只会在迭代时才计算元素。
- 响应式:Flow是响应式的,也就是说,它可以根据收集器的需求来调整发射速度。如果收集器不能及时处理值,Flow可以暂停或取消发射。这与反应式流规范(Reactive Streams Specification)中定义的背压(backpressure)机制类似。
Flow的创建和操作
Flow可以通过多种方式创建,最简单的方式是使用flow{}构建器函数,在其中使用emit函数手动发射值。例如,下面的代码创建了一个发射1到3的整数值的Flow:
// 创建一个Flow<Int> fun simple(): Flow<Int> = flow { // 发射1到3 for (i in 1..3) { emit(i) // 发射下一个值 } }
除了flow{}构建器函数外,还有一些其他方式可以创建Flow,例如:
- 使用flowOf()函数创建一个包含固定元素的Flow。
- 使用asFlow()扩展函数将各种集合和序列转换为Flow。
- 使用channelFlow()构建器函数创建一个基于通道(Channel)的Flow。
- 使用callbackFlow()构建器函数创建一个基于回调(Callback)的Flow。
创建好Flow后,可以使用各种操作符对数据进行处理。操作符分为两类:
- 中间操作符:中间操作符用于对数据进行转换、过滤、组合等操作,但不会终止流。中间操作符返回一个新的Flow,可以链式调用多个中间操作符。例如,filter、map、take等操作符都是中间操作符。
- 终止操作符:终止操作符用于对数据进行收集、聚合、统计等操作,并终止流。终止操作符返回一个非Flow类型的结果,并触发流的执行。例如,collect、first、toList等操作符都是终止操作符。
例如,下面的代码使用了map和filter两个中间操作符对simple()函数返回的Flow进行了转换和过滤,并使用了collect终止操作符对结果进行了打印:
// 对simple()返回的Flow进行处理 fun main() = runBlocking<Unit> { // 启动并发协程以验证主线程并未阻塞 launch { for (k in 1..3) { println("I'm not blocked $k") delay(100) } } // 收集这个流 simple() .map { it * it } // 数字求平方 .filter { it % 2 == 0 } // 过滤偶数 .collect { value -> // 终止操作符 println(value) // 打印结果 } }
输出结果为:
I'm not blocked 1
4
I'm not blocked 2
I'm not blocked 3
可以看到,Flow的执行是在协程中进行的,不会阻塞主线程。同时,Flow的操作符也是挂起函数,可以在其中进行异步操作,例如:
// 对simple()返回的Flow进行处理 fun main() = runBlocking<Unit> { // 收集这个流 simple() .map { request(it) } // 模拟异步请求 .collect { value -> // 终止操作符 println(value) // 打印结果 } } // 模拟异步请求,返回字符串 suspend fun request(i: Int): String { delay(1000) // 延迟1秒 return "response $i" }
输出结果为:
response 1
response 2
response 3
可以看到,每个请求都延迟了1秒,但是不会阻塞主线程或其他请求。
Flow的生命周期和异常处理
Flow提供了一些回调函数来监听流的生命周期,例如:
- onStart:在流开始收集之前调用,可以用于执行一些初始化操作,例如打开文件或数据库连接等。
- onEach:在每个元素被发射之后调用,可以用于执行一些通用操作,例如日志记录或更新UI等。
- onCompletion:在流完成收集之后调用,无论是正常完成还是异常终止。可以用于执行一些清理操作,例如关闭文件或数据库连接等。onCompletion可以接收一个可空的Throwable参数,表示流终止的原因,如果为null,则表示正常完成。
例如,下面的代码使用了onStart和onCompletion两个回调函数来打印流的开始和结束时间:
// 对simple()返回的Flow进行处理 fun main() = runBlocking<Unit> { // 收集这个流 simple() .onStart { println("Flow started at ${System.currentTimeMillis()}") } // 开始回调 .onCompletion { println("Flow completed at ${System.currentTimeMillis()}") } // 结束回调 .collect { value -> // 终止操作符 println(value) // 打印结果 } }
输出结果为:
Flow started at 1632828678656
1
2
3
Flow completed at 1632828678657
可以看到,流的开始和结束时间都被打印出来了。
Flow也提供了一些方式来处理异常,例如:
- catch:在流发生异常时调用,可以用于捕获和处理异常,并决定是否继续或终止流。catch操作符必须放在可能发生异常的操作符之后,否则无法捕获异常。
- try-catch:在收集流时使用try-catch块包裹collect操作符,可以用于捕获和处理异常,并决定是否继续或终止程序。try-catch块可以捕获任何位置发生的异常。
例如,下面的代码使用了catch和try-catch两种方式来处理异常:
// 创建一个可能发生异常的Flow<Int> fun foo(): Flow<Int> = flow { for (i in 1..3) { println("Emitting $i") emit(i) // 发射下一个值 } throw RuntimeException() // 抛出异常 } // 对foo()返回的Flow进行处理 fun main() = runBlocking<Unit> { // 使用catch操作符捕获异常,并打印错误信息,然后继续发射-1作为错误标识 foo() .catch { e -> println("Caught $e") } // 捕获异常 .emit(-1) // 发射错误标识 .collect { value -> println(value) } println("Done") // 使用try-catch块捕获异常,并打印错误信息,然后终止程序 try { foo().collect { value -> println(value) } } catch (e: Throwable) { println("Caught $e") } println("Done") }
输出结果为:
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
-1
Done
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
Done
可以看到,catch操作符可以在流中处理异常,并继续发射值,而try-catch块可以在程序中处理异常,并终止程序。
Flow的线程切换
Flow提供了一些操作符来切换上游和下游的上下文,例如:
- flowOn:flowOn操作符用于切换上游的上下文,也就是说,它会影响flow{}构建器函数和之前的中间操作符的执行上下文。flowOn操作符可以用于将耗时的计算操作放在后台线程中执行,而不影响主线程。
- launchIn:launchIn操作符用于切换下游的上下文,也就是说,它会影响collect操作符和之后的中间操作符的执行上下文。launchIn操作符可以用于将收集操作放在协程中执行,而不阻塞当前线程。
例如,下面的代码使用了flowOn和launchIn两个操作符来切换上下文:
// 创建一个Flow<Int> fun simple(): Flow<Int> = flow { // 发射1到3,并打印当前线程名 for (i in 1..3) { Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算 log("Emitting $i") emit(i) // 发射下一个值 } }.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文 // 对simple()返回的Flow进行处理 fun main() = runBlocking<Unit> { // 收集这个流,并打印当前线程名 simple() .collect { value -> log("Collected $value") } println("Done") // 使用launchIn操作符将收集操作放在协程中执行,并打印当前线程名 simple() .onEach { value -> log("Collected $value") } .launchIn(this) // 在单独的协程中收集并打印结果 println("Done") }
输出结果为:
[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
Done
Done
[DefaultDispatcher-worker-2] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-2] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-2] Emitting 3
[main] Collected 3
可以看到,flowOn操作符将发射操作放在了DefaultDispatcher线程中执行,而collect操作仍然在主线程中执行。launchIn操作符将收集操作放在了一个单独的协程中执行,而不阻塞主线程。
以上就是Kotlin协程之Flow的使用与原理解析的详细内容,更多关于Kotlin Flow使用与原理的资料请关注脚本之家其它相关文章!