graph
subgraph 协程
subgraph 挂起函数1
开始挂起函数1-->|执行|函数1执行中-->|3:需要函数1内协作判断isAlive,退出计算|函数1执行完成
end
subgraph 挂起函数2
开始挂起函数2-->|执行|函数2执行中-->函数2执行完成
end
函数1执行完成-->resume-->开始挂起函数2
cancel-->|1:cpu密集计算类型:在该协程线程中修改协程job状态为canceling|函数1执行中
resume-->|2:DispatchedTask.run判断job状态|异常resume
开始挂起函数1-->|IO类型:cont.invokeOnCancellation|设置cancel时的处理方法
end
runtask时对CancellationException的判断
协程的取消只是在协程的第一层包装中 AbstractCoroutine 中修改协程的状态,并没有影响到第二层包装中 BaseContinuationImpl 中协程的实际运算逻辑。所以协程的取消只是状态的变化,并不会取消协程的实际运算逻辑,看下面的代码示例:
fun main(args: Array<String>) = runBlocking {
val job1 = launch(Dispatchers.Default) {
repeat(5) {
println("job1 sleep ${it + 1} times")
delay(500)
}
}
delay(700)
job1.cancel()
val job2 = launch(Dispatchers.Default) {
var nextPrintTime = 0L
var i = 1
while (i <= 3) {
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("job2 sleep ${i++} ...")
nextPrintTime = currentTime + 500L
}
}
}
delay(700)
job2.cancel()
}
输出结果如下:
job1 sleep 1 times
job1 sleep 2 times
job2 sleep 1 ...
job2 sleep 2 ...
job2 sleep 3 ...
上面代码中 job1 取消后,delay()
会检测协程是否已取消,所以 job1 之后的运算就结束了;而 job2 取消后,没有检测协程状态的逻辑,都是计算逻辑,所以 job2 的运算逻辑还是会继续运行。
所以为了可以及时取消协程的运算逻辑,可以检测协程的状态,使用isActive
来判断,上面示例中可以将while(i <= 3)
替换为while(isActive)
。
commonMain/JobSupport.kt
// external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause) // must delegate here, because some classes override cancelInternal(x)
}
public open fun cancelInternal(cause: Throwable?): Boolean =
cancelImpl(cause) && handlesException
// cause is Throwable or ParentJob when cancelChild was invoked
// returns true is exception was handled, false otherwise
internal fun cancelImpl(cause: Any?): Boolean {
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns true it means it had make it
// completing and had recorded exception
if (cancelMakeCompleting(cause)) return true
// otherwise just record exception via makeCancelling below
}
return makeCancelling(cause)//main
}
// transitions to Cancelling state
// cause is Throwable or ParentJob when cancelChild was invoked
private fun makeCancelling(cause: Any?): Boolean {
var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
loopOnState { state ->
when (state) {
is Finishing -> { // already finishing -- collect exceptions
.....
return true
}
is Incomplete -> {
// Not yet finishing -- try to make it cancelling
val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
if (state.isActive) {
// active state becomes cancelling
if (tryMakeCancelling(state, causeException)) return true//main
} else {
// non active state starts completing
when (tryMakeCompleting(state, CompletedExceptionally(causeException), mode = MODE_ATOMIC_DEFAULT)) {
COMPLETING_ALREADY_COMPLETING -> error("Cannot happen in $state")
COMPLETING_COMPLETED, COMPLETING_WAITING_CHILDREN -> return true // ok
COMPLETING_RETRY -> return@loopOnState
else -> error("unexpected result")
}
}
}
else -> return false // already complete
}
}
}
// try make new Cancelling state on the condition that we're still in the expected state
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
assert { state !is Finishing } // only for non-finishing states
assert { state.isActive } // only for active states
// get state's list or else promote to list to correctly operate on child lists
val list = getOrPromoteCancellingList(state) ?: return false//main
// Create cancelling state (with rootCause!)
val cancelling = Finishing(list, false, rootCause)
if (!_state.compareAndSet(state, cancelling)) return false
// Notify listeners
notifyCancelling(list, rootCause)//main
return true
}
// Performs promotion of incomplete coroutine state to NodeList for the purpose of
// converting coroutine state to Cancelling, returns null when need to retry
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
notifyHandlers<JobCancellingNode<*>>(list, cause)//main
// then cancel parent
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
node.invoke(cause)//main
} catch (ex: Throwable) {
exception?.apply { addSuppressedThrowable(ex) } ?: run {
exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
}
}
}
exception?.let { handleOnCompletionException(it) }
}
// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
parent: Job,
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode<Job>(parent) {
override fun invoke(cause: Throwable?) {
child.cancel(child.getContinuationCancellationCause(job))//main
}
override fun toString(): String =
"ChildContinuation[$child]"
}
/**
* It is used when parent is cancelled to get the cancellation cause for this continuation.
*/
open fun getContinuationCancellationCause(parent: Job): Throwable =
parent.getCancellationException()
//commonMain/CancellableContinuationImpl.kt
public override fun cancel(cause: Throwable?): Boolean {
_state.loop { state ->
if (state !is NotCompleted) return false // false if already complete or cancelling
// Active -- update to final state
val update = CancelledContinuation(this, cause, handled = state is CancelHandler)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
// Invoke cancel handler if it was present
if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) }
// Complete state update
disposeParentHandle()
dispatchResume(mode = MODE_ATOMIC_DEFAULT)//main
return true
}
}
private fun dispatchResume(mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
// otherwise, getResult has already commenced, i.e. completed later or in other thread
dispatch(mode)
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
resumeUnconfined()
}
} else {
resume(delegate, mode)//main
}
}
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
// slow-path - use delegate
val state = takeState()
val exception = getExceptionalResult(state)
if (exception != null) {
/*
* Recover stacktrace for non-dispatched tasks.
* We usually do not recover stacktrace in a `resume` as all resumes go through `DispatchedTask.run`
* and we recover stacktraces there, but this is not the case for a `suspend fun main()` that knows nothing about
* kotlinx.coroutines and DispatchedTask
*/
val recovered = if (delegate is DispatchedTask<*>) exception else recoverStackTrace(exception, delegate)
delegate.resumeWithExceptionMode(recovered, useMode)//main
} else {
delegate.resumeMode(getSuccessfulResult(state), useMode)
}
}
internal fun <T> Continuation<T>.resumeWithExceptionMode(exception: Throwable, mode: Int) {
when (mode) {
MODE_ATOMIC_DEFAULT -> resumeWithException(exception)//main
MODE_CANCELLABLE -> resumeCancellableWithException(exception)
MODE_DIRECT -> resumeDirectWithException(exception)
MODE_UNDISPATCHED -> (this as DispatchedContinuation).resumeUndispatchedWithException(exception)
MODE_IGNORE -> {}
else -> error("Invalid mode $mode")
}
}
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)//main
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)//main
}
}
}
}
后续在dispatchedtask.run走异常resume流程
之后:
nternal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result//cancel时为Failure(kotlinx.coroutines.JobCancellationException
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)//Failure类型的param会导致ResultKt.throwOnFailure($result)抛异常
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
//cancel时为kotlinx.coroutines.JobCancellationException
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)//后续流程和非cancel异常处理类似,
//主要区别1:仅cancel时会notifyCancelling
//2.cancelParent(finalException) || handleJobException(finalException)时由于cancelParent为true,
//不会走handleJobException的异常处理流程
return
}
}
}
https://www.kotlincn.net/docs/reference/coroutines/cancellation-and-timeouts.html