graph LR
coroutineContext-->Element1["Element1: a singleton context by itself."]
coroutineContext-->Element2["Element2: a singleton context by itself."]
coroutineContext-->ContinuationInterceptor["ContinuationInterceptor: DefaultDispatcher."]
coroutineContext-->CoroutineExceptionHandler["CoroutineExceptionHandler: CoroutineExceptionHandlerImpl."]
coroutineContext-->ElementXxx["ElementXxx: a singleton context by itself."]
sequenceDiagram
CurrentThreadType->>DispatchedThreadType: dispatch
activate DispatchedThreadType
CurrentThreadType->>CurrentThreadType: suspend coroutine and release thread
DispatchedThreadType-->>CurrentThreadType: resume
deactivate DispatchedThreadType
graph LR
subgraph 基础层
suspendCoroutine-->suspendCoroutineUninterceptedOrReturn
suspendCancellableCoroutine-->suspendCoroutineUninterceptedOrReturn
end
subgraph 上层功能
delay-->suspendCancellableCoroutine
withContext-->suspendCoroutineUninterceptedOrReturn
awaitSuspend-->suspendCoroutineUninterceptedOrReturn
yield-->suspendCoroutineUninterceptedOrReturn
coroutineScope-->suspendCoroutineUninterceptedOrReturn
end
/**
* Context property declared with type [CoroutineContext].
* NOTE(review): presumably the `CoroutineScope.coroutineContext` member that `newCoroutineContext` reads — confirm.
*/
public val coroutineContext: CoroutineContext
launch 本质上也是将用户传入的协程闭包作为一个 suspend 函数(类型为 suspend CoroutineScope.() -> Unit)在指定的 dispatcher 上执行,和 withContext 类似。查看编译产物可以看到,调用处实际上 new 了对应的 SuspendLambda 子类对象,但此时尚未关联第一层的 AbstractCoroutine;需要在后续的 create 过程中重新构造该子类对象,并把第一层的对象传给它。
/**
 * Starts a new coroutine that runs [block] and returns it as a [Job].
 *
 * The effective context comes from [newCoroutineContext]. A lazy [start] mode creates a
 * [LazyStandaloneCoroutine] that keeps [block]; any other mode creates an active
 * [StandaloneCoroutine]. The coroutine is then started with itself as the receiver of [block].
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val effectiveContext = newCoroutineContext(context)
    val job = when {
        start.isLazy -> LazyStandaloneCoroutine(effectiveContext, block)
        else -> StandaloneCoroutine(effectiveContext, active = true)
    }
    // Main path: kick off the coroutine, passing it as the receiver of the block.
    job.start(start, job, block)
    return job
}
intercepted 过程会用到 context[ContinuationInterceptor](此处为 DefaultScheduler 的实例 DefaultDispatcher),作为后续 resume 时的线程调度器
/**
 * Builds the context for a new coroutine: the scope's context plus [context], plus a fresh
 * [CoroutineId] when [DEBUG] is on, plus [Dispatchers.Default] when the merged context has no
 * [ContinuationInterceptor] of its own.
 */
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val merged = coroutineContext + context
    val withDebugId = when {
        DEBUG -> merged + CoroutineId(COROUTINE_ID.incrementAndGet())
        else -> merged
    }
    // The fallback dispatcher is decided on the merged context, before the debug id was added.
    val needsDefaultDispatcher = merged !== Dispatchers.Default && merged[ContinuationInterceptor] == null
    return if (needsDefaultDispatcher) withDebugId + Dispatchers.Default else withDebugId
}
AbstractCoroutine.start --> CoroutineStart.invoke
/**
 * Starts this coroutine with [block] and [receiver] using the given [start] strategy.
 *
 * The parent job is initialised first; the strategy's `invoke` operator is then called with this
 * coroutine as the completion.
 */
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    // Explicit operator call on the `start` parameter (not a recursive call to this function).
    start.invoke(block, receiver, this)
}
/**
 * Starts [block] with [receiver] and [completion] according to this start strategy.
 *
 * In the traced `launch` call, [receiver] and [completion] are the same StandaloneCoroutine instance.
 */
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        // main path
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        // will start lazily
        LAZY -> Unit
    }
