kotlin协程Source

总结

类设计

协程的三层包装

  1. 常用的launch和async返回的Job、Deferred,里面封装了协程状态,提供了取消协程接口,而它们的实例都是继承自AbstractCoroutine,它是协程的第一层包装。
  2. 第二层包装是编译器生成的SuspendLambda的子类,封装了协程的真正运算逻辑,继承自BaseContinuationImpl,其中completion属性就是协程的第一层包装。
  3. 第三层包装是前面分析协程的线程调度时提到的DispatchedContinuation,封装了线程调度逻辑,包含了协程的第二层包装。三层包装都实现了Continuation接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。

resumeWith

CoroutineContext

graph LR
coroutineContext-->Element1["Element1: a singleton context by itself."]
coroutineContext-->Element2["Element2: a singleton context by itself."]
coroutineContext-->ContinuationInterceptor["ContinuationInterceptor: DefaultDispatcher."]
coroutineContext-->CoroutineExceptionHandler["CoroutineExceptionHandler: CoroutineExceptionHandlerImpl."]
coroutineContext-->ElementXxx["ElementXxx: a singleton context by itself."]

挂起和恢复设计

sequenceDiagram
CurrentThreadType->>DispatchedThreadType: dispatch
activate DispatchedThreadType
CurrentThreadType->>CurrentThreadType: suspend coroutine and release thread
DispatchedThreadType-->>CurrentThreadType: resume
deactivate DispatchedThreadType
graph LR
subgraph 基础层
suspendCoroutine-->suspendCoroutineUninterceptedOrReturn
suspendCancellableCoroutine-->suspendCoroutineUninterceptedOrReturn
end

subgraph 上层功能
delay-->suspendCancellableCoroutine
withContext-->suspendCoroutineUninterceptedOrReturn
awaitSuspend-->suspendCoroutineUninterceptedOrReturn
yield-->suspendCoroutineUninterceptedOrReturn
coroutineScope-->suspendCoroutineUninterceptedOrReturn
end

Coroutine构造和启动

CoroutineScope.launch

// Context of the scope; newCoroutineContext combines it with the caller-supplied context.
public val coroutineContext: CoroutineContext

launch本质上也是将用户配置的协程闭包作为一个suspend函数(suspend CoroutineScope.() -> Unit;查看编译产物可以看到,这里实际上是new了对应的SuspendLambda子类对象,但尚未关联第一层的AbstractCoroutine,需要在后续的create过程中重新构造该子类对象并提供第一层的对象),并将该函数在指定的dispatcher上执行,和withContext类似

/**
 * Starts a new coroutine in this scope and returns it as a [Job].
 *
 * @param context extra context elements, merged with the scope's context via [newCoroutineContext].
 * @param start start strategy; a lazy strategy creates a [LazyStandaloneCoroutine] instead.
 * @param block the suspending body of the coroutine.
 * @return the newly created coroutine.
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val mergedContext = newCoroutineContext(context)
    val job = when {
        start.isLazy -> LazyStandaloneCoroutine(mergedContext, block)
        else -> StandaloneCoroutine(mergedContext, active = true)
    }
    // The coroutine is passed as the receiver of `block`; main path.
    job.start(start, job, block)
    return job
}

CoroutineScope.newCoroutineContext

intercepted过程会用到context[ContinuationInterceptor]:即DefaultScheduler实例(DefaultDispatcher),作为后续resume时的线程调度器

/**
 * Builds the context for a new coroutine: the scope's context plus [context],
 * a [CoroutineId] when DEBUG is on, and [Dispatchers.Default] when the combined
 * context carries no [ContinuationInterceptor].
 */
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = when {
        DEBUG -> combined + CoroutineId(COROUTINE_ID.incrementAndGet())
        else -> combined
    }
    // Fall back to the default dispatcher only when none was supplied.
    val needsDefaultDispatcher =
        combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null
    return if (needsDefaultDispatcher) debug + Dispatchers.Default else debug
}

coroutine.start

CoroutineStart.start-->invoke

/**
 * Starts this coroutine with the given [block] and [receiver] using the [start] strategy.
 * Initializes the parent job first, then hands over to [CoroutineStart.invoke] with this
 * coroutine as the completion.
 */
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    // Explicit spelling of the `invoke` operator call on the start strategy.
    start.invoke(block, receiver, this)
}
/**
 * Starts [block] with [receiver] and [completion] according to this start strategy.
 * Each strategy delegates to a different start function; LAZY does nothing here.
 */
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        // On the launch path, receiver and completion are the same StandaloneCoroutine instance.
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)// main path
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }

commonMain\intrinsics\Cancellable.kt

/**
 * Starts the coroutine in a cancellable way, so it can be cancelled while it is
 * still waiting to be dispatched.
 *
 * Creates the unintercepted coroutine, intercepts it, and resumes it cancellably
 * with [Unit]. Any exception thrown on the way is delivered to [completion].
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) {
    runSafely(completion) {
        val created = createCoroutineUnintercepted(receiver, completion)
        val intercepted = created.intercepted()
        intercepted.resumeCancellable(Unit)
    }
}
/**
 * Runs [block]; if it throws, the throwable is delivered to [completion] as a
 * failed [Result] instead of propagating to the caller.
 */
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (failure: Throwable) {
        val failedResult = Result.failure<Nothing>(failure)
        completion.resumeWith(failedResult)
    }
}

kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

(suspend R.() -> T).createCoroutineUnintercepted

