/**
 * Builds a [LiveData] whose values are produced by the suspending [block].
 *
 * The returned object is a [CoroutineLiveData] constructed from the three arguments as-is.
 *
 * @param context coroutine context handed to [CoroutineLiveData]; defaults to
 * [EmptyCoroutineContext].
 * @param timeoutInMs timeout in milliseconds handed to [CoroutineLiveData]; defaults to
 * `DEFAULT_TIMEOUT`.
 * @param block suspending lambda that runs with a [LiveDataScope] receiver.
 * @return the newly created [CoroutineLiveData], exposed as a [LiveData].
 */
@UseExperimental(ExperimentalTypeInference::class)
fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> {
    return CoroutineLiveData(
        context = context,
        timeoutInMs = timeoutInMs,
        block = block
    )
}
sequenceDiagram
CoroutineLiveData->>CoroutineLiveData: init
activate CoroutineLiveData
CoroutineLiveData->>BlockRunner: new BlockRunner
BlockRunner->>CoroutineScope: new CoroutineScope
deactivate CoroutineLiveData
CoroutineLiveData->>CoroutineLiveData: onActive
activate CoroutineLiveData
CoroutineLiveData->>BlockRunner: blockRunner?.maybeRun()
BlockRunner->>CoroutineScope: runningJob = scope.launch
CoroutineScope->>CoroutineScope: block(liveDataScopeImpl)
deactivate CoroutineLiveData
CoroutineLiveData->>CoroutineLiveData: onInactive
activate CoroutineLiveData
CoroutineLiveData->>BlockRunner: blockRunner?.cancel()
BlockRunner->>CoroutineScope: cancellationJob = scope.launch
deactivate CoroutineLiveData
/** Shorthand for the suspending lambda, with a [LiveDataScope] receiver, that produces the values. */
internal typealias Block<T> = suspend LiveDataScope<T>.() -> Unit
/**
 * A [MediatorLiveData] that owns a [BlockRunner] for executing [block].
 *
 * On construction it builds the coroutine scope the runner launches into and creates the
 * runner. The runner reference is dropped (set to `null`) by the callback passed as the
 * runner's trailing lambda.
 *
 * @param context caller-supplied coroutine context; its [Job], if any, becomes the parent of
 * the internal supervisor job.
 * @param timeoutInMs timeout in milliseconds forwarded to the [BlockRunner].
 * @param block the suspending producer forwarded to the [BlockRunner].
 */
internal class CoroutineLiveData<T>(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    block: Block<T>
) : MediatorLiveData<T>() {
    private var blockRunner: BlockRunner<T>?

    init {
        // An intermediate supervisor job sits between the caller's Job and each block run.
        // Cancelling a single run (e.g. because observers went away) therefore does not cancel
        // the caller's context, which may be reused to launch the block again later.
        val isolationJob = SupervisorJob(context[Job])

        // Scope in which every block Job is launched. Dispatchers.Main.immediate comes first so
        // that it is only a default: anything in `context` overrides it. The supervisor job is
        // appended last so it always wins as the scope's Job.
        val runnerScope = CoroutineScope(Dispatchers.Main.immediate + context + isolationJob)

        blockRunner = BlockRunner(
            liveData = this,
            block = block,
            timeoutInMs = timeoutInMs,
            scope = runnerScope
        ) {
            // Release the runner once it reports back through this callback.
            blockRunner = null
        }
    }
}
/**
 * Forwards the activation to the superclass and then asks the block runner, if one is still
 * held, to start the block via `maybeRun()`.
 */
override fun onActive() {
    super.onActive()
    // No runner left means there is nothing to start.
    val runner = blockRunner ?: return
    runner.maybeRun()
}
/**
 * Starts the block unless a run is already in flight.
 *
 * Any pending cancellation job is cancelled and cleared first. If `runningJob` is still set
 * afterwards, nothing else happens; otherwise a new coroutine is launched in `scope` that runs
 * the block and then calls `onDone()`.
 */
@MainThread
fun maybeRun() {
    // Abort a scheduled cancellation, if any, before deciding whether to launch.
    cancellationJob?.cancel()
    cancellationJob = null

    if (runningJob == null) {
        runningJob = scope.launch {
            // `block` is a suspend function with a LiveDataScope receiver; wrapping the launched
            // coroutine's context in a LiveDataScopeImpl is what turns the CoroutineScope lambda
            // into a call on a LiveDataScope.
            block(LiveDataScopeImpl(liveData, coroutineContext))
            onDone()
        }
    }
}
/**
 * Forwards the deactivation to the superclass and then asks the block runner, if one is still
 * held, to schedule cancellation via `cancel()`.
 */
override fun onInactive() {
    super.onInactive()
    // No runner left means there is nothing to cancel.
    val runner = blockRunner ?: return
    runner.cancel()
}
/**
 * Schedules a delayed cancellation of the running block.
 *
 * A coroutine is launched on `Dispatchers.Main.immediate` that waits `timeoutInMs` and then, if
 * the LiveData still has no active observers, cancels and clears `runningJob`.
 *
 * @throws IllegalStateException if a cancellation job is already pending, i.e. `cancel()` was
 * called twice without a `maybeRun()` in between.
 */
