Skip to main content

๐ŸŒŠ Flow

๐Ÿ“– What is Flow?โ€‹

Flow is a data stream that asynchronously returns multiple values sequentially. You can think of it as a list version of suspend functions!

๐Ÿ’ก Basic Conceptsโ€‹

Single Value vs Multiple Valuesโ€‹

// suspend - single value
suspend fun fetchData(): String {
delay(1000)
return "data"
}

// Flow - multiple values
fun fetchDataStream(): Flow<String> = flow {
emit("data1")
delay(500)
emit("data2")
delay(500)
emit("data3")
}

fun main() = runBlocking {
// Single value
println(fetchData())

// Multiple values
fetchDataStream().collect { value ->
println(value)
}
}

Creating Flowโ€‹

import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 1. flow builder
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}

// 2. flowOf
val flow2 = flowOf(1, 2, 3)

// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()

flow1.collect { println(it) }
}

๐ŸŽฏ Practical Examplesโ€‹

Real-time Dataโ€‹

fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}

fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value seconds elapsed")
}
}

Database Change Detectionโ€‹

data class User(val id: Int, val name: String)

fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "User${i * 2}"),
User(i * 2 + 1, "User${i * 2 + 1}")
)
emit(users)
}
}

fun main() = runBlocking {
observeUsers().collect { users ->
println("Updated users:")
users.forEach { println(" - ${it.name}") }
}
}

Sensor Dataโ€‹

fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}

fun main() = runBlocking {
sensorData().collect { temperature ->
println("Temperature: ${"%.1f".format(temperature)}ยฐC")
}
}

๐Ÿ”ง Flow Operatorsโ€‹

map - Transformationโ€‹

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // square
.collect { println(it) }
// 1, 4, 9, 16, 25
}

filter - Filteringโ€‹

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // even numbers only
.collect { println(it) }
// 2, 4
}

transform - Complex Transformationโ€‹

fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("Transformed: $value")
emit("Squared: ${value * value}")
}
.collect { println(it) }
}

take - Limiting Countโ€‹

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // first 3 only
.collect { println(it) }
// 1, 2, 3
}

๐ŸŽจ Advanced Operatorsโ€‹

buffer - Bufferingโ€‹

fun main() = runBlocking {
val time = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
.buffer() // improve performance with buffering
.collect {
delay(300)
println(it)
}
}
println("Time taken: ${time}ms")
}

conflate - Latest Value Onlyโ€‹

fun main() = runBlocking {
flow {
repeat(5) {
delay(100)
emit(it)
}
}
.conflate() // skip intermediate values if collector is slow
.collect {
delay(300)
println("Collected: $it")
}
}

zip - Combiningโ€‹

fun main() = runBlocking {
val numbers = flowOf(1, 2, 3)
val strings = flowOf("One", "Two", "Three")

numbers.zip(strings) { num, str ->
"$num - $str"
}.collect { println(it) }
// 1 - One
// 2 - Two
// 3 - Three
}

combine - Compositionโ€‹

fun main() = runBlocking {
val flow1 = flow {
emit("A")
delay(500)
emit("B")
}

val flow2 = flow {
emit(1)
delay(200)
emit(2)
delay(200)
emit(3)
}

flow1.combine(flow2) { a, b ->
"$a$b"
}.collect { println(it) }
}

๐Ÿ”ฅ Practical Patternsโ€‹

Search Functionalityโ€‹

fun searchQuery(query: String): Flow<List<String>> = flow {
delay(300) // debounce
val results = listOf(
"${query}1",
"${query}2",
"${query}3"
).filter { it.contains(query) }
emit(results)
}

fun main() = runBlocking {
flowOf("kot", "kotlin", "kotli")
.flatMapLatest { query ->
searchQuery(query)
}
.collect { results ->
println("Search results: $results")
}
}

Paginationโ€‹

fun loadPage(page: Int): Flow<List<String>> = flow {
delay(500)
val items = List(10) { "Item${page * 10 + it}" }
emit(items)
}

fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { page ->
loadPage(page)
}
.collect { items ->
println("Page loaded: $items")
}
}

Error Handlingโ€‹

fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw Exception("Error occurred!")
emit(3)
}

fun main() = runBlocking {
riskyFlow()
.catch { e ->
println("Error caught: ${e.message}")
emit(-1) // default value
}
.collect { println("Value: $it") }
}

Retryโ€‹

var attemptCount = 0

fun unstableFlow(): Flow<String> = flow {
attemptCount++
if (attemptCount < 3) {
throw Exception("Temporary error")
}
emit("Success!")
}

fun main() = runBlocking {
unstableFlow()
.retry(3) { cause ->
println("Retrying... (${cause.message})")
delay(1000)
true
}
.collect { println(it) }
}

๐Ÿ› ๏ธ StateFlow and SharedFlowโ€‹

StateFlow - State Managementโ€‹

class Counter {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count

fun increment() {
_count.value++
}
}

fun main() = runBlocking {
val counter = Counter()

launch {
counter.count.collect { value ->
println("Count: $value")
}
}

delay(100)
counter.increment()
delay(100)
counter.increment()
delay(100)
}

SharedFlow - Eventsโ€‹

class EventBus {
private val _events = MutableSharedFlow<String>()
val events: SharedFlow<String> = _events

suspend fun emit(event: String) {
_events.emit(event)
}
}

fun main() = runBlocking {
val bus = EventBus()

// Subscriber 1
launch {
bus.events.collect { event ->
println("Subscriber1: $event")
}
}

// Subscriber 2
launch {
bus.events.collect { event ->
println("Subscriber2: $event")
}
}

delay(100)
bus.emit("Event1")
delay(100)
bus.emit("Event2")
delay(100)
}

๐Ÿค” Frequently Asked Questionsโ€‹

Q1. When does Flow execute?โ€‹

A: It executes when collect is called (Cold Stream)!

fun main() = runBlocking {
val flow = flow {
println("Flow started!")
emit(1)
}

println("Flow created")
delay(1000)

println("collect called")
flow.collect { println(it) }
}
// Flow created
// (wait 1 second)
// collect called
// Flow started!
// 1

Q2. What's the difference between List and Flow?โ€‹

A: List is immediate, Flow is progressive!

fun main() = runBlocking {
// List - all data in memory
val list = listOf(1, 2, 3)

// Flow - generated as needed
val flow = flow {
repeat(3) {
delay(1000)
emit(it)
}
}
}

Q3. What happens if I collect Flow multiple times?โ€‹

A: It executes anew each time!

fun main() = runBlocking {
val flow = flow {
println("Executing!")
emit(1)
}

flow.collect { println("First collection: $it") }
flow.collect { println("Second collection: $it") }
}
// Executing!
// First collection: 1
// Executing!
// Second collection: 1

๐ŸŽฌ Conclusionโ€‹

Reactive programming with Flow!

Key Points:
โœ… Asynchronous data streams
โœ… Cold Stream (executes on collection)
โœ… Operators like map, filter
โœ… State management with StateFlow
โœ… Event handling with SharedFlow

Next Step: Learn about coroutine execution environments in Context and Dispatcher!