跳至正文

🌊 Flow

📖 什麼是 Flow?

Flow 是一個非同步地依序返回多個值的資料串流。可以把它想像成 suspend 函數的列表版本!

💡 基本概念

單一值 vs 多個值

// suspend - 一個值
suspend fun fetchData(): String {
delay(1000)
return "資料"
}

// Flow - 多個值
fun fetchDataStream(): Flow<String> = flow {
emit("資料1")
delay(500)
emit("資料2")
delay(500)
emit("資料3")
}

fun main() = runBlocking {
// 單一值
println(fetchData())

// 多個值
fetchDataStream().collect { value ->
println(value)
}
}

建立 Flow

import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 1. flow 建構器
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}

// 2. flowOf
val flow2 = flowOf(1, 2, 3)

// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()

flow1.collect { println(it) }
}

🎯 實戰範例

即時資料

fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}

fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value 秒經過")
}
}

資料庫變化偵測

data class User(val id: Int, val name: String)

fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "使用者${i * 2}"),
User(i * 2 + 1, "使用者${i * 2 + 1}")
)
emit(users)
}
}

fun main() = runBlocking {
observeUsers().collect { users ->
println("更新的使用者:")
users.forEach { println(" - ${it.name}") }
}
}

感測器資料

fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}

fun main() = runBlocking {
sensorData().collect { temperature ->
println("溫度: ${"%.1f".format(temperature)}°C")
}
}

🔧 Flow 運算子

map - 轉換

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // 平方
.collect { println(it) }
// 1, 4, 9, 16, 25
}

filter - 過濾

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 只有偶數
.collect { println(it) }
// 2, 4
}

transform - 複雜轉換

fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("轉換: $value")
emit("平方: ${value * value}")
}
.collect { println(it) }
}

take - 限制數量

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 只有前 3 個
.collect { println(it) }
// 1, 2, 3
}

🎨 進階運算子

buffer - 緩衝

fun main() = runBlocking {
val time = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
.buffer() // 透過緩衝提升效能
.collect {
delay(300)
println(it)
}
}
println("所需時間: ${time}ms")
}

conflate - 只取最新值

fun main() = runBlocking {
flow {
repeat(5) {
delay(100)
emit(it)
}
}
.conflate() // 如果有慢速收集者,跳過中間值
.collect {
delay(300)
println("收集: $it")
}
}

zip - 結合

fun main() = runBlocking {
val numbers = flowOf(1, 2, 3)
val strings = flowOf("一", "二", "三")

numbers.zip(strings) { num, str ->
"$num - $str"
}.collect { println(it) }
// 1 - 一
// 2 - 二
// 3 - 三
}

combine - 組合

fun main() = runBlocking {
val flow1 = flow {
emit("A")
delay(500)
emit("B")
}

val flow2 = flow {
emit(1)
delay(200)
emit(2)
delay(200)
emit(3)
}

flow1.combine(flow2) { a, b ->
"$a$b"
}.collect { println(it) }
}

🔥 實用模式

搜尋功能

fun searchQuery(query: String): Flow<List<String>> = flow {
delay(300) // 防抖
val results = listOf(
"${query}1",
"${query}2",
"${query}3"
).filter { it.contains(query) }
emit(results)
}

fun main() = runBlocking {
flowOf("kot", "kotlin", "kotli")
.flatMapLatest { query ->
searchQuery(query)
}
.collect { results ->
println("搜尋結果: $results")
}
}

分頁

fun loadPage(page: Int): Flow<List<String>> = flow {
delay(500)
val items = List(10) { "項目${page * 10 + it}" }
emit(items)
}

fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { page ->
loadPage(page)
}
.collect { items ->
println("頁面載入: $items")
}
}

錯誤處理

fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw Exception("發生錯誤!")
emit(3)
}

fun main() = runBlocking {
riskyFlow()
.catch { e ->
println("捕獲錯誤: ${e.message}")
emit(-1) // 預設值
}
.collect { println("值: $it") }
}

重試

var attemptCount = 0

fun unstableFlow(): Flow<String> = flow {
attemptCount++
if (attemptCount < 3) {
throw Exception("暫時性錯誤")
}
emit("成功!")
}

fun main() = runBlocking {
unstableFlow()
.retry(3) { cause ->
println("重試... (${cause.message})")
delay(1000)
true
}
.collect { println(it) }
}

🛠️ StateFlow 與 SharedFlow

StateFlow - 狀態管理

class Counter {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count

fun increment() {
_count.value++
}
}

fun main() = runBlocking {
val counter = Counter()

launch {
counter.count.collect { value ->
println("計數: $value")
}
}

delay(100)
counter.increment()
delay(100)
counter.increment()
delay(100)
}

SharedFlow - 事件

class EventBus {
private val _events = MutableSharedFlow<String>()
val events: SharedFlow<String> = _events

suspend fun emit(event: String) {
_events.emit(event)
}
}

fun main() = runBlocking {
val bus = EventBus()

// 訂閱者 1
launch {
bus.events.collect { event ->
println("訂閱者1: $event")
}
}

// 訂閱者 2
launch {
bus.events.collect { event ->
println("訂閱者2: $event")
}
}

delay(100)
bus.emit("事件1")
delay(100)
bus.emit("事件2")
delay(100)
}

🤔 常見問題

Q1. Flow 何時執行?

A: 在呼叫 collect 時執行(冷串流)!

fun main() = runBlocking {
val flow = flow {
println("Flow 開始!")
emit(1)
}

println("建立 Flow")
delay(1000)

println("呼叫 collect")
flow.collect { println(it) }
}
// 建立 Flow
// (等待 1 秒)
// 呼叫 collect
// Flow 開始!
// 1

Q2. List 與 Flow 的差異?

A: List 是立即的,Flow 是漸進的!

fun main() = runBlocking {
// List - 所有資料都在記憶體中
val list = listOf(1, 2, 3)

// Flow - 需要時才產生
val flow = flow {
repeat(3) {
delay(1000)
emit(it)
}
}
}

Q3. 多次收集 Flow 會怎樣?

A: 每次都會重新執行!

fun main() = runBlocking {
val flow = flow {
println("執行!")
emit(1)
}

flow.collect { println("第一次收集: $it") }
flow.collect { println("第二次收集: $it") }
}
// 執行!
// 第一次收集: 1
// 執行!
// 第二次收集: 1

🎬 結語

用 Flow 來進行反應式程式設計!

核心總結:
✅ 非同步資料串流
✅ 冷串流(收集時執行)
✅ map、filter 等運算子
✅ 用 StateFlow 管理狀態
✅ 用 SharedFlow 傳遞事件

下一步: 在 Context 與 Dispatcher 中了解協程執行環境!