🌊 Flow
📖 什麼是 Flow?
Flow 是一個非同步地依序返回多個值的資料串流。可以把它想像成 suspend 函數的列表版本!
💡 基本概念
單一值 vs 多個值
// suspend - 一個值
suspend fun fetchData(): String {
delay(1000)
return "資料"
}
// Flow - 多個值
fun fetchDataStream(): Flow<String> = flow {
emit("資料1")
delay(500)
emit("資料2")
delay(500)
emit("資料3")
}
fun main() = runBlocking {
// 單一值
println(fetchData())
// 多個值
fetchDataStream().collect { value ->
println(value)
}
}
建立 Flow
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 1. flow 建構器
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}
// 2. flowOf
val flow2 = flowOf(1, 2, 3)
// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()
flow1.collect { println(it) }
}
🎯 實戰範例
即時資料
fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}
fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value 秒經過")
}
}
資料庫變化偵測
data class User(val id: Int, val name: String)
fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "使用者${i * 2}"),
User(i * 2 + 1, "使用者${i * 2 + 1}")
)
emit(users)
}
}
fun main() = runBlocking {
observeUsers().collect { users ->
println("更新的使用者:")
users.forEach { println(" - ${it.name}") }
}
}
感測器資料
fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}
fun main() = runBlocking {
sensorData().collect { temperature ->
println("溫度: ${"%.1f".format(temperature)}°C")
}
}
🔧 Flow 運算子
map - 轉換
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // 平方
.collect { println(it) }
// 1, 4, 9, 16, 25
}
filter - 過濾
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 只有偶數
.collect { println(it) }
// 2, 4
}
transform - 複雜轉換
fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("轉換: $value")
emit("平方: ${value * value}")
}
.collect { println(it) }
}
take - 限制數量
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 只有前 3 個
.collect { println(it) }
// 1, 2, 3
}
🎨 進階運算子
buffer - 緩衝
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
.buffer() // 透過緩衝提升效能
.collect {
delay(300)
println(it)
}
}
println("所需時間: ${time}ms")
}
conflate - 只取最新值
fun main() = runBlocking {
flow {
repeat(5) {
delay(100)
emit(it)
}
}
.conflate() // 如果有慢速收集者,跳過中間值
.collect {
delay(300)
println("收集: $it")
}
}