/**
 * Creates an unintercepted coroutine for this suspending function with the given
 * [receiver] and [completion].
 *
 * When the receiver is a compiler-generated [BaseContinuationImpl], its `create`
 * method is used; otherwise the function is wrapped via
 * `createCoroutineFromSuspendFunction`.
 */
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    if (this is BaseContinuationImpl) {
        // Main path: instantiate the compiler-generated SuspendLambda subclass, passing the
        // first-layer object (a StandaloneCoroutine on the launch path) as its completion.
        return create(receiver, probeCompletion)
    }
    return createCoroutineFromSuspendFunction(probeCompletion) { continuation ->
        (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, continuation)
    }
}
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
CoroutineCompileOutput

build/tmp/kotlin-classes/debug/com/xxx/CoroutineTest.class

使用jd-gui打开,对比

// Demo used to inspect the compiler output generated for a coroutine lambda.
object CoroutineTest {
    // Launches a coroutine on Dispatchers.Main.immediate that prints, delays 500 ms, then prints again;
    // the last println sits outside the coroutine body.
    fun test() {
        GlobalScope.launch(Dispatchers.Main.immediate) {
            println("I'm sleeping ... thread name =  ${Thread.currentThread().name}")
            delay(500L)//release current thread while suspend current coroutine
            println("I'm sleeping over ... thread name =  ${Thread.currentThread().name}")
        }
        println("finished thread name =  ${Thread.currentThread().name}")
    }
}

对应

public final class CoroutineTest {
 
  // Decompiled form of CoroutineTest.test(): the lambda body becomes the generated class
  // CoroutineTest$test$1, which is handed to BuildersKt.launch$default.
  public final void test() {
    // NOTE(review): the trailing "2, null" arguments look like the default-argument mask and marker — confirm.
    BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain().getImmediate(), null, new CoroutineTest$test$1(null), 2, null);
    Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
    String str = "finished thread name =  " + Thread.currentThread().getName();
    // Unused local emitted by the decompiler.
    boolean bool = false;
    System.out.println(str);
  }
  
/**
 * Compiler-generated state machine for the lambda passed to {@code launch} in
 * {@code CoroutineTest.test()} (decompiled output).
 *
 * {@code label} selects the resume point: 0 is the initial entry, 1 is the
 * continuation after {@code delay}. {@code L$0} keeps the scope across the suspension.
 */
static final class CoroutineTest$test$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    private CoroutineScope p$;
    Object L$0;
    int label;

    /**
     * Runs the coroutine body from the state stored in {@code label}.
     *
     * @param $result result (or failure) the coroutine is being resumed with
     * @return the COROUTINE_SUSPENDED marker when {@code delay} suspends, otherwise {@code Unit.INSTANCE}
     */
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      CoroutineScope $this$launch;
      String str;
      boolean bool;
      Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch (this.label) {
        case 0:
          ResultKt.throwOnFailure($result);
          $this$launch = this.p$;
          Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
          str = "I'm sleeping ... thread name =  " + Thread.currentThread().getName();
          bool = false;
          System.out.println(str);
          // Save state before the suspension point so a later resume re-enters at case 1.
          this.L$0 = $this$launch;
          this.label = 1;
          // delay is invoked exactly once; its return value decides whether the coroutine suspends.
          if (DelayKt.delay(500L, (Continuation)this) == object)
            return object;
          // delay completed without suspending: continue with the rest of the body right away.
          Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
          str = "I'm sleeping over ... thread name =  " + Thread.currentThread().getName();
          bool = false;
          System.out.println(str);
          return Unit.INSTANCE;
        case 1:
          // Resumed after delay: restore the saved scope and rethrow a failed result, if any.
          $this$launch = (CoroutineScope)this.L$0;
          ResultKt.throwOnFailure($result);
          Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
          str = "I'm sleeping over ... thread name =  " + Thread.currentThread().getName();
          bool = false;
          System.out.println(str);
          return Unit.INSTANCE;
      }
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
}
  
/**
 * Throws the stored exception when this result is a failure; does nothing otherwise.
 */
internal fun Result<*>.throwOnFailure() {
    val failure = value as? Result.Failure ?: return
    throw failure.exception
}

ContinuationImpl.intercepted()

/**
 * Returns the intercepted form of this continuation when it is a [ContinuationImpl];
 * any other continuation is returned unchanged.
 */
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> {
    val impl = this as? ContinuationImpl
    return impl?.intercepted() ?: this
}
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!
    
    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        //context是combineContext:[StandaloneCoroutine{Active}@7e4d11a, DefaultDispatcher]
        //(context[ContinuationInterceptor]返回DefaultScheduler实例DefaultDispatcher
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

CoroutineDispatcher.interceptContinuation

/**
 * Wraps [continuation] in a [DispatchedContinuation] bound to this dispatcher.
 */
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
    return DispatchedContinuation(this, continuation)
}

DispatchedContinuation.resumeCancellable

/**
 * Resumes this continuation with [value]: a [DispatchedContinuation] takes its own
 * cancellable resume path, any other continuation is resumed normally.
 */
internal fun <T> Continuation<T>.resumeCancellable(value: T) {
    if (this is DispatchedContinuation) {
        resumeCancellable(value)
    } else {
        resume(value)
    }
}
/**
 * Resumes this continuation with [value] in cancellable mode.
 *
 * If the dispatcher asks for a dispatch, the value and mode are stored on this
 * instance and it is handed to the dispatcher. Otherwise the resume runs through
 * executeUnconfined, skipping the undispatched resume when resumeCancelled() reports true.
 */
