Kotlin协程launch启动流程原理详解
作者:无糖可乐爱好者
1.launch启动流程
已知协程的启动方式之一是Globalscope.launch
,那么Globalscope.launch
的流程是怎样的呢,直接进入launch
的源码开始看起。
fun main() { coroutineTest() Thread.sleep(2000L) } val block = suspend { println("Hello") delay(1000L) println("Kotlin") } private fun coroutineTest() { CoroutineScope(Job()).launch { withContext(Dispatchers.IO) { block.invoke() } } }
反编译后的Java代码
public final class CoroutineDemoKt { @NotNull private static final Function1 block; public static final void main() { coroutineTest(); Thread.sleep(2000L); } // $FF: synthetic method public static void main(String[] var0) { main(); } @NotNull public static final Function1 getBlock() { return block; } private static final void coroutineTest() { BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null)), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO(); Function2 var10001 = (Function2)(new Function2((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: ResultKt.throwOnFailure($result); Function1 var10000 = CoroutineDemoKt.getBlock(); this.label = 1; if (var10000.invoke(this) == var2) { return var2; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }); this.label = 1; if (BuildersKt.withContext(var10000, var10001, this) == var2) { return var2; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } return Unit.INSTANCE; } @NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } public final Object invoke(Object var1, Object var2) { return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE); } }), 3, (Object)null); } static { Function1 var0 = (Function1)(new Function1((Continuation)null) { int label; @Nullable public final Object invokeSuspend(@NotNull Object $result) { Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); String var2; switch(this.label) { case 0: ResultKt.throwOnFailure($result); var2 = "Hello"; System.out.println(var2); this.label = 1; if (DelayKt.delay(1000L, this) == var3) { return var3; } break; case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } var2 = "Kotlin"; System.out.println(var2); return Unit.INSTANCE; } @NotNull public final Continuation create(@NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function1 var2 = new <anonymous constructor>(completion); return var2; } public final Object invoke(Object var1) { return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE); } }); block = var0; } }
先分析一下上面代码的流程:
- 首先声明了一个Function1类型的block变量,这个变量就是demo中的block,然后会在static函数中会被赋值。
- 接下来就是coroutineTest函数的调用。这个函数中的第一行代码就是CoroutineScope的传参和一些默认值
- 然后通过89行的invoke进入到了外层状态机流转的过程
- 95行的static表示的是内部的挂起函数就是demo中的block.invoke,它是以匿名内部类的方式实现,然后执行内部的状态机流转过程,最后给block赋值。
- block被赋值后最终在
Function1 var10000 = CoroutineDemoKt.getBlock();
被调用
那么这个过程又是如何实现的,进入launch
源码进行查看:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
这里的block
指的就是demo中的block代码段
再来看一下里面的几行代码的含义:
- newCoroutineContext: 通过默认的或者传入的context创建一个新的Context;
- coroutine: launch 会根据传入的启动模式来创建对应的协程对象。这里有两种,一种是标准的,一种是懒加载的。
- coroutine.start: 尝试启动协程
2.协程是如何被启动的
通过launch
的源码可知协程的启动是通过coroutine.start
启动的,那么协程的启动流程又是怎样的?
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ... /** * 用给定的代码块启动这个协程并启动策略。这个函数在这个协程上最多调用一次。 */ public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { start(block, receiver, this) } }
start
函数中传入了三个参数,只需要关注第一个参数即可。
public enum class CoroutineStart { ... /** * 用这个协程的启动策略启动相应的块作为协程。 */ public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit // will start lazily } }
启动策略的具体实现有三种方式,这里只需要分析startCoroutine
,另外两个其实就是它的基础上增加了一些功能,其中前者代表启动协程以后可以在等待调度时取消,后者表示协程启动后不会被分发。
/** * 创建没有接收方且结果类型为T的协程,这个函数每次调用时都会创建一个新的可挂起的实例。 */ public fun <T> (suspend () -> T).startCoroutine( completion: Continuation<T> ) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }
createCoroutineUnintercepted
在源代码中只是一个声明,它的具体实现是在IntrinsicsJvm.kt文件中。
//IntrinsicsJvm.kt#createCoroutineUnintercepted /** * 创建没有接收方且结果类型为T的非拦截协程。这个函数每次调用时都会创建一个新的可挂起的实例。 */ public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
actual
代表了 createCoroutineUnintercepted()
在 JVM 平台的实现。
createCoroutineUnintercepted
是一个扩展函数,接收者类型是一个无参数,返回值为 T 的挂起函数或者 Lambda。
第9行代码中的this
代表的是(suspend () -> T)
也就是invoke
函数中的block
变量,这个block
变量就是demo中的block
代码段。
第9行的BaseContinuationImpl
是一个抽象类它实现了Continuation
。
关于if (this is BaseContinuationImpl)
的结果暂且不分析,先分析两种情况下的create函数:
- create(probeCompletion):
//ContinuationImpl.kt#create public open fun create(completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Continuation) has not been overridden") } public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> { throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden") }
这个create
函数抛出一个异常,意思就是这个create()
没有被重写,而这个create()
的重写就是在反编译后的Java代码中的create
函数
@NotNull public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; }
- createCoroutineFromSuspendFunction(probeCompletion):
//IntrinsicsJvm.kt#createCoroutineFromSuspendFunction /** * 当一个被suspend修饰的lambda表达式没有继承BaseContinuationImpl类时,则通过此方法创建协程。 * * 它发生在两种情况下: * 1.lambda表达式中调用了其他的挂起方法 * 2.挂起方法是通过Java实现的 * * 必须将它封装到一个扩展[BaseContinuationImpl]的实例中,因为这是所有协程机制的期望。 */ private inline fun <T> createCoroutineFromSuspendFunction( completion: Continuation<T>, crossinline block: (Continuation<T>) -> Any? ): Continuation<Unit> { val context = completion.context // context为空创建一个受限协程 return if (context === EmptyCoroutineContext) //受限协程:只能调用协程作用域中提供的挂起方式挂起,其他挂起方法不能调用 object : RestrictedContinuationImpl(completion as Continuation<Any?>) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获) block(this) // 运行块,可以返回或挂起 } 1 -> { label = 2 result.getOrThrow() // 这是block挂起的结果 } else -> error("This coroutine had already completed") } } else //创建一个正常的协程 object : ContinuationImpl(completion as Continuation<Any?>, context) { private var label = 0 override fun invokeSuspend(result: Result<Any?>): Any? = when (label) { 0 -> { label = 1 result.getOrThrow() // 如果试图以异常开始,则重新抛出异常(将被BaseContinuationImpl.resumeWith捕获) block(this) // 运行块,可以返回或挂起 } 1 -> { label = 2 result.getOrThrow() // 这是block挂起的结果 } else -> error("This coroutine had already completed") } } }
createCoroutineFromSuspendFunction就是当一个被suspend修饰的Lambda表达式没有继承BaseContinuationImpl是才会被调用,然后根据上下文是否为空创建不同类型的协程。
两种情况都已经分析完了,那么现在if (this is BaseContinuationImpl)
会执行哪一个呢,首先这里的this
所指的就是demo中的block代码段,Kotlin编译器编译后会自动生成一个类就是上面的static
,它会继承SuspendLambda类,而这个SuspendLambda类继承自ContinuationImpl,ContinuationImpl继承自BaseContinuationImpl,因此可以得到判断结果为true,
createCoroutineUnintercepted
的过程就是协程创建的过程。
然后就是intercepted函数,这个函数的具体实现也在IntrinsicsJvm.kt中,那么intercepted
又做了什么呢
public expect fun <T> Continuation<T>.intercepted(): Continuation<T> //具体实现 //IntrinsicsJvm.kt#intercepted public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
首先有个强转,通过上面的分析这个强转是一定会成功的,到这里intercepted
就进入到了ContinuationImpl
中了
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { ... @Transient private var intercepted: Continuation<Any?>? = null //如果没有缓存,则从上下文获取拦截器,调用interceptContinuation进行拦截 //将获取到的内容保存到全局变量 public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } }
这里的ContinuationInterceptor
指的就是Demo中传输的Dispatcher.IO
,默认值时Dispatcher.Default
。
再回到startContinue
中还剩最后一个resume
:
/** * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。 */ public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value)) public interface Continuation<in T> { /** * 与此延续相对应的协程的上下文。 */ public val context: CoroutineContext /** * 恢复执行相应的协程传递值作为最后一个挂起点的返回值。 */ public fun resumeWith(result: Result<T>) }
这里的resume(Unit)
作用就相当与启动了一个协程。
上面的启动流程中为了方便分析的是CoroutineStart.ATOMIC
,而默认的是CoroutineStart.DEFAULT
,下面分析一下DEFAULT
的流程
//Cancellable.kt#startCoroutineCancellable public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) }
startCoroutineCancellable
对于协程的创建和拦截与ATOMIC
是一样的,区别就在于resumeCancellableWith
//DispatchedContinuation#resumeCancellableWith public fun <T> Continuation<T>.resumeCancellableWith( result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ): Unit = when (this) { is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) else -> resumeWith(result) } // 我们内联它来保存堆栈上的一个条目,在它显示的情况下(无限制调度程序) // 它只在Continuation<T>.resumeCancellableWith中使用 @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //是否需要分发 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //将可运行块的执行分派给给定上下文中的另一个线程 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { //协程未被取消 if (!resumeCancelled(state)) { // 恢复执行 resumeUndispatchedWith(result) } } } } //恢复执行前判断协程是否已经取消执行 inline fun resumeCancelled(state: Any?): Boolean { //获取当前协程任务 val job = context[Job] //如果不为空且不活跃 if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) //抛出异常 resumeWithException(cause) return true } return false } //我们需要内联它来在堆栈中保存一个条目 inline fun resumeUndispatchedWith(result: Result<T>) { withContinuationContext(continuation, countOrElement) { continuation.resumeWith(result) } }
以上就是Kotlin协程launch启动流程原理详解的详细内容,更多关于Kotlin协程launch启动流程的资料请关注脚本之家其它相关文章!