Kotlin coroutine cancellation

Cancelling a coroutine

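graph

subgraph Coroutine

subgraph fn1[Suspend function 1]
start1[Start suspend function 1]-->|run|running1[Function 1 running]-->|3: function 1 must cooperate by checking isActive and exit its computation|done1[Function 1 finished]
end

subgraph fn2[Suspend function 2]
start2[Start suspend function 2]-->|run|running2[Function 2 running]-->done2[Function 2 finished]
end

done1-->resume-->start2

cancel-->|1: CPU-bound case: cancel only changes the coroutine Job state to Cancelling|running1

resume-->|2: DispatchedTask.run checks the Job state|exResume[Resume with exception]

start1-->|IO-type case: cont.invokeOnCancellation|handler[Register the handler to run on cancel]

end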

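How CancellationException is checked when the task runs

Cancelling a coroutine only changes the coroutine's state in its first-layer wrapper, AbstractCoroutine. It does not touch the actual computation held in the second-layer wrapper, BaseContinuationImpl. Cancellation is therefore only a state change and does not by itself stop the code the coroutine is running, as the following example shows: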

import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking {
    val job1 = launch(Dispatchers.Default) {
        repeat(5) {
            println("job1 sleep ${it + 1} times")
            delay(500)
        }
    }
    delay(700)
    job1.cancel()
    val job2 = launch(Dispatchers.Default) {
        var nextPrintTime = 0L
        var i = 1
        while (i <= 3) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("job2 sleep ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(700)
    job2.cancel()
}

The output is:

job1 sleep 1 times
job1 sleep 2 times
job2 sleep 1 ...
job2 sleep 2 ...
job2 sleep 3 ...

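In the code above, after job1 is cancelled, delay() detects that the coroutine has been cancelled, so the rest of job1 never runs. After job2 is cancelled, nothing in its body checks the coroutine state; the body is pure computation, so job2 keeps running until its loop finishes on its own.

To stop the computation promptly, the coroutine has to check its own state with isActive. In the example above, while (i <= 3) can be replaced with while (isActive), or with while (i <= 3 && isActive) to keep the original bound.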
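A minimal sketch of that change, assuming kotlinx.coroutines is on the classpath. The helper name `runCooperativeJob` and the counter it returns are illustrative and not part of the original example. Because the loop condition reads `isActive`, the busy loop exits shortly after `cancel()` changes the job state.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a CPU-bound job that cooperates with cancellation
// and returns how many lines it printed before it stopped.
fun runCooperativeJob(): Int = runBlocking {
    var printed = 0
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = 0L
        // isActive turns false once cancel() moves the job to the Cancelling state
        while (isActive) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                printed++
                println("job sleep $printed ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(700)
    job.cancelAndJoin() // cancel, then wait until the loop has actually exited
    printed
}

fun main() {
    println("printed ${runCooperativeJob()} lines before cancellation took effect")
}
```

Unlike job2 in the original example, this job stops because its own loop observes the state change; nothing interrupts the thread from outside.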

commonMain/JobSupport.kt

JobSupport.cancel

// external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause) // must delegate here, because some classes override cancelInternal(x)
}

public open fun cancelInternal(cause: Throwable?): Boolean =
    cancelImpl(cause) && handlesException

    // cause is Throwable or ParentJob when cancelChild was invoked
    // returns true if exception was handled, false otherwise
    internal fun cancelImpl(cause: Any?): Boolean {
        if (onCancelComplete) {
            // make sure it is completing, if cancelMakeCompleting returns true it means it had make it
            // completing and had recorded exception
            if (cancelMakeCompleting(cause)) return true
            // otherwise just record exception via makeCancelling below
        }
        return makeCancelling(cause)//main
    }

makeCancelling

    // transitions to Cancelling state
    // cause is Throwable or ParentJob when cancelChild was invoked
    private fun makeCancelling(cause: Any?): Boolean {
        var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
        loopOnState { state ->
            when (state) {
                is Finishing -> { // already finishing -- collect exceptions
                    .....
                    return true
                }
                is Incomplete -> {
                    // Not yet finishing -- try to make it cancelling
                    val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
                    if (state.isActive) {
                        // active state becomes cancelling
                        if (tryMakeCancelling(state, causeException)) return true//main
                    } else {
                        // non active state starts completing
                        when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
                            COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
                            COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
                            COMPLETING_RETRY -> return@loopOnState
                            else -> error("unexpected result")
                        }
                    }
                }
                else -> return false // already complete
            }
        }
    }
 // try make new Cancelling state on the condition that we're still in the expected state
    private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
        assert { state !is Finishing } // only for non-finishing states
        assert { state.isActive } // only for active states
        // get state's list or else promote to list to correctly operate on child lists
        val list = getOrPromoteCancellingList(state) ?: return false//main
        // Create cancelling state (with rootCause!)
        val cancelling = Finishing(list, false, rootCause)
        if (!_state.compareAndSet(state, cancelling)) return false
        // Notify listeners
        notifyCancelling(list, rootCause)//main
        return true
    }
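The compare-and-set pattern in tryMakeCancelling can be illustrated with a toy model. This is a sketch under strong simplifying assumptions: `ToyJob`, `ToyState`, and `invokeOnCancelling` are hypothetical names, there are no children or parents, and a second `cancel` simply returns false, whereas the real JobSupport keeps collecting exceptions while finishing.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, heavily simplified states; the real JobSupport has many more.
sealed class ToyState
class Active(val handlers: List<(Throwable) -> Unit>) : ToyState()
class Cancelling(val rootCause: Throwable) : ToyState()

class ToyJob {
    private val state = AtomicReference<ToyState>(Active(emptyList()))

    val isActive: Boolean get() = state.get() is Active

    // Registers a handler; returns false if the job is no longer active.
    fun invokeOnCancelling(handler: (Throwable) -> Unit): Boolean {
        while (true) {
            val current = state.get()
            if (current !is Active) return false
            if (state.compareAndSet(current, Active(current.handlers + handler))) return true
            // CAS failed: another thread changed the state, so retry
        }
    }

    // Returns true only for the call that performed the Active -> Cancelling transition.
    fun cancel(cause: Throwable): Boolean {
        while (true) {
            val current = state.get()
            if (current !is Active) return false
            // Publish the new state first, on the condition that nothing changed meanwhile
            if (!state.compareAndSet(current, Cancelling(cause))) continue
            // Then notify the handlers that were registered in the old state
            current.handlers.forEach { it(cause) }
            return true
        }
    }
}

fun main() {
    val job = ToyJob()
    var notified = 0
    job.invokeOnCancelling { notified++ }
    println(job.cancel(java.util.concurrent.CancellationException("cancelled")))
    println(job.cancel(java.util.concurrent.CancellationException("again")))
    println("handlers notified: $notified, isActive: ${job.isActive}")
}
```

As in the real code, the state is swapped before the listeners run, so a handler that reads `isActive` already sees the cancelled state.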

getOrPromoteCancellingList

    // Performs promotion of incomplete coroutine state to NodeList for the purpose of
    // converting coroutine state to Cancelling, returns null when need to retry
    private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list

notifyCancelling

private fun notifyCancelling(list: NodeList, cause: Throwable) {
        // first cancel our own children
        onCancelling(cause)
        notifyHandlers<JobCancellingNode<*>>(list, cause)//main
        // then cancel parent
        cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
    }
    private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
        var exception: Throwable? = null
        list.forEach<T> { node ->
            try {
                node.invoke(cause)//main
            } catch (ex: Throwable) {
                exception?.apply { addSuppressedThrowable(ex) } ?: run {
                    exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
                }
            }
        }
        exception?.let { handleOnCompletionException(it) }
    }

During delay

ChildContinuation as JobNode

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
    parent: Job,
    @JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode<Job>(parent) {
    override fun invoke(cause: Throwable?) {
        child.cancel(child.getContinuationCancellationCause(job))//main
    }
    override fun toString(): String =
        "ChildContinuation[$child]"
}

    /**
     * It is used when parent is cancelled to get the cancellation cause for this continuation.
     */
    open fun getContinuationCancellationCause(parent: Job): Throwable =
        parent.getCancellationException()

CancellableContinuationImpl.cancel

//commonMain/CancellableContinuationImpl.kt
public override fun cancel(cause: Throwable?): Boolean {
        _state.loop { state ->
            if (state !is NotCompleted) return false // false if already complete or cancelling
            // Active -- update to final state
            val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
            if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
            // Invoke cancel handler if it was present
            if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
            // Complete state update
            disposeParentHandle()
            dispatchResume(mode = MODE_ATOMIC_DEFAULT)//main
            return true
        }
    }
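The `state is CancelHandler` branch above is the path that runs a handler registered through `invokeOnCancellation`. A small sketch of that public API, assuming kotlinx.coroutines is available; `runCancelHandlerDemo` is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: suspends a child coroutine that is never resumed,
// cancels it, and reports whether the cancel handler ran.
fun runCancelHandlerDemo(): Boolean = runBlocking {
    val handlerRan = AtomicBoolean(false)
    val job = launch {
        suspendCancellableCoroutine<Unit> { cont ->
            // The handler is kept in the continuation state and invoked when cancel() runs
            cont.invokeOnCancellation { handlerRan.set(true) }
            // Never resumed on purpose: only cancellation can end this suspension
        }
    }
    yield()             // let the child start and reach its suspension point
    job.cancelAndJoin() // cancelling the job cancels the suspended continuation
    handlerRan.get()
}

fun main() {
    println("cancel handler ran: ${runCancelHandlerDemo()}")
}
```

This is the place where IO-style suspending functions would typically close a socket or abort a request when their coroutine is cancelled.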

dispatchResume

private fun dispatchResume(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        dispatch(mode)
    }

internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
    val delegate = this.delegate
    if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
        // dispatch directly using this instance's Runnable implementation
        val dispatcher = delegate.dispatcher
        val context = delegate.context
        if (dispatcher.isDispatchNeeded(context)) {
            dispatcher.dispatch(context, this)
        } else {
            resumeUnconfined()
        }
    } else {
        resume(delegate, mode)//main
    }
}

internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    // slow-path - use delegate
    val state = takeState()
    val exception = getExceptionalResult(state)
    if (exception != null) {
        /*
         * Recover stacktrace for non-dispatched tasks.
         * We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
         * and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
         * kotlinx.coroutines and DispatchedTask
         */
        val recovered = if (delegate is DispatchedTask<*>) exception else recoverStackTrace(exception, delegate)
        delegate.resumeWithExceptionMode(recovered, useMode)//main
    } else {
        delegate.resumeMode(getSuccessfulResult(state), useMode)
    }
}

delegate.resumeWithExceptionMode

internal fun <T> Continuation<T>.resumeWithExceptionMode(exception: Throwable, mode: Int) {
    when (mode) {
        MODE_ATOMIC_DEFAULT -> resumeWithException(exception)//main
        MODE_CANCELLABLE -> resumeCancellableWithException(exception)
        MODE_DIRECT -> resumeDirectWithException(exception)
        MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatchedWithException(exception)
        MODE_IGNORE -> {}
        else -> error("Invalid mode $mode")
    }
}

public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)//main
        } else {
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)//main
                }
            }
        }
    }

The task then goes through the exceptional resume path in DispatchedTask.run.
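From the coroutine body's point of view, this exceptional resume shows up as a CancellationException thrown from the suspension point. A minimal sketch, assuming kotlinx.coroutines is available; `runDelayCancelDemo` is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records what a coroutine observes when it is
// cancelled while suspended in delay().
fun runDelayCancelDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            delay(10_000)
            events.add("finished")  // not reached when the job is cancelled in time
        } catch (e: CancellationException) {
            events.add("cancelled") // delay() was resumed with the cancellation exception
            throw e                 // rethrow so the job still completes as cancelled
        } finally {
            events.add("cleanup")
        }
    }
    yield()             // let the child run until it suspends in delay()
    job.cancelAndJoin()
    events
}

fun main() {
    println(runDelayCancelDemo())
}
```

Rethrowing the exception keeps the normal cancellation semantics; swallowing it would let the code after the try block continue in a cancelled coroutine.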

After that:

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result // on cancel this is Failure(kotlinx.coroutines.JobCancellationException)
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) // a Failure param makes ResultKt.throwOnFailure($result) throw
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // on cancel this is kotlinx.coroutines.JobCancellationException
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome) // the rest is similar to handling a non-cancel exception
                    // Main differences: 1. only cancellation calls notifyCancelling
                    // 2. in cancelParent(finalException) || handleJobException(finalException), cancelParent returns true,
                    // so the handleJobException path is skipped
                    return
                }
            }
        }
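The effect of resuming with a failed Result can be reproduced with the standard library alone, without kotlinx.coroutines. The sketch below is an illustration under that assumption; `resumeWithFailureDemo` is a hypothetical helper, and the exception is created by hand rather than by a Job.

```kotlin
import java.util.concurrent.CancellationException
import kotlin.coroutines.*

// Hypothetical helper: starts a coroutine, captures its continuation at a
// suspension point, resumes it with a failure, and returns what happened.
fun resumeWithFailureDemo(): String {
    var suspended: Continuation<Int>? = null
    val log = StringBuilder()

    val body: suspend () -> Unit = {
        try {
            // Suspend and hand the continuation to the outside world
            val value = suspendCoroutine<Int> { cont -> suspended = cont }
            log.append("resumed with $value")
        } catch (e: CancellationException) {
            // The failure passed to resumeWith is rethrown at the suspension point
            log.append("caught ${e.message}")
        }
    }

    // Top-level completion: plays the role of `completion` in the loop above
    body.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        log.append(", completed successfully: ${result.isSuccess}")
    })

    // Resume the suspended state machine with a Failure instead of a value
    suspended!!.resumeWith(Result.failure(CancellationException("cancelled")))
    return log.toString()
}

fun main() {
    println(resumeWithFailureDemo())
}
```

Because the body catches the exception here, the top-level completion receives a success; if the body let it propagate, the completion would receive the failure, which is the case the comments above describe for a cancelled Job.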

References

https://www.kotlincn.net/docs/reference/coroutines/cancellation-and-timeouts.html

Cancellation in coroutines