commonMain\intrinsics\Cancellable.kt
/**
 * Starts this suspend function as a coroutine in a cancellable way, so that it can be cancelled
 * while it is still waiting to be dispatched.
 *
 * The coroutine is created unintercepted, then intercepted and resumed cancellably with [Unit];
 * a failure in any of these steps is reported to [completion] by [runSafely].
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        val coroutine = createCoroutineUnintercepted(receiver, completion)
        val dispatched = coroutine.intercepted()
        dispatched.resumeCancellable(Unit)
    }
/**
 * Runs [block]; if it throws any [Throwable], resumes [completion] with that failure instead of
 * letting the exception propagate.
 */
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    runCatching(block).onFailure { failure ->
        completion.resumeWith(Result.failure(failure))
    }
}
kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
/**
 * Creates (without intercepting) a coroutine for this suspend function with the given [receiver]
 * and [completion].
 *
 * A receiver that is a [BaseContinuationImpl] is asked to `create` a new instance; any other
 * suspend function is wrapped via `createCoroutineFromSuspendFunction`.
 */
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probed = probeCoroutineCreated(completion)
    if (this is BaseContinuationImpl) {
        // Main path: instantiates the compiler-generated SuspendLambda subclass and passes the
        // first-layer StandaloneCoroutine to its constructor. In the traced launch, receiver and
        // the probed completion were both StandaloneCoroutine{Active}@7e4d11a.
        return create(receiver, probed)
    }
    return createCoroutineFromSuspendFunction(probed) { cont ->
        (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, cont)
    }
}
// Suspension lambdas inherit from this class
/**
* Abstract base declared with an [arity] and an optional completion continuation; it extends
* [ContinuationImpl] and implements [FunctionBase] and [SuspendFunction].
* (Only the class header is shown in this excerpt.)
*/
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
build/tmp/kotlin-classes/debug/com/xxx/CoroutineTest.class
使用jd-gui打开,对比
/**
* Demo used to inspect the compiled output: launches a coroutine on `Dispatchers.Main.immediate`
* that prints, delays for 500 ms and prints again, while `test()` itself prints right after `launch`.
* NOTE(review): GlobalScope is not tied to any lifecycle; prefer a scoped CoroutineScope outside of demos.
*/
object CoroutineTest {
fun test() {
GlobalScope.launch(Dispatchers.Main.immediate) {
println("I'm sleeping ... thread name = ${Thread.currentThread().name}")
delay(500L)// suspends the current coroutine and releases the current thread
println("I'm sleeping over ... thread name = ${Thread.currentThread().name}")
}
println("finished thread name = ${Thread.currentThread().name}")
}
}
对应
// Decompiled (jd-gui) form of the Kotlin CoroutineTest object; the outer class is not closed in this excerpt.
public final class CoroutineTest {
public final void test() {
// The suspend lambda is compiled to CoroutineTest$test$1 and instantiated here with a null completion.
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getMain().getImmediate(), null, new CoroutineTest$test$1(null), 2, null);
Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
String str = "finished thread name = " + Thread.currentThread().getName();
boolean bool = false;
System.out.println(str);
}
// State machine generated for the lambda passed to launch.
static final class CoroutineTest$test$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
private CoroutineScope p$;
// Slot used to save a local across the suspension point.
Object L$0;
// Current state: 0 = first entry, 1 = resumed after delay(...).
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
CoroutineScope $this$launch;
String str;
boolean bool;
// Marker value that is returned to the caller when the coroutine suspends.
Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
str = "I'm sleeping ... thread name = " + Thread.currentThread().getName();
bool = false;
System.out.println(str);
// Save state and advance the label before calling delay; if delay suspends, hand the marker back.
this.L$0 = $this$launch;
this.label = 1;
if (DelayKt.delay(500L, (Continuation)this) == object)
return object;
// NOTE(review): this second delay(...) call and the duplicated tail below look like decompiler
// artifacts of the fall-through into case 1 — confirm against the bytecode.
DelayKt.delay(500L, (Continuation)this);
Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
str = "I'm sleeping over ... thread name = " + Thread.currentThread().getName();
bool = false;
System.out.println(str);
return Unit.INSTANCE;
case 1:
// Resume point: restore the saved local, rethrow a failed result, then run the code after delay.
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
Intrinsics.checkExpressionValueIsNotNull(Thread.currentThread(), "Thread.currentThread()");
str = "I'm sleeping over ... thread name = " + Thread.currentThread().getName();
bool = false;
System.out.println(str);
return Unit.INSTANCE;
}
// Any other label value is an invalid state.
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
/** Throws the stored exception when this result holds a [Result.Failure]; otherwise does nothing. */
internal fun Result<*>.throwOnFailure() {
    val failure = value as? Result.Failure ?: return
    throw failure.exception
}
/**
 * Returns the intercepted form of this continuation: a [ContinuationImpl] is asked for its own
 * `intercepted()` result, any other continuation is returned unchanged.
 */
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> {
    val impl = this as? ContinuationImpl ?: return this
    return impl.intercepted()
}
// State machines for named suspend functions extend from this class
// (The class body is not closed in this excerpt.)
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
// Secondary constructor: takes the context from the completion, if there is one.
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
// Cached result of intercepted(); null until the first call.
@Transient
private var intercepted: Continuation<Any?>? = null
// Returns the cached intercepted continuation, or asks the context's ContinuationInterceptor to
// wrap this continuation (falling back to this when there is none) and caches the result.
public fun intercepted(): Continuation<Any?> =
// Observed while tracing: context is a combined context [StandaloneCoroutine{Active}@7e4d11a, DefaultDispatcher]
// and context[ContinuationInterceptor] returns the DefaultScheduler instance (shown as DefaultDispatcher)
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
CoroutineDispatcher.interceptContinuation
/** Wraps [continuation] in a [DispatchedContinuation] bound to this dispatcher. */
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
    return DispatchedContinuation(this, continuation)
}
/**
 * Resumes this continuation with [value]: a [DispatchedContinuation] uses its own cancellable
 * resume, every other continuation is resumed directly.
 */