inline fun resumeCancellable(value: T) {
    if (dispatcher.isDispatchNeeded(context)) {
        // State and mode are written before dispatching; `this` is the object passed to the dispatcher.
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(value, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatched(value)
            }
        }
    }
}
/**
 * Submits [block] to the coroutine scheduler; if the scheduler rejects it, the
 * block is dispatched on [DefaultExecutor] instead.
 */
override fun dispatch(context: CoroutineContext, block: Runnable): Unit {
    try {
        coroutineScheduler.dispatch(block)
    } catch (rejected: RejectedExecutionException) {
        // Scheduler no longer accepts tasks: fall back to the default executor.
        DefaultExecutor.dispatch(context, block)
    }
}
/**
 * Wraps [block] into a task and submits it: first to the local queue, then, if that
 * did not take it, to the global queue.
 *
 * @throws RejectedExecutionException when the global queue refuses the task.
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
    trackTask() // this is needed for virtual time support
    val task = createTask(block, taskContext)
    // try to submit the task to the local queue and act depending on the result
    when (submitToLocalQueue(task, fair)) {
        ADDED -> return
        NOT_ADDED -> {
            // try to offload task to global queue
            if (!globalQueue.addLast(task)) {// main path
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
            requestCpuWorker()
        }
        else -> requestCpuWorker() // ask for help
    }
}
/**
 * Appends [element] to the current core of the queue.
 *
 * @return `true` when the element was added, `false` when the core reports it is closed.
 */
fun addLast(element: E): Boolean {
    _cur.loop { cur ->
        when (cur.addLast(element)) {
            Core.ADD_SUCCESS -> return true
            Core.ADD_CLOSED -> return false
            // Core is frozen: try to swap in the next core; the enclosing loop then runs again.
            Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
        }
    }
}

线程调度

jvmMain\scheduling\CoroutineScheduler.kt

/**
 * Scheduler worker thread (excerpt): loops looking for tasks and runs them until the
 * scheduler or the worker itself is terminated.
 */
internal inner class Worker private constructor() : Thread() {
        override fun run() {
            var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask()// similar to Java's ThreadPoolExecutor: keep taking tasks from the work queue and run them
                if (task == null) {
                    // Wait for a job with potential park
                    if (state == WorkerState.CPU_ACQUIRED) {
                        cpuWorkerIdle()
                    } else {
                        blockingWorkerIdle()
                    }
                    wasIdle = true
                } else {
                    // Note: read task.mode before running the task, because Task object will be reused after run
                    val taskMode = task.mode
                    if (wasIdle) {
                        idleReset(taskMode)
                        wasIdle = false
                    }
                    beforeTask(taskMode, task.submissionTime)
                    runSafely(task)// main path
                    afterTask(taskMode)
                }
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }
}
        
/**
 * Finds the next task for this worker.
 *
 * With a CPU permit the lookup is delegated to findTaskWithCpuPermit(). Without one,
 * the local queue is polled first and then the global queue is asked for a
 * PROBABLY_BLOCKING task.
 *
 * @return the task to run, or `null` when nothing was found.
 */
internal fun findTask(): Task? {
    if (tryAcquireCpuPermit()) return findTaskWithCpuPermit()
    /*
     * If the local queue is empty, try to extract blocking task from global queue.
     * It's helpful for two reasons:
     * 1) We won't call excess park/unpark here and someone's else CPU token won't be transferred,
     *    which is a performance win
     * 2) It helps with rare race when external submitter sends depending blocking tasks
     *    one by one and one of the requested workers may miss CPU token
     */
    val localTask = localQueue.poll()
    if (localTask != null) return localTask
    return globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)
}

CoroutineScheduler.runSafely

/**
 * Runs [task], forwarding anything it throws to the current thread's uncaught
 * exception handler, and always un-tracks the task afterwards.
 */
private fun runSafely(task: Task) {
    try {
        task.run()
    } catch (failure: Throwable) {
        val current = Thread.currentThread()
        val handler = current.uncaughtExceptionHandler
        handler.uncaughtException(current, failure)
    } finally {
        unTrackTask()
    }
}

runTask

resume过程参考visio类图

resumewith

DispatchedTask<in T>

