Kotlin Flow Test
Start
Code
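import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(args: Array<String>) {
    // Default parameters: replay = 0, extraBufferCapacity = 0
    val flow = MutableSharedFlow<Int>()
    // Collector A: fast
    GlobalScope.launch {
        flow.collect {
            println("A: $it")
        }
    }
    // Collector B: slow, suspends for 2 s after each value
    GlobalScope.launch {
        flow.collect {
            println("B: $it")
            delay(2000)
        }
    }
    // Emitter: one value every 200 ms; main waits until it is done
    GlobalScope.async {
        var i = 0
        while (i < 50) {
            flow.emit(i++)
            delay(200)
        }
    }.await()
}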
Output
A: 1
B: 1
# 2s delay
A: 2
B: 2
# 2s delay
A: 3
B: 3
# 2s delay
Conclusion
- MutableSharedFlow is a hot flow, which means it emits each value to all of its collectors.
- All collectors of MutableSharedFlow seem to run in the same coroutine when receiving values: once one collector is suspended (delay), no other collector receives the next value until it resumes.
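A minimal sketch of what "hot" means here, assuming the default MutableSharedFlow() with replay = 0: a value emitted while nobody is subscribed is simply dropped. That would also explain why the output above starts at 1 although the loop emits 0 first. The helper name collectAfterEarlyEmit is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits 0 before anyone subscribes, then 1 and 2 afterwards,
// and returns what a single collector actually received.
suspend fun collectAfterEarlyEmit(): List<Int> = coroutineScope {
    val flow = MutableSharedFlow<Int>() // replay = 0 by default

    flow.emit(0) // no subscribers yet: returns immediately and the value is dropped

    val received = async { flow.take(2).toList() }
    // Wait until the collector has really subscribed before emitting again.
    flow.subscriptionCount.first { it == 1 }

    flow.emit(1)
    flow.emit(2)
    received.await()
}

suspend fun main() {
    println(collectAfterEarlyEmit()) // prints [1, 2]
}
```

Waiting on subscriptionCount is one way to avoid losing the first values in the tests in this post; a non-zero replay would be another.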
Test and Verify
Code
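import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow = MutableSharedFlow<Int>()
    // Collector A: fast, prints the thread it runs on
    GlobalScope.launch {
        flow.collect {
            println("A: $it")
            println("A - Thread: " + Thread.currentThread().name)
        }
    }
    // Collector B: slow, prints the thread it runs on, then suspends for 2 s
    GlobalScope.launch {
        flow.collect {
            println("B: $it")
            println("B - Thread: " + Thread.currentThread().name)
            delay(2000)
        }
    }
    // Emitter: one value every 200 ms
    GlobalScope.async {
        var i = 0
        while (i < 50) {
            flow.emit(i++)
            delay(200)
        }
    }.await()
}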
Output
A: 1
B: 1
A - Thread: DefaultDispatcher-worker-1
B - Thread: DefaultDispatcher-worker-3
A: 2
A - Thread: DefaultDispatcher-worker-2
B: 2
B - Thread: DefaultDispatcher-worker-2
A: 3
A - Thread: DefaultDispatcher-worker-3
B: 3
B - Thread: DefaultDispatcher-worker-3
Conclusion
- From the second value onward, A and B report the same thread, so all collectors appear to run in the same coroutine, and that coroutine is not the one calling emit.
- A and B reported different threads for the first value because the two collectors were not registered at the same time.
Refutation
Code
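import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow = MutableSharedFlow<Int>()
    // Collector A: subscribes immediately
    GlobalScope.launch {
        flow.collect {
            println("A: $it")
            println("A - Thread: " + Thread.currentThread().name)
        }
    }
    // Collector B: subscribes 1 s later; neither collector suspends while handling a value
    GlobalScope.launch {
        delay(1000)
        flow.collect {
            println("B: $it")
            println("B - Thread: " + Thread.currentThread().name)
        }
    }
    // Emitter: one value every 500 ms
    GlobalScope.async {
        var i = 0
        while (i < 100) {
            flow.emit(i++)
            delay(500)
        }
    }.await()
}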
Output
A: 1
A - Thread: DefaultDispatcher-worker-3
B: 2
A: 2
A - Thread: DefaultDispatcher-worker-3
B - Thread: DefaultDispatcher-worker-1
B: 3
B - Thread: DefaultDispatcher-worker-4
A: 3
A - Thread: DefaultDispatcher-worker-2
B: 4
B - Thread: DefaultDispatcher-worker-3
A: 4
A - Thread: DefaultDispatcher-worker-3
B: 5
B - Thread: DefaultDispatcher-worker-2
A: 5
A - Thread: DefaultDispatcher-worker-3
B: 6
B - Thread: DefaultDispatcher-worker-1
A: 6
A - Thread: DefaultDispatcher-worker-2
B: 7
A: 7
A - Thread: DefaultDispatcher-worker-3
B - Thread: DefaultDispatcher-worker-4
A: 8
A - Thread: DefaultDispatcher-worker-2
B: 8
B - Thread: DefaultDispatcher-worker-4
B: 9
B - Thread: DefaultDispatcher-worker-2
A: 9
A - Thread: DefaultDispatcher-worker-4
B: 10
B - Thread: DefaultDispatcher-worker-4
A: 10
A - Thread: DefaultDispatcher-worker-3
B: 11
B - Thread: DefaultDispatcher-worker-3
A: 11
A - Thread: DefaultDispatcher-worker-2
Conclusion
- Each collector does run in its own coroutine: for the same value, A and B often report different threads, and their output interleaves.
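Thread names are only an indirect hint, because a coroutine on the default dispatcher can resume on a different worker thread after each suspension. A more direct check, sketched here under the assumption that each collector is launched with its own CoroutineName, is to read the name from the coroutine context inside collect. The helpers coroutineName and recordCollectorCoroutines are hypothetical names.

```kotlin
import java.util.Collections
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: name of the coroutine running the caller, or "unnamed".
suspend fun coroutineName(): String =
    currentCoroutineContext()[CoroutineName]?.name ?: "unnamed"

// Emits 0, 1, 2 to two named collectors and records which coroutine handled each value.
suspend fun recordCollectorCoroutines(): List<String> = coroutineScope {
    val flow = MutableSharedFlow<Int>()
    // Collectors may run on different threads, so use a synchronized list.
    val seen: MutableList<String> = Collections.synchronizedList(mutableListOf())

    val a = launch(CoroutineName("collector-A")) {
        flow.take(3).collect { seen.add("$it handled in ${coroutineName()}") }
    }
    val b = launch(CoroutineName("collector-B")) {
        flow.take(3).collect { seen.add("$it handled in ${coroutineName()}") }
    }

    // Make sure both collectors are subscribed so that no value is dropped.
    flow.subscriptionCount.first { it == 2 }
    repeat(3) { flow.emit(it) }

    joinAll(a, b) // wait until both collectors have handled all three values
    seen.toList()
}

suspend fun main() {
    recordCollectorCoroutines().forEach { println(it) }
}
```

Each value should show up twice, once per collector name, which matches the conclusion that the collect lambda runs in the coroutine that called collect. The relative order of the A and B lines is not guaranteed.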
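Why does one suspended collector keep the others from receiving values?
Most likely, each value really is delivered to a separate collector coroutine, but emit waits until every collector has taken the value before it returns. With the default MutableSharedFlow (replay = 0, extraBufferCapacity = 0) there is no buffer, and a collector that is still handling the previous value cannot take the next one. The emitter is therefore throttled to the pace of the slowest collector, and every other collector only sees new values at that pace.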
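The explanation above can be checked by timing each emit call. The kotlinx.coroutines documentation states that a shared flow created with default parameters suspends emit until all subscribers have received the value. This sketch assumes a single slow collector; the helper name measureEmitDurations and its parameters are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: emits `count` values to one slow collector and
// returns how long each emit() call took, in milliseconds.
suspend fun measureEmitDurations(count: Int, collectorDelayMs: Long): List<Long> = coroutineScope {
    val flow = MutableSharedFlow<Int>() // defaults: replay = 0, extraBufferCapacity = 0

    // The slow collector suspends for collectorDelayMs after receiving each value.
    val slow = launch {
        flow.take(count).collect { delay(collectorDelayMs) }
    }
    // Emit only after the collector has subscribed, otherwise values are dropped.
    flow.subscriptionCount.first { it == 1 }

    val durations = mutableListOf<Long>()
    for (i in 0 until count) {
        durations += measureTimeMillis { flow.emit(i) }
    }

    slow.join()
    durations
}

suspend fun main() {
    // The first emit should be fast; each later emit should take about as long
    // as the collector's delay, because the collector is still busy.
    println(measureEmitDurations(count = 3, collectorDelayMs = 500))
}
```

If the timings come out that way, the emitter is the one being held back, and the other collectors are idle only because nothing new is being emitted.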
Solution
Start a new coroutine (or switch context) as soon as a collector receives a value, so that slow processing in one collector does not hold up the emitter or the other collectors.
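One way to sketch this idea, assuming the slow work may run concurrently and out of order: the collector launches a child coroutine for each value, so its collect lambda returns at once and emit is no longer held back. The helper name emitTimeWithOffloadedWork and its parameters are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical helper: a collector that needs `workMs` per value hands each value
// to a child coroutine. Returns the total time the emitter spent in emit(), in ms.
suspend fun emitTimeWithOffloadedWork(count: Int, workMs: Long): Long = coroutineScope {
    val flow = MutableSharedFlow<Int>()

    val collector = launch {
        flow.take(count).collect { value ->
            // The slow work runs in a child of the collector coroutine,
            // so the collector is immediately ready for the next value.
            launch {
                delay(workMs)
                println("B finished $value")
            }
        }
    }

    flow.subscriptionCount.first { it == 1 }
    val emitMs = measureTimeMillis {
        repeat(count) { flow.emit(it) }
    }

    collector.join() // also waits for the child coroutines
    emitMs
}

suspend fun main() {
    // Without offloading, five emits would take roughly 4 x 300 ms.
    println("emit took ${emitTimeWithOffloadedWork(count = 5, workMs = 300)} ms")
}
```

The trade-off is that values are now processed concurrently, so ordering is no longer guaranteed and the number of running coroutines is unbounded. If ordered processing matters, kotlinx.coroutines also offers the extraBufferCapacity parameter of MutableSharedFlow and the buffer() operator on the collecting side, both of which decouple the emitter from a slow collector up to the buffer size.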