LiveDataCoroutine

/**
 * Builds a [LiveData] whose values are produced by the suspending [block].
 *
 * This is a thin factory: it only constructs a [CoroutineLiveData] from the
 * three arguments and returns it.
 *
 * @param context coroutine context forwarded to [CoroutineLiveData].
 * @param timeoutInMs timeout, in milliseconds, forwarded to [CoroutineLiveData].
 * @param block suspending lambda executed with a [LiveDataScope] receiver.
 * @return the newly created [CoroutineLiveData], exposed as [LiveData].
 */
@UseExperimental(ExperimentalTypeInference::class)
fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> {
    return CoroutineLiveData(context, timeoutInMs, block)
}

CoroutineLiveData

图解

sequenceDiagram

CoroutineLiveData->>CoroutineLiveData: init
activate CoroutineLiveData

CoroutineLiveData->>BlockRunner: new BlockRunner

BlockRunner->>CoroutineScope: new CoroutineScope
deactivate CoroutineLiveData

CoroutineLiveData->>CoroutineLiveData: onActive
activate CoroutineLiveData
CoroutineLiveData->>BlockRunner: blockRunner?.maybeRun()

BlockRunner->>CoroutineScope: runningJob = scope.launch

CoroutineScope->>CoroutineScope: block(liveDataScopeImpl)
deactivate CoroutineLiveData

CoroutineLiveData->>CoroutineLiveData: onInactive
activate CoroutineLiveData

CoroutineLiveData->>BlockRunner: blockRunner?.cancel()

BlockRunner->>CoroutineScope: cancellationJob = scope.launch
deactivate CoroutineLiveData
/** Suspending lambda with a [LiveDataScope] receiver; the block type taken by [CoroutineLiveData]. */
internal typealias Block<T> = suspend LiveDataScope<T>.() -> Unit
/**
 * A [MediatorLiveData] that owns a [BlockRunner] responsible for executing [block].
 *
 * @param context caller-supplied coroutine context; its [Job], if any, becomes the
 *   parent of the internal supervisor job.
 * @param timeoutInMs timeout, in milliseconds, handed to the [BlockRunner].
 * @param block the suspending producer handed to the [BlockRunner].
 */
internal class CoroutineLiveData<T>(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    block: Block<T>
) : MediatorLiveData<T>() {
    // Set to null by the trailing callback passed to BlockRunner below.
    private var blockRunner: BlockRunner<T>?

    init {
        val parentJob = context[Job]

        // Every block run is launched in this scope. Main.immediate is only the default
        // dispatcher: anything in `context` overrides it. The SupervisorJob goes last so
        // that it replaces the caller's Job in the combined context; cancelling a single
        // block run (e.g. after losing observers) therefore never cancels the caller's
        // context, which we may want to relaunch under later.
        val runnerScope = CoroutineScope(
            Dispatchers.Main.immediate + context + SupervisorJob(parentJob)
        )

        blockRunner = BlockRunner(
            liveData = this,
            block = block,
            timeoutInMs = timeoutInMs,
            scope = runnerScope
        ) {
            // Drop the runner reference once this callback fires.
            blockRunner = null
        }
    }
}

onActive

/** Asks the block runner, if one is still present, to start the block. */
override fun onActive() {
    super.onActive()
    // No runner left: nothing to start.
    val runner = blockRunner ?: return
    runner.maybeRun()
}

BlockRunner.maybeRun

/**
 * Launches the block in [scope] unless a run is already in progress.
 *
 * Any pending cancellation job is cancelled and cleared first.
 */
@MainThread
fun maybeRun() {
    cancellationJob?.cancel()
    cancellationJob = null
    if (runningJob == null) {
        runningJob = scope.launch {
            // `block` is a suspend lambda with a LiveDataScope receiver. Wrapping the
            // launched coroutine's context in a LiveDataScopeImpl is what switches the
            // lambda's receiver from CoroutineScope to LiveDataScope.
            val scopeImpl = LiveDataScopeImpl(liveData, coroutineContext)
            block(scopeImpl)
            // Only reached when the block returns normally.
            onDone()
        }
    }
}

onInactive

/** Asks the block runner, if one is still present, to schedule cancellation of the block. */
override fun onInactive() {
    super.onInactive()
    // No runner left: nothing to cancel.
    val runner = blockRunner ?: return
    runner.cancel()
}

BlockRunner.cancel

/**
 * Schedules a delayed cancellation of the running block.
 *
 * The running job is cancelled only if, after waiting [timeoutInMs], the LiveData still
 * has no active observers.
 *
 * @throws IllegalStateException if a cancellation job is already pending.
 */