// Runs a dispatched task: takes the stored state and resumes the wrapped continuation
// with a cancellation exception, a failure, or the successful value.
public final override fun run() {
   val taskContext = this.taskContext
   try {
     val delegate = delegate as DispatchedContinuation<T>

    // continuation is a BaseContinuationImpl subclass; this is the hand-off to the second layer
    val continuation = delegate.continuation
     val context = continuation.context
     val job = if (resumeMode.isCancellableMode) context[Job] else null
     val state = takeState() // NOTE: Must take state in any case, even if cancelled
     withCoroutineContext(context, delegate.countOrElement) {
       if (job != null && !job.isActive)
         continuation.resumeWithException(job.getCancellationException())
       else {
         val exception = getExceptionalResult(state)
         if (exception != null)
           continuation.resumeWithStackTrace(exception)// taken on cancel and other exceptions
         else
          // calls resumeWith(Result.success(value))
           continuation.resume(getSuccessfulResult(state))// main path
       }
     }
   } catch (e: Throwable) {
     throw DispatchException("Unexpected exception running $this", e)
   } finally {
     taskContext.afterTask()
   }
/**
 * Resumes this continuation with a failure built from [exception] after passing it
 * through recoverStackTrace.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}

SuspendLambda_resumeWith

// BaseContinuationImpl
// Wraps the coroutine's computation logic; used both to start and to resume the coroutine.
// Runs invokeSuspend on the current continuation and walks up the completion chain in a loop
// until a continuation suspends or a non-BaseContinuationImpl completion is reached.
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)// main path
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {// every exception is caught and wrapped into outcome
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {// e.g. when completion is an AbstractCoroutine instance
                // top-level completion reached -- invoke and return
                // completion is the AbstractCoroutine instance; this is the hand-off to the first layer
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

coroutinecompileoutput

AbstractCoroutine.resumeWith

/** commonMain/AbstractCoroutine.kt
 * Completes execution of this coroutine with the specified result.
 */
public final override fun resumeWith(result: Result<T>) {
    val completionState = result.toState()
    makeCompletingOnce(completionState, defaultResumeMode)
}

//commonMain/JobSupport.kt
 /**
  * Tries to move the job to completing with [proposedUpdate], retrying while the attempt reports COMPLETING_RETRY.
  *
  * @return `true` when the job completed, `false` when it is waiting for children.
  * @throws IllegalStateException when the job is already complete or completing.
  */
 internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
        when (tryMakeCompleting(state, proposedUpdate, mode)) {
            COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
                "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
            COMPLETING_COMPLETED -> return true
            COMPLETING_WAITING_CHILDREN -> return false
            // Returning from the lambda lets loopOnState run the next iteration.
            COMPLETING_RETRY -> return@loopOnState
            else -> error("unexpected result")
        }
    }

withContext(基于suspendCoroutineUninterceptedOrReturn)

/**
 * Runs [block] in the current context combined with [context] and returns its result.
 *
 * Three paths are taken depending on how the new context differs from the old one:
 * identical context, same interceptor with other changes, or a different dispatcher.
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    // compute new context
    val oldContext = uCont.context
    val newContext = oldContext + context
    // always check for cancellation of new context
    newContext.checkCompletion()
    // FAST PATH #1 -- new context is the same as the old one
    if (newContext === oldContext) {
        val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
        return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
    }
    // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
    // `equals` is used by design (see equals implementation in wrapper contexts like ExecutorCoroutineDispatcher)
    if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
        val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
        // There are changes in the context, so this thread needs to be updated
        withCoroutineContext(newContext, null) {
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    // SLOW PATH -- use new dispatcher
    val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_CANCELLABLE; uCont is passed so the resume happens on the old dispatcher
    coroutine.initParentJob()
    block.startCoroutineCancellable(coroutine, coroutine)// same main flow as launch; the resume path is defined by DispatchedCoroutine
    coroutine.getResult()
}

dispatched

// Used by withContext when context dispatcher changes
private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

监听completion通知,执行resume

/**
 * Coroutine that, once completed, resumes the unintercepted continuation [uCont]
 * it was created with.
 */
internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {

    /**
     * Forwards the completion [state] to [uCont] using the given resume [mode]:
     * exceptionally for a [CompletedExceptionally] state, with the value otherwise.
     */
    override fun afterCompletionInternal(state: Any?, mode: Int) {
        if (state !is CompletedExceptionally) {
            // uCont is the "old" continuation, e.g. the one suspended in
            // com.example.myapplication.coroutine.CoroutineTest$postItem$1.invokeSuspend(CoroutineTest.kt:55)
            uCont.resumeUninterceptedMode(state as T, mode)
            return
        }
        val exception = when (mode) {
            MODE_IGNORE -> state.cause
            else -> recoverStackTrace(state.cause, uCont)
        }
        uCont.resumeUninterceptedWithExceptionMode(exception, mode)
    }
}
/**
 * Resumes this unintercepted continuation with [value] in the way selected by [mode].
 * Fails via `error` for an unknown mode.
 */
internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
    when (mode) {
        // The first two modes intercept the continuation before resuming it.
        MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
        MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
        MODE_DIRECT -> resume(value)
        MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
        MODE_IGNORE -> {}
        else -> error("Invalid mode $mode")
    }
}

resume

undispatched

/**
 * Initializes the parent job, then starts [block] with [receiver] without interception,
 * using this coroutine as the completion. The outcome is produced by `undispatchedResult`.
 */
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
    initParentJob()
    return undispatchedResult(
        { true },
        { block.startCoroutineUninterceptedOrReturn(receiver, this) }
    )
}

kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

/**
 * Invokes this suspending function directly as a one-argument function, passing
 * [completion] as its continuation, and returns whatever that call returns.
 */
public actual inline fun <T> (suspend () -> T).startCoroutineUninterceptedOrReturn(
    completion: Continuation<T>
): Any? {
    val asPlainFunction = this as Function1<Continuation<T>, Any?>
    return asPlainFunction.invoke(completion)
}

delay(基于suspendCancellableCoroutine)

/**
 * Delays the coroutine for [timeMillis] without blocking a thread, then resumes it.
 * This suspending function is cancellable: if the [Job] of the current coroutine is
 * cancelled or completed while waiting, it immediately resumes with [CancellationException].
 *
 * Returns right away when [timeMillis] is not positive.
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine<Unit> { cont ->
        // Ask the context's Delay implementation to resume `cont` later.
        val delayImpl = cont.context.delay
        delayImpl.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

/** The [Delay] implementation of this context: its interceptor when that is a [Delay], else [DefaultDelay]. */
internal val CoroutineContext.delay: Delay
    get() {
        val interceptor = this[ContinuationInterceptor]
        return (interceptor as? Delay) ?: DefaultDelay
    }

