본문으로 건너뛰기

🌊 Flow

📖 Flow란?

Flow는 비동기적으로 여러 값을 순차적으로 반환하는 데이터 스트림입니다. 마치 suspend 함수의 리스트 버전이라고 생각하면 됩니다!

💡 기본 개념

단일 값 vs 여러 값

// suspend - 한 개의 값
suspend fun fetchData(): String {
delay(1000)
return "데이터"
}

// Flow - 여러 개의 값
fun fetchDataStream(): Flow<String> = flow {
emit("데이터1")
delay(500)
emit("데이터2")
delay(500)
emit("데이터3")
}

fun main() = runBlocking {
// 단일 값
println(fetchData())

// 여러 값
fetchDataStream().collect { value ->
println(value)
}
}

Flow 생성

import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 1. flow 빌더
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}

// 2. flowOf
val flow2 = flowOf(1, 2, 3)

// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()

flow1.collect { println(it) }
}

🎯 실전 예제

실시간 데이터

fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}

fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value 초 경과")
}
}

데이터베이스 변화 감지

data class User(val id: Int, val name: String)

fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "사용자${i * 2}"),
User(i * 2 + 1, "사용자${i * 2 + 1}")
)
emit(users)
}
}

fun main() = runBlocking {
observeUsers().collect { users ->
println("업데이트된 사용자:")
users.forEach { println(" - ${it.name}") }
}
}

센서 데이터

fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}

fun main() = runBlocking {
sensorData().collect { temperature ->
println("온도: ${"%.1f".format(temperature)}°C")
}
}

🔧 Flow 연산자

map - 변환

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // 제곱
.collect { println(it) }
// 1, 4, 9, 16, 25
}

filter - 필터링

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 짝수만
.collect { println(it) }
// 2, 4
}

transform - 복잡한 변환

fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("변환: $value")
emit("제곱: ${value * value}")
}
.collect { println(it) }
}

take - 개수 제한

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 처음 3개만
.collect { println(it) }
// 1, 2, 3
}

🎨 고급 연산자

buffer - 버퍼링

fun main() = runBlocking {
val time = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
.buffer() // 버퍼링으로 성능 향상
.collect {
delay(300)
println(it)
}
}
println("소요 시간: ${time}ms")
}

conflate - 최신 값만

fun main() = runBlocking {
flow {
repeat(5) {
delay(100)
emit(it)
}
}
.conflate() // 느린 수집자가 있으면 중간 값 건너뛰기
.collect {
delay(300)
println("수집: $it")
}
}

zip - 결합

fun main() = runBlocking {
val numbers = flowOf(1, 2, 3)
val strings = flowOf("하나", "둘", "셋")

numbers.zip(strings) { num, str ->
"$num - $str"
}.collect { println(it) }
// 1 - 하나
// 2 - 둘
// 3 - 셋
}

combine - 조합

fun main() = runBlocking {
val flow1 = flow {
emit("A")
delay(500)
emit("B")
}

val flow2 = flow {
emit(1)
delay(200)
emit(2)
delay(200)
emit(3)
}

flow1.combine(flow2) { a, b ->
"$a$b"
}.collect { println(it) }
}

🔥 실용 패턴

검색 기능

fun searchQuery(query: String): Flow<List<String>> = flow {
delay(300) // 디바운스
val results = listOf(
"${query}1",
"${query}2",
"${query}3"
).filter { it.contains(query) }
emit(results)
}

fun main() = runBlocking {
flowOf("kot", "kotlin", "kotli")
.flatMapLatest { query ->
searchQuery(query)
}
.collect { results ->
println("검색 결과: $results")
}
}

페이지네이션

fun loadPage(page: Int): Flow<List<String>> = flow {
delay(500)
val items = List(10) { "아이템${page * 10 + it}" }
emit(items)
}

fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { page ->
loadPage(page)
}
.collect { items ->
println("페이지 로드: $items")
}
}

에러 처리

fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw Exception("에러 발생!")
emit(3)
}

fun main() = runBlocking {
riskyFlow()
.catch { e ->
println("에러 잡음: ${e.message}")
emit(-1) // 기본값
}
.collect { println("값: $it") }
}

재시도

var attemptCount = 0

fun unstableFlow(): Flow<String> = flow {
attemptCount++
if (attemptCount < 3) {
throw Exception("일시적 오류")
}
emit("성공!")
}

fun main() = runBlocking {
unstableFlow()
.retry(3) { cause ->
println("재시도... (${cause.message})")
delay(1000)
true
}
.collect { println(it) }
}

🛠️ StateFlow와 SharedFlow

StateFlow - 상태 관리

class Counter {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count

fun increment() {
_count.value++
}
}

fun main() = runBlocking {
val counter = Counter()

launch {
counter.count.collect { value ->
println("카운트: $value")
}
}

delay(100)
counter.increment()
delay(100)
counter.increment()
delay(100)
}

SharedFlow - 이벤트

class EventBus {
private val _events = MutableSharedFlow<String>()
val events: SharedFlow<String> = _events

suspend fun emit(event: String) {
_events.emit(event)
}
}

fun main() = runBlocking {
val bus = EventBus()

// 구독자 1
launch {
bus.events.collect { event ->
println("구독자1: $event")
}
}

// 구독자 2
launch {
bus.events.collect { event ->
println("구독자2: $event")
}
}

delay(100)
bus.emit("이벤트1")
delay(100)
bus.emit("이벤트2")
delay(100)
}

🤔 자주 묻는 질문

Q1. Flow는 언제 실행되나요?

A: collect를 호출할 때 실행됩니다 (Cold Stream)!

fun main() = runBlocking {
val flow = flow {
println("Flow 시작!")
emit(1)
}

println("Flow 생성")
delay(1000)

println("collect 호출")
flow.collect { println(it) }
}
// Flow 생성
// (1초 대기)
// collect 호출
// Flow 시작!
// 1

Q2. List와 Flow의 차이는?

A: List는 즉시, Flow는 점진적!

fun main() = runBlocking {
// List - 모든 데이터가 메모리에
val list = listOf(1, 2, 3)

// Flow - 필요할 때마다 생성
val flow = flow {
repeat(3) {
delay(1000)
emit(it)
}
}
}

Q3. Flow를 여러 번 수집하면?

A: 매번 새로 실행됩니다!

fun main() = runBlocking {
val flow = flow {
println("실행!")
emit(1)
}

flow.collect { println("첫 수집: $it") }
flow.collect { println("두 수집: $it") }
}
// 실행!
// 첫 수집: 1
// 실행!
// 두 수집: 1

🎬 마치며

Flow로 리액티브 프로그래밍을!

핵심 정리:
✅ 비동기 데이터 스트림
✅ Cold Stream (수집 시 실행)
✅ map, filter 등 연산자
✅ StateFlow로 상태 관리
✅ SharedFlow로 이벤트 전달

다음 단계: Context와 Dispatcher에서 코루틴 실행 환경을 알아보세요!