internal fun <T> Continuation<T>.resumeCancellable(value: T) =
    if (this is DispatchedContinuation) resumeCancellable(value) else resume(value)
/**
 * Resumes with [value] in cancellable mode.
 *
 * When the dispatcher does not need a dispatch, the resume runs through `executeUnconfined`
 * (unless `resumeCancelled()` reports cancellation); otherwise the value and mode are stored and
 * this continuation is handed to the dispatcher.
 */
inline fun resumeCancellable(value: T) {
    if (!dispatcher.isDispatchNeeded(context)) {
        executeUnconfined(value, MODE_CANCELLABLE) {
            if (!resumeCancelled()) resumeUndispatched(value)
        }
        return
    }
    _state = value
    resumeMode = MODE_CANCELLABLE
    dispatcher.dispatch(context, this)
}
/**
 * Submits [block] to the coroutine scheduler; if the scheduler rejects it with
 * [RejectedExecutionException], the block is dispatched through [DefaultExecutor] instead.
 */
override fun dispatch(context: CoroutineContext, block: Runnable) {
    try {
        coroutineScheduler.dispatch(block)
    } catch (rejected: RejectedExecutionException) {
        DefaultExecutor.dispatch(context, block)
    }
}
/**
 * Schedules [block] as a task.
 *
 * The task is first offered to the local queue. If that reports `ADDED`, nothing else happens.
 * If it reports `NOT_ADDED`, the task goes to the global queue (a closed global queue raises
 * [RejectedExecutionException]). In every case other than `ADDED` a CPU worker is requested.
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
    trackTask() // this is needed for virtual time support
    val task = createTask(block, taskContext)
    // try to submit the task to the local queue and act depending on the result
    val outcome = submitToLocalQueue(task, fair)
    if (outcome == ADDED) return
    // main path: try to offload the task to the global queue
    if (outcome == NOT_ADDED && !globalQueue.addLast(task)) {
        // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
        throw RejectedExecutionException("$schedulerName was terminated")
    }
    requestCpuWorker() // ask for help
}
/**
* Appends [element] to the current core, looping until the add succeeds or the queue is closed.
*
* @return `true` when the core reports `ADD_SUCCESS`, `false` when it reports `ADD_CLOSED`.
*/
fun addLast(element: E): Boolean {
_cur.loop { cur ->
when (cur.addLast(element)) {
Core.ADD_SUCCESS -> return true
Core.ADD_CLOSED -> return false
// Frozen core: try to swing _cur over to cur.next() and go around the loop again.
Core.ADD_FROZEN -> _cur.compareAndSet(cur, cur.next()) // move to next
}
}
}
jvmMain\scheduling\CoroutineScheduler.kt
/** Scheduler worker thread: repeatedly looks for a task and runs it until terminated. */
internal inner class Worker private constructor() : Thread() {
override fun run() {
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask()// similar to Java's ThreadPoolExecutor: keep fetching tasks from the work queue and run them
if (task == null) {
// Wait for a job with potential park
if (state == WorkerState.CPU_ACQUIRED) {
cpuWorkerIdle()
} else {
blockingWorkerIdle()
}
wasIdle = true
} else {
// Note: read task.mode before running the task, because Task object will be reused after run
val taskMode = task.mode
if (wasIdle) {
idleReset(taskMode)
wasIdle = false
}
beforeTask(taskMode, task.submissionTime)
runSafely(task)// main path
afterTask(taskMode)
}
}
// The loop has ended: release the CPU permit (if held) and mark the worker as terminated.
tryReleaseCpu(WorkerState.TERMINATED)
}
}
/**
 * Finds the next task for this worker, or returns `null` when there is none.
 *
 * With a CPU permit the search is delegated to `findTaskWithCpuPermit()`. Without one, the local
 * queue is polled first and then a probably-blocking task is taken from the global queue.
 */
internal fun findTask(): Task? = when {
    tryAcquireCpuPermit() -> findTaskWithCpuPermit()
    /*
     * If the local queue is empty, try to extract blocking task from global queue.
     * It's helpful for two reasons:
     * 1) We won't call excess park/unpark here and someone's else CPU token won't be transferred,
     * which is a performance win
     * 2) It helps with rare race when external submitter sends depending blocking tasks
     * one by one and one of the requested workers may miss CPU token
     */
    else -> localQueue.poll() ?: globalQueue.removeFirstWithModeOrNull(TaskMode.PROBABLY_BLOCKING)
}
CoroutineScheduler.runSafely
/**
 * Runs [task]; any [Throwable] it throws is forwarded to the current thread's uncaught exception
 * handler. `unTrackTask()` is called afterwards in every case.
 */
