Kotlin协程的线程调度示例详解
作者:rencai
引言
在第一篇文章中我们分析了协程启动创建过程启动过程,在本文中,我们将着重剖析协程中协程调度的逻辑流程。主要是分析解答如下2个问题:
- 涉及到协程方法器是如何将协程代码调度到特定的线程执行?
- 子协程执行完又是如何切换0回父协程的线程环境?
一、协程的分发器作用
1.1 测试代码
GlobalScope.launch { //协程体1 Log.d(TAG, "before suspend job.") withContext(Dispatchers.Main) { //协程体2 Log.d(TAG, "print in Main thread.") } Log.d(TAG, "after suspend job.") }
- 此次的协程测试用例中,我们默认的
launch
一个协程,我们简单的将launch
需要执行的这外层逻辑为协程体1。 - 在协程体1中,我们使用
withContext
将协程切换到主线程执行,打印日志。我们将这里面执行的协程逻辑为协程体2。 - 协程体2执行完成后,切回协程体1中执行并打印Log。
- 注意,根据我们之前《协程的创建与启动》文章中分析的,Kotlin编译器针对协程体1和协程体2分别生成一个继承与
SuspenLamabda
的类型,比如:class MainActivity#onCreate$1 : SuspenLambda{...}
。我们在讲协程体时,也同时代指这个类实例。
继续跟踪launch()
函数执行逻辑,这次跟踪过程不同与《协程的创建与启动》篇章,我们会将侧重点放在启动过程中协程调度器是如何起作用的?接下来见1.2
1.2 CoroutineScope.launch
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { //1. 见1.2.1 val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) //2. 详见1.3 coroutine.start(start, coroutine, block) return coroutine }
- 这里会新建一个
CoroutineContext
,详见1.2.1 - 根据之前的分析,这个里最终会调用到
startCoroutineCancellable()
方法,详见1.3流程。
1.2.1 newCoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = foldCopies(coroutineContext, context, true) val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
coroutineContext
:coroutineContext
是CoroutineScope
的成员变量,当此时为GlobalScope.coroutineContext==EmptyCoroutineContext
context
:由于调用launch
时没有指定Context
,所以传到此处也是EmptyCoroutineContext
。foldCopies()
函数将2个context相加并拷贝,最终combied==EmptyCoroutineContext
。
而在return这最后判断返回的是debug+Dispatchers.Defatult
,所以此时默认的分发器为Dispatchers.Defatult
。
这里涉及到的协程Context运算不做深入剖析,简单可以认为协程重写了“+”运算,使得Context之间可以使用“+”来叠加,没有的Element类型会被添加到Element集合,集合中已有的Element类型会被覆盖。
1.3 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = runSafely(completion) { //1. 创建SuspendLambda协程体 createCoroutineUnintercepted(receiver, completion) //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1 .intercepted() //3. 调用方法器Continuation的resume方法,详见1.4 .resumeCancellableWith(Result.success(Unit), onCancellation) }
- 这里的构建协程体在《协程的创建与启动》一节中已经剖析,不再赘述。
- 进行拦截,注意:这里其实会根据方法器再构建出一个
DispatchedContinuation
对象,它也是一个续体类型,这是对协程体的一次包装。详见1.3.1小节。 - 调用拦截器续体的
resumeCancellableWith()
开始状态机流转,执行分发流程详见1.4小节。
1.3.1 intercepted()
public fun intercepted(): Continuation<Any?> = intercepted?: ( //1. 取出拦截器 context[ContinuationInterceptor]? //2.构建拦截器续体 .interceptContinuation(this)?: this) .also { intercepted = it }
- 取出当前上下文中的拦截器类型,根据之前1.2.1小节的分析,这里取出来的是
Dispatchers.Defatult
。 interceptContinuation(this)
为构建拦截器续体,注意这里传入的this
是协程体1。 详见1.3.2。
1.3.2 CoroutineDispatcher
//Base class to be extended by all coroutine dispatcher implementations. public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public final override fun <T> interceptContinuation(continuation: Continuation<T>): //详见1.4 Continuation<T> = DispatchedContinuation(this, continuation) }
直接新建了一个DispatchedContinuation
对象实例这里需要注意传入的构建参数:
- this:当前
Dispatcher
,也就是Dispatchers.Defatult
。 - continuation:协程体1。
1.3.3 小结
自此Continuation.intercepted()
方法就分析结束,最终的结果是:用上下文中的Dispatcher
和当前Contination
对象也就是协程体1,共同作为构建参数,新建了一个DispatchedContinuation
对象。
接下来接着1.3中的第三点,调用DispatchedContinuation.resumeCancellableWith()
方法开始分析。
1.4 DispatchedContinuation
internal class DispatchedContinuation<in T>( //1. 分发器 @JvmField val dispatcher: CoroutineDispatcher, //2. 注意这里将Continuation的实现委托给了continuation成员变量。 @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED) , CoroutineStackFrame, Continuation<T> by continuation { //3. 复写属性delegate为自己 override val delegate: Continuation<T> get() = this ... // We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher) // It is used only in Continuation<T>.resumeCancellableWith @Suppress("NOTHING_TO_INLINE") inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //默认为true if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //4. 详细见 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } }
这里的dispatcher==Dispatchers.Defatult
,所以接下来需要解析Dispatchers.Defatult
到底是什么东西。详见1.5
- 成员变量
dispatcher==Dispatchers.Default
。 - 成员变量
continucation==协程体1(SuspenLambda类型实例)
。同时DispatchedContinuation
继承于Continuation
接口,它将Continuation
接口的实现委托给了成员变量continuation
。 deleagte
为复写了DispatchedTask.delegate
属性,将其返回自己。- 调用分发器也就是
Dispatchers.Defatult
的dispatch()
方法,注意这里传入的参数:
context
:来自Continuation
接口的属性,由于委托给了成员变量continuation
,所以此context
==continuation.context
。
this
:分发器本身Dispatchers.Defatult
自此这个方法的分析结束:调用分发器的进行分发,接下来分析就开始分析协程方法器CoroutineDispatcher
1.5 DefaultScheduler
//Dispathcer.kt @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler //Dispathcer.kt // Instance of Dispatchers.Default internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { ... }
实际上是继承 SchedulerCoroutineDispatcher
类型。详见1.5.1
1.5.1 SchedulerCoroutineDispatcher
internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() { override val executor: Executor get() = coroutineScheduler // This is variable for test purposes, so that we can reinitialize from clean state private var coroutineScheduler = createScheduler() private fun createScheduler() = //1. 详见1.5.2 CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) //2. 详见1.5.2 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) ... } //Executors.kt //2. 实际上是继承ExecutorCoroutineDispatcher public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable { ... }
- 可以看到实际上调用了
CoroutineScheduler.dispatch
方法。此时发现,第二个参数是Runnable
类型的,而在1.4小节中,我们知道传入的是this
也就是DispatchedContinuation
,所以DispatchedContinuation
继承的父类中,必定有继承了Runnable
接口,而他的run方法的实现也在父类中,这块我们暂时按下不表,接着看继续跟踪coroutineScheduler.dispatch(block)
。
1.5.2 CoroutineScheduler
internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { ... override fun execute(command: Runnable) = dispatch(command) fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support val task = createTask(block, taskContext) // try to submit the task to the local queue and act depending on the result val currentWorker = currentWorker() val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } } }
- 该类继承了
Executor
类,而且它的构建参数可看到是线程池的参数,所以可以知道这个其实是Kotlin协程实现的一个线程池,具体就不跟进去了。 execute()
过程也是dispatch
过程:将任务投递到任务队列,然后通知线程去取任务执行,自此完成了线程切换动作。- 而在新线程里执行的
Runnable
为1.4中的调用代码:dispatcher.dispatch(context, this)
中的this
,也就是DispatchedContinuation
。DispatchedContinuation.kt
并没有实现run
方法,那么一定是他继承的父类实现了Runnable
接口并实现,所以需要接着看它继承的父类:DispatchedTask
类。
1.6 DispatchedTask.run()
internal abstract class DispatchedTask<in T>( @JvmField public var resumeMode: Int ) : SchedulerTask() { ... internal abstract val delegate: Continuation<T> @Suppress("UNCHECKED_CAST") internal open fun <T> getSuccessfulResult(state: Any?): T = state as T internal open fun getExceptionalResult(state: Any?): Throwable? = (state as? CompletedExceptionally)?.cause public final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> //1. 取出代理商的续体 val continuation = delegate.continuation withContinuationContext(continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled val exception = getExceptionalResult(state) val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null if (job != null && !job.isActive) { val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { if (exception != null) { continuation.resumeWithException(exception) } else { //1. 被包装的续体的resume方法,真正的开始出发其协程状态机代码。 continuation.resume(getSuccessfulResult(state)) } } } } catch (e: Throwable) { // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } } }
- 将
delegate
转为DispatchedContinuation
,应该注意1.4 小节中DispatchedContinuation
继承DispatchTask
时,便对此delegate
进行了复写:
override val delegate: Continuation
get() = this
而此delegate.continucation
便是当初newDispatchedContinuation(this)
时传入的this,此this就是Kotlin编译器一开始为协程体生成的SuspendLambda
类型对象。具体可以回看1.3小节。
- 调用了
continuation.resume()
方法触发了协程的状态机进而开始执行协程业务逻辑代码,结合之前1.5.2的分析可以知道,这个方法的调用已经是被dispatch
到特定线程,完成线程切换后执行的。所以协程状态机的代码也是跑在新线程上的。
1.7 总结
至此,协程的线程调度分析结束,关键有如下几个要点:
- 创建
SuspendLambda
时,他的协程上下文对象来自于comletion.context
,默认就是Dispatcher.Default
。 SuspendLambda
启动时调用了intercept()
进行一层包装,得到DispatchedContinuation
,后续协程启动是启动的DispatchedContinuation
协程。DispatchedContinuation
继承于Runnable
接口,协程启动时将自己投递到分发器dispatcher
执行run
方法,从而达到了线程切换效果。- 在
DispatchedContinuation
的run
方法中,调用SuspendLambda.resume()
启动状态机。在新线程执行协程状态机代码。
这一小节中,介绍了如何将协程调度到目的线程执行,接下来分析如何做到随意切换线程后,然后再恢复到原来线程的。
二、协程中的线程切换
在第一小节中,我们搞清楚了协程启动时,协程调度器是如何在其中起作用的。这一小节旨在剖析在协程用分发器切换线程执行新的挂起函数后,是如何切换会原来线程继续执行剩下的逻辑的。
为此,我们需要将1.1的测试代码反编译出来实际代码进而分析。
2.1 反编译代码
2.1.1 MainActivityonCreateonCreateonCreate1
final class MainActivity$onCreate$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> { ... @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl public final Object invokeSuspend(Object $result) { Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure($result); Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4147xf96cab04()); this.label = 1; //1. 新建编译器自动生成的继承于SuspendLambda的类型。 AnonymousClass1 anonymousClass1 = new AnonymousClass1(null); //2. 调用withContext Object res = BuildersKt.withContext(Dispatchers.getIO(), anonymousClass1, this); if (res != coroutine_suspended) { break; } else { //挂起 return coroutine_suspended; } case 1: ResultKt.throwOnFailure($result); break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4148xe0c1b328()); return Unit.INSTANCE; } }
根据之前的文章分析,这里suspend lambda
的类型都自动生成继承于SuspendLambda
的类型。详见2.1.2。
将anonymousClass1
传入withContext
,而且注意这里传入了this==MainActivity$onCreate$1
,详见2.2。
2.1.2 AnonymousClass1
/* compiled from: MainActivity.kt */ public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Integer>, Object> { int label ... @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl public final Object invokeSuspend(Object obj) { IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure(obj); return Boxing.boxInt(Log.d(MainActivity.TAG, LiveLiterals$MainActivityKt.INSTANCE.m4146x7c0f011f())); default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } } }
2.2 withContext
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } //1. 获取当前协程, 注意这里的uCont就是当前续体,也就是MainActivity$onCreate$1 return suspendCoroutineUninterceptedOrReturn sc@ { uCont -> //2. 计算获的新的协程上下文 val oldContext = uCont.context val newContext = oldContext + context //3. 快速判断:新上下文和旧上下文一致的情况快速处理。 // always check for cancellation of new context newContext.ensureActive() // FAST PATH #1 -- new context is the same as the old one if (newContext === oldContext) { val coroutine = ScopeCoroutine(newContext, uCont) return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher) if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) { val coroutine = UndispatchedCoroutine(newContext, uCont) // There are changes in the context, so this thread needs to be updated withCoroutineContext(newContext, null) { return@sc coroutine.startUndispatchedOrReturn(coroutine, block) } } // SLOW PATH -- use new dispatcher //4. 新建一个DispatchedCoroutine val coroutine = DispatchedCoroutine(newContext, uCont) //5. 启动协程 block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } }
suspendCoroutineUninterceptedOrReturn
这个函数直接步进是看不到实现的,它的实现是由Kotlin编译器生成的,它的作用是用来获取当前续体的,并且通过uCont
返回,这里就是MainActivity$onCreate$1
。- 将旧协程上下文和新的上下文一起。计算得到最终的上下文。这里的
context==Dispatchers.getIO()
。 - 快速判断,不用看。
- 新建一个
DispatchedCoroutine
,注意这里传入了新的协程上下文和当前续体对象。 - 调用
startCoroutineCancellable()
启动协程。这里的同1.3.2小节分析一样,详见 2.2.1
2.2.1 startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = runSafely(completion) { //1. 创建SuspendLambda协程体 createCoroutineUnintercepted(receiver, completion) //2. 拦截:取出分发器,并构建方法器Continuation。详见1.3.1 .intercepted() //3. 调用方法器Continuation的resume方法,详见1.4 .resumeCancellableWith(Result.success(Unit), onCancellation) }
此方法在之前1.3小节已经分析过,针对此此次调用,其中的改变是协程上下文中的分发器已经被设置为Dispatchers.Main
。
- 创建了
SuspendLambda
对象,此对象的CoroutineContext
为completion.context
。而其中的ContinuationInterceptor
类型Element
就是我们之前传入的Dispatchers.Main
。 - 创建一个
DispatchedContinuation
。 - 将协程
SuspendLambda
的状态机逻辑通过Dispatcher.Main
调度到主线程执行,调度过程参考第一下节。分发逻辑详见2.7小节。 - 当
SuspendLambda
的状态机invokeSuspend()
逻辑执行完成后,会返回到BaseContinuationImpl.resumeWith()
,我们需要接此方法分析,来得到协程在切换到主线程执行后,又是怎么切回协程体1的执行线程的,详见2.3。
2.3 resumeWith
public final override fun resumeWith(result: Result<Any?>) { // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume var current = this var param = result while (true) { // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure // can precisely track what part of suspended callstack was already resumed probeCoroutineResumed(current) with(current) { val completion = completion!! // fail fast when trying to resume continuation without completion val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { //1. 进入此判断 // top-level completion reached -- invoke and return completion.resumeWith(outcome) return } } } }
当状态机执行完后, 后进入到completion的类型判断,由2.2和2.2.1可以知道,当初传入的completion是DispatchedCoroutine
类型,所以加入到else分支,调用了DispatchedCoroutine.resumeWith()
,接下来分析此方法。
在此之前,我们需要看下DispatchedCoroutine
的继承关系,详见2.4.1。如果想直接跟踪流程,可以直接看2.4.2。
2.4 DispatchedCoroutine
2.4.1 DispatchedCoroutine 的继承关系
internal class DispatchedCoroutine<in T>( context: CoroutineContext, uCont: Continuation<T> ) : ScopeCoroutine<T>(context, uCont) { }
继承于ScopeCoroutine
internal open class ScopeCoroutine<in T>( context: CoroutineContext, @JvmField val uCont: Continuation<T> // unintercepted continuation ) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame { }
继承于AbstractCoroutine
public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { }
2.5 协程线程的恢复
2.5.1 AbstractCoroutine.resumeWith()
public final override fun resumeWith(result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) }
调用了afterResume
方法,此方法在DispatchedCoroutine
类型有具体实现。见2.5.2
2.5.2 afterResume
//DispatchedCoroutine override fun afterResume(state: Any?) { if (tryResume()) return // completed before getResult invocation -- bail out // Resume in a cancellable way because we have to switch back to the original dispatcher uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) }
- 取出当前续体
uCont
,这个续体根据之前的分析:2.2小节,可以知道它等于MainActivity$onCreate$1
。 intercepted()
:取出其分发拦截器resumeCancellableWith
:使用方法拦截器协程体,将uCont续体的状态机逻辑调度到相对应的线程环境执行,这里就是之前的Dispatcher.Default
。注意其注释:“将其切换到原先的分发器”。2⃣而这一过程其实和1.3小节的过程一致。- 恢复到
Dispatcher.Default
继续执行状态机时,由于label已经被更新,所以会往下继续执行,打印最后一句log。
2.6 总结
withContext(Dispatcher.Main)
启动的协程时,取得当前协程续体uCount
也就是MainActivity$onCreate$1
,会计算出新的协程context
,然后用它们创建一个DispatchedCoroutine
。
AnonymousClass1
协程启动时,用DispatchedCoroutine
作为completion
参数,然后启动,此时会调度主线程执行协程。
当协程执行完成后,AnonymousClass1.resumeWith()
方法会调用completion.resumeWith()
。
DispatchedCoroutine.resumeWith()
方法会调用uCount.intercepted().resumeCancellableWith()
,使得父协程进行调度并接着执行状态机逻辑。
2.7 Dispatchers.Main
@JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
直接详见2.7.1
2.7.1 MainDispatcherLoader
internal object MainDispatcherLoader { private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true) @JvmField val dispatcher: MainCoroutineDispatcher = loadMainDispatcher() private fun loadMainDispatcher(): MainCoroutineDispatcher { return try { val factories = if (FAST_SERVICE_LOADER_ENABLED) { FastServiceLoader.loadMainDispatcherFactory() } else { // We are explicitly using the // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()` // form of the ServiceLoader call to enable R8 optimization when compiled on Android. // 1.获得MainDispatcherFactory的实现类 ServiceLoader.load( MainDispatcherFactory::class.java, MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf") factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) { // Service loader can throw an exception as well createMissingDispatcher(e) } } }
- 通过ServiceLoad机制获取
MainDispatcherFactory
的实现类,而在源码里面,其实现类为AndroidDispatcherFactory
- 调用
tryCreateDispatcher()
创建分发器,详见2.7.2。
2.7.2 AndroidDispatcherFactory
internal class AndroidDispatcherFactory : MainDispatcherFactory { override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true)) override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used" override val loadPriority: Int get() = Int.MAX_VALUE / 2 }
根据createDispatcher
分发,主线程分发器的实现类为HandlerContext
类型,传入用MainLooper
构建的Handler
。详见2.7.3。
2.7.3 HandlerContext
internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { /** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */ constructor( handler: Handler, name: String? = null ) : this(handler, name, false) @Volatile private var _immediate: HandlerContext? = if (invokeImmediately) this else null override val immediate: HandlerContext = _immediate ?: HandlerContext(handler, name, true).also { _immediate = it } override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch(context: CoroutineContext, block: Runnable) { if (!handler.post(block)) { cancelOnRejection(context, block) } } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { continuation.invokeOnCancellation { handler.removeCallbacks(block) } } else { cancelOnRejection(continuation.context, block) } } ... }
HandlerContext
继承于HandlerDispatcher
,而他的dispatch
方法,可以看到,就是将block丢到设置MainLooper
的handler
执行。所以续体将会在主线程执行状态机,达到切换到主线程执行协程的目的。
以上就是Kotlin协程的线程调度示例详解的详细内容,更多关于Kotlin协程的线程调度的资料请关注脚本之家其它相关文章!