Kotlin Flow Test

Start

Code

suspend fun main(args : Array<String>){
    val flow = MutableSharedFlow<Int>()

    GlobalScope.launch {
        flow.collect {
            println("A: $it")
        }
    }

    GlobalScope.launch {
        flow.collect {
            println("B: $it")
            delay(2000)
        }
    }

    GlobalScope.async {
        var i = 0
        while (i < 50) {
            flow.emit(i++)
            delay(200)
        }
    }.await()
}

Output

A: 1
B: 1
# 2s delay
A: 2
B: 2
# 2s delay
A: 3
B: 3
# 2s delay

Conclusion

  • MutableSharedFlow is a hot flow, which means it will emit the value to all the collectors.
  • All collectors of MutableSharedFlow are in the same coroutine when receiving values, which means that once one collector is blocked (delay), no other collector will receive the value!

Test and Verify

Code

suspend fun main() {
    val flow = MutableSharedFlow<Int>()

    GlobalScope.launch {
        flow.collect {
            println("A: $it")
            println("A - Thread: " + Thread.currentThread().name)
        }
    }

    GlobalScope.launch {
        flow.collect {
            println("B: $it")
            println("B - Thread: " + Thread.currentThread().name)
            delay(2000)
        }
    }

    GlobalScope.async {
        var i = 0
        while (i < 50) {
            flow.emit(i++)
            delay(200)
        }
    }.await()
}

Output

A: 1
B: 1
A - Thread: DefaultDispatcher-worker-1
B - Thread: DefaultDispatcher-worker-3

A: 2
A - Thread: DefaultDispatcher-worker-2
B: 2
B - Thread: DefaultDispatcher-worker-2

A: 3
A - Thread: DefaultDispatcher-worker-3
B: 3
B - Thread: DefaultDispatcher-worker-3

Conclusion

  • All collectors are executed in the same coroutine and are not in the same coroutine as the emit's coroutine.
  • The first time it was a different coroutine was because the collector was not registered at the same time.

Overthrow

Code

suspend fun main() {
    val flow = MutableSharedFlow<Int>()

    GlobalScope.launch {
        flow.collect {
            println("A: $it")
            println("A - Thread: " + Thread.currentThread().name)
        }
    }

    GlobalScope.launch {
        delay(1000)
        flow.collect {
            println("B: $it")
            println("B - Thread: " + Thread.currentThread().name)
        }
    }

    GlobalScope.async {
        var i = 0
        while (i < 100) {
            flow.emit(i++)
            delay(500)
        }
    }.await()
}

Output

A: 1
A - Thread: DefaultDispatcher-worker-3
B: 2
A: 2
A - Thread: DefaultDispatcher-worker-3
B - Thread: DefaultDispatcher-worker-1
B: 3
B - Thread: DefaultDispatcher-worker-4
A: 3
A - Thread: DefaultDispatcher-worker-2
B: 4
B - Thread: DefaultDispatcher-worker-3
A: 4
A - Thread: DefaultDispatcher-worker-3
B: 5
B - Thread: DefaultDispatcher-worker-2
A: 5
A - Thread: DefaultDispatcher-worker-3
B: 6
B - Thread: DefaultDispatcher-worker-1
A: 6
A - Thread: DefaultDispatcher-worker-2
B: 7
A: 7
A - Thread: DefaultDispatcher-worker-3
B - Thread: DefaultDispatcher-worker-4
A: 8
A - Thread: DefaultDispatcher-worker-2
B: 8
B - Thread: DefaultDispatcher-worker-4
B: 9
B - Thread: DefaultDispatcher-worker-2
A: 9
A - Thread: DefaultDispatcher-worker-4
B: 10
B - Thread: DefaultDispatcher-worker-4
A: 10
A - Thread: DefaultDispatcher-worker-3
B: 11
B - Thread: DefaultDispatcher-worker-3
A: 11
A - Thread: DefaultDispatcher-worker-2

Conclusion

  • Each collector is indeed in different coroutines.

Why does blockage cause other collectors to be unable to receive properly?

It is highly likely that when updating the value to the collector, he actually initiated a coroutine. However, he forcibly waited for the coroutine to end before pushing the value to the next collector.

Solve

Immediately start a new coroutine/switch context after each collector updates its value, without affecting the next collector.