private fun runSafely(task: Task) {
    try {
        task.run()
    } catch (failure: Throwable) {
        Thread.currentThread().let { current ->
            current.uncaughtExceptionHandler.uncaughtException(current, failure)
        }
    } finally {
        unTrackTask()
    }
}
resume过程参考visio类图
DispatchedTask<in T>
/**
* Runs this dispatched task: takes the stored state and, inside the continuation's coroutine
* context, resumes the wrapped continuation with a cancellation exception, the stored exception,
* or the successful result.
* (The closing brace of the function lies outside this excerpt.)
*/
public final override fun run() {
val taskContext = this.taskContext
try {
val delegate = delegate as DispatchedContinuation<T>
// continuation is a BaseContinuationImpl subclass; it acts as the proxy for the "second layer"
val continuation = delegate.continuation
val context = continuation.context
val job = if (resumeMode.isCancellableMode) context[Job] else null
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
if (job != null && !job.isActive)
continuation.resumeWithException(job.getCancellationException())
else {
val exception = getExceptionalResult(state)
if (exception != null)
continuation.resumeWithStackTrace(exception)// taken on cancellation and other exceptions
else
// ends up calling resumeWith(Result.success(value))
continuation.resume(getSuccessfulResult(state))// main path
}
}
} catch (e: Throwable) {
throw DispatchException("Unexpected exception running $this", e)
} finally {
taskContext.afterTask()
}
/** Resumes this continuation with a failure built from [exception] after `recoverStackTrace`. */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
// BaseContinuationImpl
// Wraps the coroutine's computation logic; used both to start and to resume a coroutine.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)// main path
// Suspended again: stop here without touching the completion.
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {// every exception is caught and wrapped into outcome
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {// reached when completion is an AbstractCoroutine instance (per the traced launch)
// top-level completion reached -- invoke and return
// completion is the AbstractCoroutine instance; it acts as the proxy for the "first layer"
completion.resumeWith(outcome)
return
}
}
}
}
/** commonMain/AbstractCoroutine.kt
* Completes execution of this coroutine with the specified result.
* The result is converted with `toState()` and passed to `makeCompletingOnce` together with `defaultResumeMode`.
*/
public final override fun resumeWith(result: Result<T>) {
makeCompletingOnce(result.toState(), defaultResumeMode)
}
//commonMain/JobSupport.kt
/**
* Tries, in a loop over the current state, to complete this job with [proposedUpdate].
*
* @return `true` for `COMPLETING_COMPLETED`, `false` for `COMPLETING_WAITING_CHILDREN`.
* @throws IllegalStateException when the job is already complete or completing.
*/
internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->
when (tryMakeCompleting(state, proposedUpdate, mode)) {
COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)
COMPLETING_COMPLETED -> return true
COMPLETING_WAITING_CHILDREN -> return false
// Retry: leave this iteration so that loopOnState runs the lambda again.
COMPLETING_RETRY -> return@loopOnState
else -> error("unexpected result")
}
}
/**
* Runs [block] in the caller's context combined with [context] and returns its result.
*
* Three paths are visible below: the context is unchanged (start undispatched in a ScopeCoroutine),
* only non-dispatcher elements changed (start undispatched in an UndispatchedCoroutine), or the
* dispatcher changed (start cancellably in a DispatchedCoroutine and return its `getResult()`).
*/
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont) // MODE_DIRECT
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont) // MODE_UNDISPATCHED
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont) // MODE_CANCELLABLE; uCont is passed along so the caller can be resumed on the old dispatcher
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)// same main path as launch; how the caller is resumed is decided by DispatchedCoroutine
coroutine.getResult()
}
// Used by withContext when context dispatcher changes
// Extends ScopeCoroutine, forwarding the context and the unintercepted continuation uCont.
// (Only the class header is shown in this excerpt.)
private class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
/**
 * Coroutine that remembers the caller's unintercepted continuation [uCont] and resumes it once
 * this coroutine has completed.
 */
internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
    /**
     * Resumes [uCont] in the given [mode]: with the value for a normal completion, or with the
     * cause for an exceptional one (stack trace recovered unless [mode] is `MODE_IGNORE`).
     */
    override fun afterCompletionInternal(state: Any?, mode: Int) {
        if (state !is CompletedExceptionally) {
            // uCont is the old (caller's) continuation, e.g. Continuation at
            // com.example.myapplication.coroutine.CoroutineTest$postItem$1.invokeSuspend(CoroutineTest.kt:55)
            uCont.resumeUninterceptedMode(state as T, mode)
            return
        }
        val exception = when (mode) {
            MODE_IGNORE -> state.cause
            else -> recoverStackTrace(state.cause, uCont)
        }
        uCont.resumeUninterceptedWithExceptionMode(exception, mode)
    }
}
/**
* Resumes this (unintercepted) continuation with [value] according to [mode].
*
* The atomic and cancellable modes intercept the continuation first; the direct mode resumes it as
* is; the undispatched mode resumes inside `withCoroutineContext`; `MODE_IGNORE` does nothing.
* Any other mode fails with an "Invalid mode" error.
*/
internal fun <T> Continuation<T>.resumeUninterceptedMode(value: T, mode: Int) {
when (mode) {
MODE_ATOMIC_DEFAULT -> intercepted().resume(value)
MODE_CANCELLABLE -> intercepted().resumeCancellable(value)
MODE_DIRECT -> resume(value)
MODE_UNDISPATCHED -> withCoroutineContext(context, null) { resume(value) }
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
}
/**
* Initialises the parent job and starts [block] with [receiver] without dispatching, using this
* coroutine as the completion. The outcome is post-processed by `undispatchedResult`.
* NOTE(review): the first argument `{ true }` is presumably a "should throw" predicate — confirm against undispatchedResult.
*/
internal fun <T, R> AbstractCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
initParentJob()
return undispatchedResult({ true }) {
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}
kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
/**
 * Starts this suspend function without interception by casting it to a plain one-argument
 * function and invoking it with [completion]; returns whatever that invocation returns.
 */
public actual inline fun <T> (suspend () -> T).startCoroutineUninterceptedOrReturn(
    completion: Continuation<T>
): Any? {
    val asPlainFunction = this as Function1<Continuation<T>, Any?>
    return asPlainFunction.invoke(completion)
}
/**
 * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
 * This suspending function is cancellable.
 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
 * immediately resumes with [CancellationException].
 *
 * A non-positive [timeMillis] returns immediately. Otherwise the resume is scheduled through the
 * [Delay] implementation found in the coroutine context.
 */
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine { continuation: CancellableContinuation<Unit> ->
        val scheduler = continuation.context.delay
        scheduler.scheduleResumeAfterDelay(timeMillis, continuation)
    }
}
/**
 * Returns [Delay] implementation of the given context: the context's [ContinuationInterceptor]
 * when it implements [Delay], otherwise [DefaultDelay].
 */
internal val CoroutineContext.delay: Delay
    get() {
        val interceptor = get(ContinuationInterceptor)
        return interceptor as? Delay ?: DefaultDelay
    }
/**
 * Suspends coroutine similar to [suspendCoroutine], but provide an implementation of [CancellableContinuation] to
 * the [block]. This function throws [CancellationException] if the coroutine is cancelled or completed while suspended.
 */
public suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        // In the traced example uCont is the generated SuspendLambda subclass, i.e. the "second layer".
        val interceptedCont = uCont.intercepted()
        val cancellable = CancellableContinuationImpl(interceptedCont, resumeMode = MODE_CANCELLABLE)
        // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
        // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
        // cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }
// Obtains the current continuation instance inside suspend functions and either suspends the
// currently running coroutine or returns a result immediately without suspension.
// (Signature only; no body is shown in this excerpt.)
suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(
crossinline block: (Continuation<T>) -> Any?
): T
/**
* Implements [CoroutineDispatcher] on top of an arbitrary Android [Handler].
* (Header fragment only; the fuller declaration appears further down in these notes.)
*/
internal class HandlerContext
/**
 * Schedules [continuation] to be resumed undispatched after [timeMillis] (capped at `MAX_DELAY`)
 * by posting a delayed runnable to the handler; the runnable is removed again when the
 * continuation is cancelled.
 */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val resumeTask = Runnable {
        continuation.run { resumeUndispatched(Unit) }
    }
    val cappedDelay = timeMillis.coerceAtMost(MAX_DELAY)
    handler.postDelayed(resumeTask, cappedDelay)
    continuation.invokeOnCancellation { handler.removeCallbacks(resumeTask) }
}
/**
 * Schedules [continuation] to be resumed after [timeMillis] via a [DelayedResumeTask].
 *
 * Nothing is scheduled when the delay, converted to nanoseconds, is not below `MAX_DELAY_NS`.
 * The task is registered for disposal on cancellation before it is scheduled.
 */
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
    val timeNanos = delayToNanos(timeMillis)
    if (timeNanos >= MAX_DELAY_NS) return
    val now = nanoTime()
    val task = DelayedResumeTask(now + timeNanos, continuation)
    continuation.disposeOnCancellation(task)
    schedule(now, task)
}
/** Registers a cancellation handler on this continuation that disposes [handle] (via [DisposeOnCancel]). */
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle) =
invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler)
/** Cancel handler whose invocation disposes the wrapped [handle]; the `cause` argument is not used. */
private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() {
override fun invoke(cause: Throwable?) = handle.dispose()
}
/** Delayed task that, when run, resumes [cont] undispatched with [Unit]. */
private inner class DelayedResumeTask(
    nanoTime: Long,
    private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
    override fun run() {
        cont.run { resumeUndispatched(Unit) }
    }

    override fun toString(): String = "${super.toString()}$cont"
}
/**
* Base class for tasks scheduled by time; it is a [Runnable], is comparable to other delayed tasks,
* and can be disposed. (Other members, including `_heap`, are elided in this excerpt.)
*/
internal abstract class DelayedTask(
/**
* This field can be only modified in [scheduleTask] before putting this DelayedTask
* into heap to avoid overflow and corruption of heap data structure.
*/
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
// Removes this task from its heap (if it is in one) and marks it as disposed; repeated calls are no-ops.
@Synchronized
final override fun dispose() {
val heap = _heap
if (heap === DISPOSED_TASK) return // already disposed
@Suppress("UNCHECKED_CAST")
(heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
_heap = DISPOSED_TASK // never add again to any heap
}
}
/**
* Schedules [delayedTask] and reacts to the result of `scheduleImpl`: unparks when needed after a
* successful schedule, reschedules on `SCHEDULE_COMPLETED`, and does nothing for a disposed task.
* Any other result fails with an "unexpected result" error.
*/
public fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
/**
* Returns `COROUTINE_SUSPENDED` when the continuation manages to suspend; otherwise returns the
* result that is already in the state, or throws the recorded exception / cancellation exception.
*/
@PublishedApi
internal fun getResult(): Any? {
installParentCancellationHandler()
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// before the block returns. This getResult would return a result as opposed to cancellation
// exception that should have happened if the continuation is dispatched for execution later.
if (resumeMode == MODE_CANCELLABLE) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelResult(state, cause)
throw recoverStackTrace(cause, this)
}
}
return getSuccessfulResult(state)
}
// Values of `_decision`, raced between trySuspend() and tryResume():
// UNDECIDED until one of them wins the compare-and-set, then SUSPENDED or RESUMED.
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2
/**
* Tries to move the decision from UNDECIDED to SUSPENDED.
*
* @return `true` when this call performed the transition, `false` when the decision is already RESUMED.
* Fails with "Already suspended" for any other decision value.
*/
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
// A failed compare-and-set falls through and the loop re-reads the decision.
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
with(continuation) { resumeUndispatched(Unit) }
/**
 * Calls the specified function [block] with the given [receiver] as its receiver and returns its result.
 */