suspendCancellableCoroutine

/**
 * Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
 * the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
 *
 * The current continuation is intercepted and wrapped in a [CancellableContinuationImpl]
 * before [block] runs; the function's outcome is whatever `getResult()` returns afterwards.
 */
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->// uCont is the generated SuspendLambda subclass, i.e. the second layer
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
        // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
        // cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

suspendCoroutineUninterceptedOrReturn

// Obtains the current continuation instance inside suspend functions and either suspends
// the currently running coroutine or returns the result immediately without suspension.
// Only the signature is quoted here; the body is omitted in these notes.
suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(
    crossinline block: (Continuation<T>) -> Any?
): T

main线程delay,执行block

/**
 * Implements [CoroutineDispatcher] on top of an arbitrary Android [Handler].
 */
internal class HandlerContext

  /**
   * Posts a delayed runnable on the handler that resumes [continuation] undispatched,
   * and removes that runnable again if the continuation is cancelled.
   */
  override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val block = Runnable {
        with(continuation) { resumeUndispatched(Unit) }
    }
    // The delay is capped at MAX_DELAY.
    handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
    continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}

非main线程delay,执行block

/**
 * Schedules a [DelayedResumeTask] that resumes [continuation] after [timeMillis].
 * Nothing is scheduled when the delay, converted to nanoseconds, is not below MAX_DELAY_NS.
 */
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val timeNanos = delayToNanos(timeMillis)
    if (timeNanos >= MAX_DELAY_NS) return
    val now = nanoTime()
    val resumeTask = DelayedResumeTask(now + timeNanos, continuation)
    // Register disposal first so a cancelled continuation also disposes the task.
    continuation.disposeOnCancellation(resumeTask)
    schedule(now, resumeTask)
}

CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle)

/**
 * Registers a cancellation handler on this continuation that disposes [handle].
 */
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle) =
    invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)

/** Cancel handler that disposes the wrapped [handle] when invoked; the cause is ignored. */
private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
    override fun invoke(cause: Throwable?) = handle.dispose()
}
/**
 * Delayed task that resumes [cont] undispatched with [Unit] when it runs.
 */
private inner class DelayedResumeTask(
    nanoTime: Long,
    private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
    override fun run() {
        with(cont) {
            resumeUndispatched(Unit)
        }
    }

    override fun toString(): String = "${super.toString()}$cont"
}
    /**
     * Base class for delayed tasks (excerpt). Only disposal is shown here.
     */
    internal abstract class DelayedTask(
        /**
         * This field can be only modified in [scheduleTask] before putting this DelayedTask
         * into heap to avoid overflow and corruption of heap data structure.
         */
        @JvmField var nanoTime: Long
    ) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
        
        /**
         * Removes this task from the queue it is in, if any, and marks it as disposed.
         * Calling it again after that is a no-op.
         */
        @Synchronized
        final override fun dispose() {
            val heap = _heap
            if (heap === DISPOSED_TASK) return // already disposed
            @Suppress("UNCHECKED_CAST")
            (heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
            _heap = DISPOSED_TASK // never add again to any heap
        }
    }

DefaultExecutor.schedule

/**
 * Schedules [delayedTask] and reacts to the result reported by scheduleImpl.
 */
public fun schedule(now: Long, delayedTask: DelayedTask) {
    when (scheduleImpl(now, delayedTask)) {
        SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        // NOTE(review): presumably the executor has already completed, so the task is handed to reschedule — confirm.
        SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
        SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
        else -> error("unexpected result")
    }
}

getResult

/**
 * Returns COROUTINE_SUSPENDED when the continuation manages to suspend; otherwise
 * returns the already available result or throws the stored/cancellation exception.
 */
@PublishedApi
internal fun getResult(): Any? {
    installParentCancellationHandler()
    if (trySuspend()) return COROUTINE_SUSPENDED
    // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
    val state = this.state
    if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
    // if the parent job was already cancelled, then throw the corresponding cancellation exception
    // otherwise, there is a race if suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
    // before the block returns. This getResult would return a result as opposed to cancellation
    // exception that should have happened if the continuation is dispatched for execution later.
    if (resumeMode == MODE_CANCELLABLE) {
        val job = context[Job]
        if (job != null && !job.isActive) {
            val cause = job.getCancellationException()
            cancelResult(state, cause)
            throw recoverStackTrace(cause, this)
        }
    }
    return getSuccessfulResult(state)
}

trySuspend释放线程

// Values of the _decision field used by trySuspend() and tryResume():
// whichever of the two wins the CAS away from UNDECIDED determines the outcome.
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

/**
 * Tries to move the decision from UNDECIDED to SUSPENDED.
 *
 * @return `true` when the CAS succeeds, `false` when the decision is already RESUMED.
 */
private fun trySuspend(): Boolean {
    _decision.loop { decision ->
        when (decision) {
            // A failed CAS falls out of the `when`; the enclosing loop then re-reads the decision.
            UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
            RESUMED -> return false
            else -> error("Already suspended")
        }
    }
}

main线程delay,恢复协程的后续执行

with(continuation) { resumeUndispatched(Unit) }
/**
 * Invokes [block] with [receiver] as its receiver and returns the block's result.
 */