@MainThread
fun cancel() {
    check(cancellationJob == null) { "Cancel call cannot happen without a maybeRun" }
    cancellationJob = scope.launch(Dispatchers.Main.immediate) {
        delay(timeoutInMs)
        // One last check on active observers, to avoid a race between restarting the
        // coroutine and cancelling it.
        if (liveData.hasActiveObservers()) return@launch
        runningJob?.cancel()
        runningJob = null
    }
}

LiveDataScope

/**
 * Receiver type of the block passed to `liveData { }`; the operations a block can use
 * to publish values.
 */
interface LiveDataScope<T> {
  /** Publishes [value]. The implementation in this file clears any emitted source, then sets the value. */
  suspend fun emit(value: T)

  /**
   * Publishes the values of [source]. The implementation in this file replaces any
   * previously emitted source with [source].
   *
   * @return a handle for the added source.
   *   NOTE(review): presumably disposing it detaches [source] — confirm against EmittedSource.
   */
  suspend fun emitSource(source: LiveData<T>): DisposableHandle

  /** Current value of the backing LiveData (the implementation in this file reads `target.value`). */
  val latestValue: T?
}
/**
 * [LiveDataScope] implementation backed by a [CoroutineLiveData].
 *
 * @param target the LiveData this scope reads from.
 * @param context coroutine context of the block run this scope belongs to.
 */
internal class LiveDataScopeImpl<T>(
    internal var target: CoroutineLiveData<T>,
    context: CoroutineContext
) : LiveDataScope<T> {

    // Context for talking to the target LiveData: the supplied context with the
    // main-immediate dispatcher applied on top. This provides main-thread safety and
    // keeps cancellation cooperative.
    private val coroutineContext = context + Dispatchers.Main.immediate

    override val latestValue: T?
        get() {
            return target.value
        }

}

emit(value: T)

/**
 * Publishes [value] on the target LiveData.
 *
 * Runs in [coroutineContext]: the current emitted source is cleared first, then the
 * value is set.
 */
override suspend fun emit(value: T) {
    withContext(coroutineContext) {
        target.clearSource()
        // Property assignment, i.e. CoroutineLiveData<T>.setValue.
        target.value = value
    }
}

emitSource(source: LiveData)

图解

graph LR
CoroutineLiveData-->|1: addSource|SourceLiveData
CoroutineLiveData-->|2: onActive|SourceLiveData
SourceLiveData-->|3: observe when coroutineLiveData onActive|observer-->|5:update when sourceLiveData onChanged|CoroutineLiveData
observer-->|4:I have observer,call my onActive|SourceLiveData
 /**
  * Switches to [coroutineContext] and delegates to the target LiveData's `emitSource`.
  *
  * @return the handle returned by the target for [source].
  */
 override suspend fun emitSource(source: LiveData<T>): DisposableHandle =
     withContext(coroutineContext) { target.emitSource(source) }

CoroutineLiveData.emitSource

// Source most recently attached by emitSource(); reset to null by clearSource().
private var emittedSource: EmittedSource? = null

/**
 * Replaces the currently emitted source with [source].
 *
 * The previous source is cleared first; the new one is then added and remembered in
 * [emittedSource].
 *
 * @return the handle for the newly added source.
 */
internal suspend fun emitSource(source: LiveData<T>): DisposableHandle {
    clearSource()
    return addDisposableSource(source).also { added -> emittedSource = added }
}
/** Disposes the currently emitted source, if any, and then forgets it. */
internal suspend fun clearSource() {
    // Dispose before clearing the field, not after.
    emittedSource?.let { current -> current.disposeNow() }
    emittedSource = null
}
/**
 * Adds [source] to this [MediatorLiveData] on the main-immediate dispatcher, copying
 * every value [source] delivers into this LiveData.
 *
 * @return an [EmittedSource] pairing [source] with this mediator.
 */
internal suspend fun <T> MediatorLiveData<T>.addDisposableSource(
    source: LiveData<T>
): EmittedSource = withContext(Dispatchers.Main.immediate) {
    val mediator = this@addDisposableSource
    // MediatorLiveData.addSource: each value from `source` is written through setValue.
    mediator.addSource(source) { newValue ->
        mediator.value = newValue
    }
    EmittedSource(source = source, mediator = mediator)
}

参考

https://developer.android.com/kotlin/ktx#livedata

https://github.com/android/architecture-components-samples/tree/main/LiveDataSample