@kotlin.internal.InlineOnly
public inline fun <T, R> with(receiver: T, block: T.() -> R): R = receiver.block()
commonMain/CancellableContinuationImpl.kt
/**
 * Resumes with [value]; the undispatched mode is used only when the delegate is a
 * [DispatchedContinuation] whose dispatcher is this very dispatcher, otherwise the regular
 * `resumeMode` applies.
 */
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dispatched = delegate as? DispatchedContinuation
    val mode = if (dispatched?.dispatcher === this) MODE_UNDISPATCHED else resumeMode
    resumeImpl(value, mode)
}
// returns null when successfully dispatched resumed, CancelledContinuation if too late (was already cancelled)
// (Excerpt: only the NotCompleted branch is shown and the function's closing brace is outside this view.)
private fun resumeImpl(proposedUpdate: Any?, resumeMode: Int): CancelledContinuation? {
_state.loop { state ->
when (state) {
is NotCompleted -> {
if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure
disposeParentHandle()
dispatchResume(resumeMode)// main path
return null
}
}
}
/**
 * Dispatches the resume in the given [mode] unless `tryResume()` succeeds, which means the
 * continuation completed before `getResult` was invoked.
 */
private fun dispatchResume(mode: Int) {
    // otherwise, getResult has already commenced, i.e. completed later or in other thread
    if (!tryResume()) {
        dispatch(mode)
    }
}
/**
* Tries to move the decision from UNDECIDED to RESUMED.
*
* @return `true` when this call performed the transition, `false` when the decision is already SUSPENDED.
* Fails with "Already resumed" for any other decision value.
*/
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
// A failed compare-and-set falls through and the loop re-reads the decision.
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
/**
* Resumes this task's delegate in the given [mode].
*
* When [mode] is a dispatched mode, the delegate is a [DispatchedContinuation] and the cancellable
* flag of [mode] matches that of `resumeMode`, this task itself is dispatched (or resumed
* unconfined when no dispatch is needed). Otherwise the delegate is resumed via `resume(delegate, mode)`.
*/
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
resumeUnconfined()
}
} else {
resume(delegate, mode)// main path
}
}
/**
 * Slow path: takes this task's state and resumes [delegate] in [useMode], either with the
 * successful result or with the stored exception.
 */
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    // slow-path - use delegate
    val state = takeState()
    val exception = getExceptionalResult(state)
    if (exception == null) {
        // main path
        delegate.resumeMode(getSuccessfulResult(state), useMode)
        return
    }
    /*
     * Recover stacktrace for non-dispatched tasks.
     * We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
     * and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
     * kotlinx.coroutines and DispatchedTask
     */
    val recovered = when (delegate) {
        is DispatchedTask<*> -> exception
        else -> recoverStackTrace(exception, delegate)
    }
    delegate.resumeWithExceptionMode(recovered, useMode)
}
/**
* Resumes this continuation with [value] according to [mode]: plain resume, cancellable resume,
* direct resume, or undispatched resume (which casts this to [DispatchedContinuation]);
* `MODE_IGNORE` does nothing. Any other mode fails with an "Invalid mode" error.
*/
internal fun <T> Continuation<T>.resumeMode(value: T, mode: Int) {
when (mode) {
MODE_ATOMIC_DEFAULT -> resume(value)
MODE_CANCELLABLE -> resumeCancellable(value)
MODE_DIRECT -> resumeDirect(value)
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatched(value)
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
}
/** Resumes the wrapped continuation with [value] on the current thread, inside this continuation's coroutine context. */
inline fun resumeUndispatched(value: T) {
// In the traced example, this is DispatchedContinuation[Main [immediate]...]
withCoroutineContext(context, countOrElement) {
continuation.resume(value)// continues with the regular resume flow
}
}
/** Resumes this continuation with [value] wrapped in a successful [Result]. */
public inline fun <T> Continuation<T>.resume(value: T): Unit {
    val success = Result.success(value)
    resumeWith(success)
}
/**
 * Resumption of the coroutine.
 *
 * When the dispatcher does not need a dispatch, the wrapped continuation is resumed through
 * `executeUnconfined` inside this continuation's coroutine context; otherwise the state and the
 * atomic resume mode are stored and this continuation is handed to the dispatcher.
 */
override fun resumeWith(result: Result<T>) {
    val context = continuation.context
    val state = result.toState()
    if (!dispatcher.isDispatchNeeded(context)) {
        executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
            withCoroutineContext(this.context, countOrElement) {
                continuation.resumeWith(result)
            }
        }
        return
    }
    _state = state
    resumeMode = MODE_ATOMIC_DEFAULT
    dispatcher.dispatch(context, this)
}
commonMain/CoroutineScope.kt
/**
 * Runs [block] exactly once in a new [ScopeCoroutine] that uses the caller's context and resumes
 * the caller's continuation; the block is started without dispatching.
 */
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        ScopeCoroutine(uCont.context, uCont).let { scope ->
            scope.startUndispatchedOrReturn(scope, block)
        }
    }
}
* A job has the following states:
*
* | **State** | [isActive] | [isCompleted] | [isCancelled] |
* | -------------------------------- | ---------- | ------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` |
* | _Completing_ (transient state) | `true` | `false` | `false` |
* | _Cancelling_ (transient state) | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` |
* | _Completed_ (final state) | `false` | `true` | `false` |
* ```
* wait children
* +-----+ start +--------+ complete +-------------+ finish +-----------+
* | New | -----> | Active | ---------> | Completing | -------> | Completed |
* +-----+ +--------+ +-------------+ +-----------+
* | cancel / fail |
* | +----------------+
* | |
* V V
* +------------+ finish +-----------+
* | Cancelling | --------------------------------> | Cancelled |
* +------------+ +-----------+
* ```
async 被调用时会立即返回 Deferred,不挂起当前协程(当前线程继续向下执行)
await 被调用时,如果结果尚未就绪,则挂起当前协程并释放线程(不阻塞线程);如果结果已就绪,则直接返回结果
/**
 * Starts a new coroutine that runs [block] and returns it as a [Deferred].
 *
 * The effective context comes from [newCoroutineContext]. A lazy [start] mode creates a
 * [LazyDeferredCoroutine] that keeps [block]; any other mode creates an active
 * [DeferredCoroutine]. The coroutine is then started with itself as the receiver of [block].
 */
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val effectiveContext = newCoroutineContext(context)
    val deferred = when {
        start.isLazy -> LazyDeferredCoroutine(effectiveContext, block)
        else -> DeferredCoroutine<T>(effectiveContext, active = true)
    }
    deferred.start(start, deferred, block)
    return deferred
}
/** A [Job] that additionally exposes a result of type [T] through the suspending [await]. */
public interface Deferred<out T> : Job {
/** Suspending accessor for the result value. */
public suspend fun await(): T
......
}
/**
* [Deferred] implementation on top of [AbstractCoroutine]; every member delegates to an
* `...Internal` helper and casts the result to [T] where needed.
*/
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
// The coroutine itself serves as its select clause.
override val onAwait: SelectClause1<T> get() = this
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
registerSelectClause1Internal(select, block)
}
/**
* Returns the completed state's value right away when the job is no longer incomplete (throwing
* for an exceptional completion); otherwise makes sure the job is started and suspends in `awaitSuspend()`.
*/
internal suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while (true) { // lock-free loop on state
val state = this.state
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
if (startInternal(state) >= 0) break // break unless needs to retry
}
return awaitSuspend() // slow-path
}
/**
* Slow path of await: wraps the intercepted caller continuation in an [AwaitContinuation],
* registers a completion handler that resumes it (disposed again if the await is cancelled),
* and returns the continuation's `getResult()`.
*/
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
cont.getResult()
}
/** Registers [handler] by delegating to the full overload with `onCancelling = false` and `invokeImmediately = true`. */
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle =
invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler)
/**
* Completion handler node that resumes [continuation] once the job has completed: with the
* exception for an exceptional completion, otherwise with the unboxed state value.
*/
private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: CancellableContinuationImpl<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state.unboxState() as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}
@JvmStatic
// The dispatcher returned here is the HandlerContext instance shown further below (per these notes).
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
MainDispatcherLoader
// Main dispatcher instance, obtained from loadMainDispatcher() when this property is initialised.
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
/** Base class for main dispatchers; adds the [immediate] variant on top of [CoroutineDispatcher]. */
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
/** The "immediate" counterpart of this dispatcher. */
public abstract val immediate: MainCoroutineDispatcher
}
/** Sealed main dispatcher that also implements [Delay]; narrows [immediate] to [HandlerDispatcher]. */
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
public abstract override val immediate: HandlerDispatcher
}
/**
* Handler-backed dispatcher. (The class body is not closed in this excerpt.)
*/
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
// An immediate context is its own `immediate`; otherwise the immediate variant is created below.
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
// A dispatch is skipped only for an immediate context that is already running on the handler's looper.
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
协程resume时根据dispatcher.isDispatchNeeded判断是否需要dispatch
DispatchedContinuation
/**
 * Resumes with [value] in cancellable mode; `dispatcher.isDispatchNeeded` decides whether the
 * resume has to be dispatched.
 *
 * When no dispatch is needed, the resume runs through `executeUnconfined` (unless
 * `resumeCancelled()` reports cancellation); otherwise the value and mode are stored and this
 * continuation is handed to the dispatcher.
 */