@kotlin.internal.InlineOnly
public inline fun <T, R> with(receiver: T, block: T.() -> R): R = receiver.block()

commonMain/CancellableContinuationImpl.kt

/**
 * Resumes with [value], using MODE_UNDISPATCHED when the delegate is a
 * [DispatchedContinuation] bound to this very dispatcher, and the stored resume mode otherwise.
 */
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dispatched = delegate as? DispatchedContinuation
    val sameDispatcher = dispatched?.dispatcher === this
    val mode = if (sameDispatcher) MODE_UNDISPATCHED else resumeMode
    resumeImpl(value, mode)
}
// returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
                disposeParentHandle()
                dispatchResume(resumeMode)//main
                return null
            }
    }
}
    /**
     * Dispatches the resume in the given [mode] unless tryResume() succeeds, which means
     * the completion happened before getResult was invoked.
     */
    private fun dispatchResume(mode: Int) {
        val resumedBeforeGetResult = tryResume()
        if (!resumedBeforeGetResult) {
            // getResult has already commenced, i.e. completed later or in other thread
            dispatch(mode)
        }
    }

tryResume

/**
 * Tries to move the decision from UNDECIDED to RESUMED.
 *
 * @return `true` when the CAS succeeds, `false` when the decision is already SUSPENDED.
 */
private fun tryResume(): Boolean {
    _decision.loop { decision ->
        when (decision) {
            // A failed CAS falls out of the `when`; the enclosing loop then re-reads the decision.
            UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
            SUSPENDED -> return false
            else -> error("Already resumed")
        }
    }
}

DispatchedTask.dispatch

/**
 * Resumes the delegate of this task in the given [mode].
 *
 * When the mode is a dispatched one, the delegate is a [DispatchedContinuation] and the
 * cancellability of [mode] matches the task's own resume mode, the task itself is dispatched
 * (or resumed unconfined). In every other case the delegate is resumed through `resume`.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
        // dispatch directly using this instance's Runnable implementation
        val dispatcher = delegate.dispatcher
        val context = delegate.context
        if (dispatcher.isDispatchNeeded(context)) {
            dispatcher.dispatch(context, this)
        } else {
            resumeUnconfined()
        }
    } else {
        resume(delegate, mode)// main path
    }
}
/**
 * Slow path: takes this task's state and resumes [delegate] with it using [useMode],
 * either with the successful value or with the (possibly stack-recovered) exception.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    // slow-path - use delegate
    val state = takeState()
    val exception = getExceptionalResult(state)
    if (exception == null) {
        delegate.resumeMode(getSuccessfulResult(state), useMode)// main path
        return
    }
    /*
     * Recover stacktrace for non-dispatched tasks.
     * We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
     * and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
     * kotlinx.coroutines and DispatchedTask
     */
    val recovered = when (delegate) {
        is DispatchedTask<*> -> exception
        else -> recoverStackTrace(exception, delegate)
    }
    delegate.resumeWithExceptionMode(recovered, useMode)
}
/**
 * Resumes this continuation with [value] in the way selected by [mode].
 * Fails via `error` for an unknown mode.
 */
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
    when (mode) {
        MODE_ATOMIC_DEFAULT -> resume(value)
        MODE_CANCELLABLE -> resumeCancellable(value)
        MODE_DIRECT -> resumeDirect(value)
        // Unconditional cast: this mode requires the receiver to be a DispatchedContinuation.
        MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
        MODE_IGNORE -> {}
        else -> error("Invalid mode $mode")
    }
}
// Resumes the wrapped continuation with the value directly, without going through the dispatcher.
inline fun resumeUndispatched(value: T) {
  // this: DispatchedContinuation[Main [immediate]...]
    withCoroutineContext(context, countOrElement) {
        continuation.resume(value)// continues with the subsequent resume flow
    }
}
/**
 * Resumes this continuation with [value] wrapped as a successful [Result].
 */
public inline fun <T> Continuation<T>.resume(value: T): Unit {
    val success = Result.success(value)
    resumeWith(success)
}
// Resuming the coroutine: either dispatches this continuation through the dispatcher
// or, when no dispatch is needed, resumes the wrapped continuation via executeUnconfined.
override fun resumeWith(result: Result<T>) {
    val context = continuation.context
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        // State and mode are written before dispatching; `this` is the object passed to the dispatcher.
        _state = state
        resumeMode = MODE_ATOMIC_DEFAULT
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
            withCoroutineContext(this.context, countOrElement) {
                continuation.resumeWith(result)
            }
        }
    }
}

继续中间层的resume流程

coroutineScope(基于suspendCoroutineUninterceptedOrReturn)

commonMain/CoroutineScope.kt

/**
 * Runs [block] in a new [ScopeCoroutine] built on top of the caller's continuation.
 *
 * The contract declares that [block] is invoked in place exactly once. The scope is started
 * via `startUndispatchedOrReturn`, whose result becomes the result of the
 * `suspendCoroutineUninterceptedOrReturn` call.
 */
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { callerContinuation ->
        ScopeCoroutine(callerContinuation.context, callerContinuation).let { scope ->
            scope.startUndispatchedOrReturn(scope, block)
        }
    }
}

Job