@MainThread
fun cancel() {
    check(cancellationJob == null) { "Cancel call cannot happen without a maybeRun" }

    cancellationJob = scope.launch(Dispatchers.Main.immediate) {
        delay(timeoutInMs)
        // Re-check for active observers after the delay to avoid a race between a block that
        // was (re)started in the meantime and this cancellation.
        if (liveData.hasActiveObservers()) {
            return@launch
        }
        runningJob?.cancel()
        runningJob = null
    }
}
/**
 * Receiver of the block passed to the `liveData` builder; the operations through which the
 * block publishes values.
 *
 * The behavior notes below describe the `LiveDataScopeImpl` shown in this file.
 */
interface LiveDataScope<T> {
/**
 * Publishes [value]. The implementation in this file switches to its own coroutine context,
 * clears any previously emitted source on the target and then sets the target's value.
 */
suspend fun emit(value: T)
/**
 * Makes [source] the origin of values. The implementation in this file switches to its own
 * coroutine context and delegates to the target's `emitSource`, returning its
 * [DisposableHandle].
 */
suspend fun emitSource(source: LiveData<T>): DisposableHandle
/** The target's current value as read by the implementation in this file; may be `null`. */
val latestValue: T?
}
/**
 * [LiveDataScope] implementation that publishes into a [CoroutineLiveData].
 *
 * @param target the LiveData that receives values and sources.
 * @param context coroutine context of the block run; combined with the main dispatcher to form
 * the context used when talking to [target].
 */
internal class LiveDataScopeImpl<T>(
    internal var target: CoroutineLiveData<T>,
    context: CoroutineContext
) : LiveDataScope<T> {
    // Context used for every interaction with the target LiveData: the context given for this
    // run plus Dispatchers.Main.immediate. Appending the dispatcher last gives main-thread
    // safety, while keeping the rest of `context` preserves cancellation cooperation.
    private val coroutineContext = context + Dispatchers.Main.immediate

    /** Current value of [target]. */
    override val latestValue: T? get() = target.value
}
/**
 * Publishes [value] to the target LiveData.
 *
 * Runs in this scope's `coroutineContext`: first removes any previously emitted source from the
 * target, then assigns the value.
 */
override suspend fun emit(value: T) {
    withContext(coroutineContext) {
        target.clearSource()
        // Property assignment resolves to CoroutineLiveData<T>.setValue.
        target.value = value
    }
}
graph LR
CoroutineLiveData-->|1: addSource|SourceLiveData
CoroutineLiveData-->|2: onActive|SourceLiveData
SourceLiveData-->|3: observe when coroutineLiveData onActive|observer-->|5:update when sourceLiveData onChanged|CoroutineLiveData
observer-->|4:I have observer,call my onActive|SourceLiveData
/**
 * Makes [source] the origin of values for the target LiveData.
 *
 * Runs in this scope's `coroutineContext` and delegates to the target's `emitSource`.
 *
 * @return the [DisposableHandle] produced by the target's `emitSource`.
 */
override suspend fun emitSource(source: LiveData<T>): DisposableHandle =
    withContext(coroutineContext) { target.emitSource(source) }
/** The source most recently registered through [emitSource], or `null` if there is none. */
private var emittedSource: EmittedSource? = null

/**
 * Replaces the currently emitted source with [source].
 *
 * The previous source is cleared first; the new one is then registered through
 * `addDisposableSource` and remembered in [emittedSource].
 *
 * @return the handle of the newly registered source.
 */
internal suspend fun emitSource(source: LiveData<T>): DisposableHandle {
    clearSource()
    return addDisposableSource(source).also { registered -> emittedSource = registered }
}
/**
 * Disposes the currently emitted source, if any, via `disposeNow()` and forgets it.
 */
internal suspend fun clearSource() {
    val current = emittedSource
    if (current != null) {
        current.disposeNow()
    }
    emittedSource = null
}
/**
 * Registers [source] on this [MediatorLiveData] and wraps the registration in an
 * [EmittedSource].
 *
 * The work is done on `Dispatchers.Main.immediate`. Every value observed from [source] is
 * copied into this mediator's value.
 *
 * @param source the LiveData to mirror.
 * @return an [EmittedSource] referring to [source] and this mediator.
 */
internal suspend fun <T> MediatorLiveData<T>.addDisposableSource(
    source: LiveData<T>
): EmittedSource {
    val mediator = this
    return withContext(Dispatchers.Main.immediate) {
        // MediatorLiveData.addSource: forward each change of `source` into the mediator
        // through LiveData.setValue.
        mediator.addSource(source) { newValue ->
            mediator.value = newValue
        }
        EmittedSource(source = source, mediator = mediator)
    }
}
https://developer.android.com/kotlin/ktx#livedata
https://github.com/android/architecture-components-samples/tree/main/LiveDataSample