inline fun resumeCancellable(value: T) {
    // main path: the dispatcher decides whether a dispatch is required
    if (!dispatcher.isDispatchNeeded(context)) {
        executeUnconfined(value, MODE_CANCELLABLE) {
            if (!resumeCancelled()) resumeUndispatched(value)
        }
        return
    }
    _state = value
    resumeMode = MODE_CANCELLABLE
    dispatcher.dispatch(context, this)
}
发生异常时用于恢复协程堆栈
// JVM actual: aliases the stdlib's kotlin.coroutines.jvm.internal.CoroutineStackFrame.
internal actual typealias CoroutineStackFrame = kotlin.coroutines.jvm.internal.CoroutineStackFrame
/**
* Represents one frame in the coroutine call stack for the debugger.
* This interface is implemented by compiler-generated implementations of the
* [Continuation] interface.
*/
@SinceKotlin("1.3")
public interface CoroutineStackFrame {
/**
* Returns a reference to the stack frame of the caller of this frame,
* that is, the frame before this frame in the coroutine call stack.
* The result is `null` for the first frame of the coroutine.
*/
public val callerFrame: CoroutineStackFrame?
/**
* Returns the stack trace element that corresponds to this stack frame.
* The result is `null` if the stack trace element is not available for this frame.
* In this case, the debugger represents this stack frame using the
* result of the [toString] function.
*/
public fun getStackTraceElement(): StackTraceElement?
}
DispatchedTask.run
/**
* Later version of DispatchedTask.run (excerpt): inside the coroutine context, resumes the
* continuation with the cancellation cause, the stored exception, or the successful result.
* NOTE(review): `taskContext` and `fatalException` are unused here, and `context`, `state`, `delegate`
* and `continuation` are not declared in the visible lines — presumably the surrounding try/finally
* and local declarations were trimmed from this excerpt.
*/
public final override fun run() {
val taskContext = this.taskContext
var fatalException: Throwable? = null
withCoroutineContext(context, delegate.countOrElement) {
val exception = getExceptionalResult(state)
val job = if (resumeMode.isCancellableMode) context[Job] else null
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
if (exception == null && job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) continuation.resumeWithStackTrace(exception)// main path for the stack-trace recovery discussed here
else continuation.resume(getSuccessfulResult(state))
}
}
}
/** Resumes this continuation with a failure built from [exception] after `recoverStackTrace`. */
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}
/**
 * Returns [exception] with a recovered stack trace when recovery is enabled and [continuation] is
 * a [CoroutineStackFrame]; otherwise returns [exception] unchanged.
 */
