🌊 Flow
📖 什么是Flow?
Flow是一个异步地按顺序返回多个值的数据流。可以把它想象成suspend函数的列表版本!
💡 基本概念
单个值 vs 多个值
// suspend - 一个值
suspend fun fetchData(): String {
delay(1000)
return "数据"
}
// Flow - 多个值
fun fetchDataStream(): Flow<String> = flow {
emit("数据1")
delay(500)
emit("数据2")
delay(500)
emit("数据3")
}
fun main() = runBlocking {
// 单个值
println(fetchData())
// 多个值
fetchDataStream().collect { value ->
println(value)
}
}
Flow创建
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// 1. flow构建器
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}
// 2. flowOf
val flow2 = flowOf(1, 2, 3)
// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()
flow1.collect { println(it) }
}
🎯 实战示例
实时数据
fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}
fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value 秒经过")
}
}
数据库变化监测
data class User(val id: Int, val name: String)
fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "用户${i * 2}"),
User(i * 2 + 1, "用户${i * 2 + 1}")
)
emit(users)
}
}
fun main() = runBlocking {
observeUsers().collect { users ->
println("更新的用户:")
users.forEach { println(" - ${it.name}") }
}
}
传感器数据
fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}
fun main() = runBlocking {
sensorData().collect { temperature ->
println("温度: ${"%.1f".format(temperature)}°C")
}
}
🔧 Flow运算符
map - 转换
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // 平方
.collect { println(it) }
// 1, 4, 9, 16, 25
}
filter - 过滤
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 仅偶数
.collect { println(it) }
// 2, 4
}
transform - 复杂转换
fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("转换: $value")
emit("平方: ${value * value}")
}
.collect { println(it) }
}
take - 数量限制
fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 仅前3个
.collect { println(it) }
// 1, 2, 3
}