跳至正文

🌊 Flow

📖 什么是Flow?

Flow是一个异步地按顺序返回多个值的数据流。可以把它想象成suspend函数的列表版本!

💡 基本概念

单个值 vs 多个值

// suspend - 一个值
suspend fun fetchData(): String {
delay(1000)
return "数据"
}

// Flow - 多个值
fun fetchDataStream(): Flow<String> = flow {
emit("数据1")
delay(500)
emit("数据2")
delay(500)
emit("数据3")
}

fun main() = runBlocking {
// 单个值
println(fetchData())

// 多个值
fetchDataStream().collect { value ->
println(value)
}
}

Flow创建

import kotlinx.coroutines.flow.*

fun main() = runBlocking {
// 1. flow构建器
val flow1 = flow {
emit(1)
emit(2)
emit(3)
}

// 2. flowOf
val flow2 = flowOf(1, 2, 3)

// 3. asFlow
val flow3 = listOf(1, 2, 3).asFlow()

flow1.collect { println(it) }
}

🎯 实战示例

实时数据

fun tickerFlow(): Flow<Int> = flow {
var counter = 0
while (counter < 5) {
emit(counter++)
delay(1000)
}
}

fun main() = runBlocking {
tickerFlow().collect { value ->
println("$value 秒经过")
}
}

数据库变化监测

data class User(val id: Int, val name: String)

fun observeUsers(): Flow<List<User>> = flow {
repeat(3) { i ->
delay(1000)
val users = listOf(
User(i * 2, "用户${i * 2}"),
User(i * 2 + 1, "用户${i * 2 + 1}")
)
emit(users)
}
}

fun main() = runBlocking {
observeUsers().collect { users ->
println("更新的用户:")
users.forEach { println(" - ${it.name}") }
}
}

传感器数据

fun sensorData(): Flow<Double> = flow {
repeat(10) {
val reading = (20..30).random() + Math.random()
emit(reading)
delay(500)
}
}

fun main() = runBlocking {
sensorData().collect { temperature ->
println("温度: ${"%.1f".format(temperature)}°C")
}
}

🔧 Flow运算符

map - 转换

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.map { it * it } // 平方
.collect { println(it) }
// 1, 4, 9, 16, 25
}

filter - 过滤

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.filter { it % 2 == 0 } // 仅偶数
.collect { println(it) }
// 2, 4
}

transform - 复杂转换

fun main() = runBlocking {
flowOf(1, 2, 3)
.transform { value ->
emit("转换: $value")
emit("平方: ${value * value}")
}
.collect { println(it) }
}

take - 数量限制

fun main() = runBlocking {
flowOf(1, 2, 3, 4, 5)
.take(3) // 仅前3个
.collect { println(it) }
// 1, 2, 3
}

🎨 高级运算符

buffer - 缓冲

fun main() = runBlocking {
val time = measureTimeMillis {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
.buffer() // 通过缓冲提升性能
.collect {
delay(300)
println(it)
}
}
println("耗时: ${time}ms")
}

conflate - 仅最新值

fun main() = runBlocking {
flow {
repeat(5) {
delay(100)
emit(it)
}
}
.conflate() // 如果收集者较慢则跳过中间值
.collect {
delay(300)
println("收集: $it")
}
}

zip - 结合

fun main() = runBlocking {
val numbers = flowOf(1, 2, 3)
val strings = flowOf("一", "二", "三")

numbers.zip(strings) { num, str ->
"$num - $str"
}.collect { println(it) }
// 1 - 一
// 2 - 二
// 3 - 三
}

combine - 组合

fun main() = runBlocking {
val flow1 = flow {
emit("A")
delay(500)
emit("B")
}

val flow2 = flow {
emit(1)
delay(200)
emit(2)
delay(200)
emit(3)
}

flow1.combine(flow2) { a, b ->
"$a$b"
}.collect { println(it) }
}

🔥 实用模式

搜索功能

fun searchQuery(query: String): Flow<List<String>> = flow {
delay(300) // 防抖
val results = listOf(
"${query}1",
"${query}2",
"${query}3"
).filter { it.contains(query) }
emit(results)
}

fun main() = runBlocking {
flowOf("kot", "kotlin", "kotli")
.flatMapLatest { query ->
searchQuery(query)
}
.collect { results ->
println("搜索结果: $results")
}
}

分页

fun loadPage(page: Int): Flow<List<String>> = flow {
delay(500)
val items = List(10) { "项目${page * 10 + it}" }
emit(items)
}

fun main() = runBlocking {
flowOf(1, 2, 3)
.flatMapConcat { page ->
loadPage(page)
}
.collect { items ->
println("加载页面: $items")
}
}

错误处理

fun riskyFlow(): Flow<Int> = flow {
emit(1)
emit(2)
throw Exception("发生错误!")
emit(3)
}

fun main() = runBlocking {
riskyFlow()
.catch { e ->
println("捕获错误: ${e.message}")
emit(-1) // 默认值
}
.collect { println("值: $it") }
}

重试

var attemptCount = 0

fun unstableFlow(): Flow<String> = flow {
attemptCount++
if (attemptCount < 3) {
throw Exception("临时错误")
}
emit("成功!")
}

fun main() = runBlocking {
unstableFlow()
.retry(3) { cause ->
println("重试... (${cause.message})")
delay(1000)
true
}
.collect { println(it) }
}

🛠️ StateFlow和SharedFlow

StateFlow - 状态管理

class Counter {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count

fun increment() {
_count.value++
}
}

fun main() = runBlocking {
val counter = Counter()

launch {
counter.count.collect { value ->
println("计数: $value")
}
}

delay(100)
counter.increment()
delay(100)
counter.increment()
delay(100)
}

SharedFlow - 事件

class EventBus {
private val _events = MutableSharedFlow<String>()
val events: SharedFlow<String> = _events

suspend fun emit(event: String) {
_events.emit(event)
}
}

fun main() = runBlocking {
val bus = EventBus()

// 订阅者1
launch {
bus.events.collect { event ->
println("订阅者1: $event")
}
}

// 订阅者2
launch {
bus.events.collect { event ->
println("订阅者2: $event")
}
}

delay(100)
bus.emit("事件1")
delay(100)
bus.emit("事件2")
delay(100)
}

🤔 常见问题

Q1. Flow何时执行?

A: 调用collect时执行(冷流)!

fun main() = runBlocking {
val flow = flow {
println("Flow 开始!")
emit(1)
}

println("Flow 创建")
delay(1000)

println("调用 collect")
flow.collect { println(it) }
}
// Flow 创建
// (等待1秒)
// 调用 collect
// Flow 开始!
// 1

Q2. List和Flow的区别是?

A: List是立即的,Flow是渐进的!

fun main() = runBlocking {
// List - 所有数据在内存中
val list = listOf(1, 2, 3)

// Flow - 需要时才生成
val flow = flow {
repeat(3) {
delay(1000)
emit(it)
}
}
}

Q3. 多次收集Flow会怎样?

A: 每次都会重新执行!

fun main() = runBlocking {
val flow = flow {
println("执行!")
emit(1)
}

flow.collect { println("第一次收集: $it") }
flow.collect { println("第二次收集: $it") }
}
// 执行!
// 第一次收集: 1
// 执行!
// 第二次收集: 1

🎬 总结

用Flow实现响应式编程!

核心要点:
✅ 异步数据流
✅ 冷流(收集时执行)
✅ map、filter等运算符
✅ 用StateFlow管理状态
✅ 用SharedFlow传递事件

下一步: 在Context和Dispatcher中了解协程执行环境!