internal actual fun <E : Throwable> recoverStackTrace(exception: E, continuation: Continuation<*>): E =
    if (RECOVER_STACK_TRACES && continuation is CoroutineStackFrame) {
        recoverFromStackFrame(exception, continuation)
    } else {
        exception
    }
/**
* Recovers a stack trace for [exception] from the frames of [continuation].
* Returns [exception] unchanged when a copy of its cause cannot be created.
* (The rest of the function lies outside this excerpt.)
*/
private fun <E : Throwable> recoverFromStackFrame(exception: E, continuation: CoroutineStackFrame): E {
/*
* Here we are checking whether exception has already recovered stacktrace.
* If so, we extract initial and merge recovered stacktrace and current one
*/
val (cause, recoveredStacktrace) = exception.causeAndStacktrace()
// Try to create an exception of the same type and get stacktrace from continuation
val newException = tryCopyException(cause) ?: return exception
val stacktrace = createStackTrace(continuation)
/**
 * Collects the stack trace elements of [continuation] and of every caller frame reachable through
 * `callerFrame`, in that order; frames without a stack trace element are skipped.
 */
private fun createStackTrace(continuation: CoroutineStackFrame): ArrayDeque<StackTraceElement> {
    val frames = ArrayDeque<StackTraceElement>()
    // Walk the callerFrame chain lazily, starting at the given frame, until it ends with null.
    generateSequence(continuation) { frame -> frame.callerFrame }
        .mapNotNullTo(frames) { frame -> frame.getStackTraceElement() }
    return frames
}