* A job has the following states:
*
* | **State**                        | [isActive] | [isCompleted] | [isCancelled] |
* | -------------------------------- | ---------- | ------------- | ------------- |
* | _New_ (optional initial state)   | `false`    | `false`       | `false`       |
* | _Active_ (default initial state) | `true`     | `false`       | `false`       |
* | _Completing_ (transient state)   | `true`     | `false`       | `false`       |
* | _Cancelling_ (transient state)   | `false`    | `false`       | `true`        |
* | _Cancelled_ (final state)        | `false`    | `true`        | `true`        |
* | _Completed_ (final state)        | `false`    | `true`        | `false`       |
* ```
*                                       wait children
* +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
* | New | -----> | Active | ---------> | Completing  | -------> | Completed |
* +-----+        +--------+            +-------------+          +-----------+
*                  |  cancel / fail       |
*                  |     +----------------+
*                  |     |
*                  V     V
*              +------------+                           finish  +-----------+
*              | Cancelling | --------------------------------> | Cancelled |
*              +------------+                                   +-----------+
* ```

async

async 会立即启动一个新协程并返回 Deferred,调用方协程不会被挂起,继续在当前线程上执行

await 被调用时,若结果已就绪则直接返回;否则挂起调用方协程并释放其所在线程,待结果就绪后再恢复

/**
 * Creates a coroutine for [block] and returns it as a [Deferred].
 *
 * @param context extra context elements, combined through `newCoroutineContext`.
 * @param start start option; a lazy start selects [LazyDeferredCoroutine], anything else
 *   selects an already-active [DeferredCoroutine].
 * @param block the code the coroutine runs.
 * @return the created coroutine, after `start` has been invoked on it.
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val deferredContext = newCoroutineContext(context)
    val deferred = when {
        start.isLazy -> LazyDeferredCoroutine(deferredContext, block)
        else -> DeferredCoroutine<T>(deferredContext, active = true)
    }
    return deferred.also { it.start(start, it, block) }
}
/**
 * A [Job] that additionally exposes a result of type [T] through [await].
 * (Abridged excerpt: the remaining members are elided below.)
 */
public interface Deferred<out T> : Job {
  /** Suspending accessor for the result; `DeferredCoroutine.await` is the implementation quoted in these notes. */
  public suspend fun await(): T
  ......
}
/**
 * Coroutine implementation behind `async`: an [AbstractCoroutine] that is also a [Deferred]
 * and its own [SelectClause1].
 *
 * Every member delegates to the corresponding `*Internal` function and casts the untyped
 * result back to [T] where needed.
 */
private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
    // This object serves as its own select clause.
    override val onAwait: SelectClause1<T>
        get() = this

    override fun getCompleted(): T {
        return getCompletedInternal() as T
    }

    override suspend fun await(): T {
        return awaitInternal() as T
    }

    override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
        registerSelectClause1Internal(select, block)
}

awaitInternal

/**
 * Returns the completed state of this job, suspending through [awaitSuspend] when it is
 * not complete yet.
 *
 * Fast path: when the current state is not [Incomplete], the unboxed state is returned
 * directly, or the stored cause is rethrown via `recoverAndThrow` for a
 * [CompletedExceptionally] state. Otherwise `startInternal` is called on the state and the
 * loop repeats until it returns a non-negative value.
 */
internal suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    while (true) { // lock-free loop on state
        val state = this.state
        if (state !is Incomplete) {
            // already complete -- just return result
            if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                recoverAndThrow(state.cause)
            }
            return state.unboxState()

        }
        if (startInternal(state) >= 0) break // break unless needs to retry
    }
    return awaitSuspend() // slow-path
}
/**
 * Slow path of [awaitInternal]: wraps the intercepted caller continuation in an
 * `AwaitContinuation` and registers a [ResumeAwaitOnCompletion] handler on this job.
 */
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    val cont = AwaitContinuation(uCont.intercepted(), this)
    // The completion handler is installed first; its handle is disposed if `cont` is cancelled.
    cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
    // NOTE(review): presumably yields the result or the suspension marker — confirm against getResult().
    cont.getResult()
}

invokeOnCompletion

/**
 * Registers [handler] by forwarding to the three-argument overload with
 * `onCancelling = false` and `invokeImmediately = true`.
 *
 * @return the handle returned by that overload.
 */
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    return invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
}

ResumeAwaitOnCompletion

/**
 * Job node that resumes an awaiting [continuation] when the job it is attached to completes.
 *
 * An exceptionally completed job resumes the continuation with the stored cause in
 * [MODE_ATOMIC_DEFAULT]. Any other final state resumes it with the unboxed value.
 */
private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: CancellableContinuationImpl<T>
) : JobNode<JobSupport>(job) {
    override fun toString() = "ResumeAwaitOnCompletion[$continuation]"

    override fun invoke(cause: Throwable?) {
        val finalState = job.state
        assert { finalState !is Incomplete }
        when (finalState) {
            is CompletedExceptionally -> {
                // Atomic resume so the exception is preserved.
                continuation.resumeWithExceptionMode(finalState.cause, MODE_ATOMIC_DEFAULT)
            }
            else -> {
                // Value is delivered through the regular resume of the continuation
                // (AwaitContinuation is configured for cancellable mode).
                @Suppress("UNCHECKED_CAST")
                continuation.resume(finalState.unboxState() as T)
            }
        }
    }
}

Dispatchers.Main.immediate

@JvmStatic
// Author's note: the dispatcher returned here is the HandlerContext instance shown below.
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

MainDispatcherLoader
// Initialized from loadMainDispatcher(); this is the value that Dispatchers.Main returns.
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
/** Type of the dispatcher returned by `Dispatchers.Main`; exposes an [immediate] variant (abridged excerpt). */
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
  /** The immediate variant of this dispatcher; concrete subclasses supply it. */
  public abstract val immediate: MainCoroutineDispatcher
}

/** A [MainCoroutineDispatcher] that also implements [Delay] (abridged excerpt). */
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
  /** Narrows the type of [immediate] to [HandlerDispatcher]. */
  public abstract override val immediate: HandlerDispatcher
}
/**
 * Handler-backed dispatcher (abridged excerpt: the rest of the class body is not shown).
 *
 * [invokeImmediately] marks the "immediate" variant; see [isDispatchNeeded] for its effect.
 */
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {

// Cached immediate instance: this object itself when it already is the immediate variant.
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null

// Reuses the cached instance, or creates one with invokeImmediately = true and caches it.
override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }
  
/**
 * A dispatch is skipped only when this is the immediate variant AND the current looper
 * is the handler's looper; in every other case a dispatch is required.
 */
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
    return !invokeImmediately || Looper.myLooper() != handler.looper
}

协程resume时根据dispatcher.isDispatchNeeded判断是否需要dispatch

DispatchedContinuation
/**
 * Resumes with [value] in [MODE_CANCELLABLE]: dispatches when the dispatcher requires it,
 * otherwise resumes in place unless `resumeCancelled()` reports true.
 */
inline fun resumeCancellable(value: T) {
    if (dispatcher.isDispatchNeeded(context)) { // main path
        // Value and mode are written before the task is handed to the dispatcher.
        _state = value
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(value, MODE_CANCELLABLE) {
            // Skip the in-place resume when resumeCancelled() returns true.
            if (!resumeCancelled()) {
                resumeUndispatched(value)
            }
        }
    }
}

CoroutineStackFrame

发生异常时用于恢复协程堆栈

// JVM actual declaration: aliases the stdlib interface from kotlin.coroutines.jvm.internal.
internal actual typealias CoroutineStackFrame = kotlin.coroutines.jvm.internal.CoroutineStackFrame
/**
 * Represents one frame in the coroutine call stack for the debugger.
 * This interface is implemented by compiler-generated implementations of the
 * [Continuation] interface.
 */
@SinceKotlin("1.3")
public interface CoroutineStackFrame {
    /**
     * Returns a reference to the stack frame of the caller of this frame,
     * that is, the frame before this frame in the coroutine call stack.
     * The result is `null` for the first frame of the coroutine.
     */
    public val callerFrame: CoroutineStackFrame?

    /**
     * Returns the stack trace element that corresponds to this stack frame.
     * The result is `null` if the stack trace element is not available for this frame.
     * In this case, the debugger represents this stack frame using the
     * result of the [toString] function.
     */
    public fun getStackTraceElement(): StackTraceElement?
}

DispatchedTask.run

/**
 * Runs the dispatched task and resumes the continuation.
 *
 * Abridged excerpt: `state`, `context` and `continuation` are not declared in this snippet,
 * and `taskContext` / `fatalException` are declared but not used here.
 *
 * Inside `withCoroutineContext` the continuation is resumed in exactly one of three ways:
 * with the job's cancellation exception, with the stored exception, or with the
 * successful result.
 */
public final override fun run() {
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
        withCoroutineContext(context, delegate.countOrElement) {
            val exception = getExceptionalResult(state)
            // The job is only consulted when the resume mode is a cancellable one.
            val job = if (resumeMode.isCancellableMode) context[Job] else null
            /*
             * Check whether continuation was originally resumed with an exception.
             * If so, it dominates cancellation, otherwise the original exception
             * will be silently lost.
             */
            if (exception == null && job != null && !job.isActive) {
                val cause = job.getCancellationException()
                cancelResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                if (exception != null) continuation.resumeWithStackTrace(exception) // main path
                else continuation.resume(getSuccessfulResult(state))
            }
        }
}
/**
 * Resumes this continuation with a failure whose exception has first been passed
 * through [recoverStackTrace] for this continuation.
 */
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
/**
 * Returns [exception] with a recovered stack trace when recovery is enabled and
 * [continuation] is a [CoroutineStackFrame]; otherwise returns [exception] unchanged.
 */
internal actual fun <E : Throwable> recoverStackTrace(exception: E, continuation: Continuation<*>): E {
    return if (RECOVER_STACK_TRACES && continuation is CoroutineStackFrame) {
        recoverFromStackFrame(exception, continuation)
    } else {
        exception
    }
}
private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: CoroutineStackFrame): E {
    /*
    * Here we are checking whether exception has already recovered stacktrace.
    * If so, we extract initial and merge recovered stacktrace and current one
    */
    val (cause, recoveredStacktrace) = exception.causeAndStacktrace()

    // Try to create an exception of the same type and get stacktrace from continuation
    val newException = tryCopyException(cause) ?: return exception
    val stacktrace = createStackTrace(continuation)
/**
 * Collects the stack trace elements of [continuation] and of every frame reachable through
 * `callerFrame`, in that order. Frames without a stack trace element are skipped.
 */
private fun createStackTrace(continuation: CoroutineStackFrame): ArrayDeque<StackTraceElement> =
    generateSequence(continuation) { frame -> frame.callerFrame }
        .mapNotNullTo(ArrayDeque<StackTraceElement>()) { frame -> frame.getStackTraceElement() }