环境介绍
本文所使用的运行环境如下:
- Kotlin 版本:1.9.10
- Coroutine 版本:1.7.3
- OpenJDK 版本:17.0.7+10-b829.16 x86_64
- IDE:IntelliJ IDEA 2023.2 (Ultimate Edition)
术语翻译
- flow:流
- emit:发送,发射(本文统一翻译成“发送”)
- emitter:发射器
- collect:收集
- collector:收集器
- terminal flow operators:末端流操作符,终端流操作符(本文统一翻译成“末端流操作符”)
- intermediate flow operators:中端流操作符,中间流操作符,过渡流操作符(本文统一翻译成“中端流操作符”)
- structured concurrency:结构化并发
思维导图
定义
Flow 是一种类似于序列的冷流,是序列的异步版本,这是一种收集类型,其中的值是逐个生成的。与序列一样,只有需要某个值时,Flow 才会根据需要生成该值,而且 Flow 可以包含无限数量的值。
Flow 函数定义如下,就只是表示这个类能够被订阅收集:
1 | public interface Flow<out T> { |
而其中的 FlowCollector 则定义了发送数据的功能:
1 | public fun interface FlowCollector<in T> { |
Flow 全面支持协程。这意味着您可以使用协程构建、转换和耗用 Flow。您还可以控制并发,即利用 Flow 通过声明的方式协调多个协程的执行。
Flow 是值的异步序列
Flow通过异步操作(例如网络请求、数据库调用或其他异步代码),一次生成一个值(而不是一次生成所有值)。它通过其 API 支持协程,因此您也可以使用协程来转换 flow。
在基础 Kotlin 协程中,
launch启动的协程没有返回值,运行后就结束。async启动的协程,通过await函数获取协程的返回值,但只能返回单个值,且无法进行数据流操作。
而 Flow 的出现,恰好弥补了 Kotlin 协程对于多个值异步运算的不足,并且允许进行复杂的数据流操作。
Flow 如何运行
先看下面的示例:
1 | fun makeFlow() = flow { |
运行上面的代码会输出以下内容:
1 | sending first value |
可以看到 collect lambda 与 flow 构建器交替执行。每次 flow 构建器调用 emit 时,它都会 suspends,直到元素完全处理为止。当从流中请求另一个值时,它会从上次停止的位置 resume,直到它再次调用 emit。flow 构建器执行完成后,流将被取消,同时 collect 恢复,从而允许调用协程输出 “flow is completed”。
对 collect 的调用非常重要。流使用挂起操作符(也被称为末端操作符)(例如 collect),而不是公开 Iterator,以便始终知晓流何时在被主动耗用。更重要的是,它可以在调用方无法再请求更多值时获知消息,以便清理资源。
Flow完全是使用协程构建的。通过使用协程的suspend和resume机制,可以将生产方 (Flow) 的执行与使用方 (collect) 同步。
Flow 何时运行
当 collect 操作符运行时,Flow 才会开始运行。通过调用 flow 构建器或其它 API 来创建新的 Flow 后,并不会立刻执行工作。挂起操作符 collect 在 Flow 中被称为末端操作符。还有很多其它末端操作符,例如 kotlinx-coroutines 附带的 toList、first 和 single 等。
默认情况下,Flow 将在以下情况下执行:
- 每次调用末端操作符时(且每个新调用均与之前启动的任何调用无关)
- 直到运行
Flow的协程被取消 - 当上一个值已完全处理,并且又请求了另一个值时
这些规则是 Flow 的默认行为,可以创建一个 Flow,该 Flow 可以与先前运行的 Flow 共享状态,在每个末端操作符之间不会重新启动,并且通过内置或自定义的 Flow 转换在收集过程中独立执行。
执行 Flow 的过程称为收集 Flow。默认情况下,Flow 在被收集之前(即应用任何末端操作符)不会执行任何操作。
由于存在以上这些规则,所以 Flow 可以参与结构化并发,并且可以安全地从 Flow 启动长时间运行的协程。Flow 不会泄露资源,因为在调用方被取消时,系统始终会按照协程合作取消规则清理这些资源。
Flow supports structured concurrency
Because a flow allows you to consume values only with terminal operators, it can support structured concurrency.
When the consumer of a flow is cancelled, the entire
Flowis cancelled. Due to structured concurrency, it is impossible to leak a coroutine from an intermediate step
我们使用 take 操作符修改一下上面的示例,以便仅查看前两个元素,然后再收集该 Flow 两次:
1 | import kotlinx.coroutines.flow.collect |
程序运行结果如下:
1 | first collection |
每次调用 collect 时,flow lambda 都会从头开始执行。如果 Flow 执行高成本的工作(例如发出网络请求),则这一点非常重要。此外,由于我们应用了 take(2) 操作符,因此该 flow 将只生成两个值。第二次调用 emit 后,将不会再次恢复 flow lambda,因此将不会再输出“second value collected…”这一行。
默认情况下,每次应用末端操作符时,
Flow都会从头重新开始执行。如果Flow执行高成本的工作(如发出网络请求),则这一点非常重要。
Flow 构建器
创建 Flow 数据流最简单的方法是使用 flow 构建器:
1 | public fun <T> flow( |
其参数使用 suspend 修饰,是以 FlowCollector 作为接收者的挂起函数,能直接调用 emit 发送数据。
点击右上角运行按钮可以查看程序运行结果。
simple 函数会快速返回,并且不会等待任何内容。每次收集( collect) 流时,流都会重新开始运行。这就是为什么每次调用 collect 时都会输出 “Flow started”。
根据上述代码我们还可知如下内容:
- Flow 类型的构建器函数名为
flow。 flow { ... }构建块中的代码可以挂起。- 函数
simple不需要使用suspend修饰符。 - 流使用
emit函数 发射 值。 - 流使用
collect函数 收集 值。
其它创建 Flow 的方式还有很多,例如 asFlow、flowOf 等。
此外,官方还提供了很多将类型转换到流的扩展函数。大家可以查看下官方文档。
说明:
由
flow函数的内部定义我们可以看出,block是由suspend修饰的,所以内部可以调用挂起函数。flow代码块的emit函数是线程不安全的,所以flow函数的不能修改协程上下文,无法调用如withContext等函数,避免下游collect被调度到其他线程。如果要修改数据流的协程调度,只能调用
flowOn函数。
Flow 的取消机制
Flow 的取消机制与协程的一致。在可取消挂起函数(例如: delay)中,Flow 的 collect 操作是可以被取消的。
示例:
点击右上角运行按钮可以查看程序运行结果。
通过运行结果可以看出,程序运行 250ms 后,Flow 直接被取消了,Emitting 3 及 3 并没有输出。
通过 flow 构建器创建的 Flow,会在每次 emit 值时,执行额外的 ensureActive 检测,从而可以取消 Flow。
再看一个示例:
点击右上角运行按钮可以查看程序运行结果。
通过运行结果可以看到,当 value 等 3 时,虽然我们调用了 cancel() 函数,但是 Flow 并没有立刻被取消,而是在下一次发送值时,也就是执行 emit(4) 时,Flow 才被取消。
然而,出于性能考虑,大多数 Flow 操作符并不会进行额外的取消检测。例如,如果您使用 IntRange.asFlow 扩展来编写相同的循环,由于没有挂起操作,因此就不会有取消检测:
当在大量循环操作中使用 Flow 时,必须显示的进行取消检测。例如,可以通过添加 .onEach { currentCoroutineContext().ensureActive() } 来达到此目的。不过,有一个更简单的做法,那就是使用 cancellable 操作符:
运行结果表明,输出 3 之后,由于调用了 cancel() 因此产生 JobCancellationException 异常导致程序异常退出。
中端流操作符
可以使用操作符转换流,就像转换集合与序列一样。 过渡操作符应用于上游流,并返回下游流。 这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
基础的中端流操作符名字看起来很熟悉,比如 map 与 filter。 这些操作符与序列的主要区别在于,这些操作符中的代码可以调用挂起函数。
举例来说,一个接收请求的流可以使用 map 操作符转换结果,即使是通过挂起函数,实现一个长时间的请求操作:
转换操作符 (Transform operator)
在流的转换操作符中,最通用的是称为 transform 的操作符。它可以用于模仿像 map 和 filter 这样的简单转换,以及实现更复杂的转换。使用 transform 操作符,我们可以发出任意值,并可发出任意次。
通过 transform 操作符,我们可以对流中的值进行更灵活的转换。这允许我们根据需要发出零个、一个或多个新值,甚至可以在一个单一的 emit 操作中多次发出不同的值。这种操作符在需要进行自定义、动态的流转换时非常有用,提供了更高级别的灵活性和控制权。
限长操作符 (Size-limiting operators)
限长中端操作符(例如 take)在达到相应限制时会取消流的执行。在协程中,这种取消操作始终会抛出一个异常,从而取消流的执行,在 catch 块中,我们可以捕获这种异常。这种异常机制确保了在取消流的情况下,资源管理函数(如 try {...} finally {...} 块)会正常运行,从而确保了程序的正确性和可靠性:
末端流操作符
由于 Flow 是冷流,因此必须要调用末端操作符(例如,collect)才会执行数据流的生产操作。
- 数据流的创建与数据流的消费是成对出现的
- 多个数据流订阅消费,也会同样有多个数据源生产创建
流的末端操作符是一种启动流收集的挂起函数。collect 操作符是最基础的一个,另外还有一些更方便使用的末端操作符:
例如:
流是顺序执行的
流的每次单独收集操作都是顺序执行的,除非使用了能够操作多个流的特殊操作符(例如 buffer)。收集操作直接在调用末端操作符的协程中执行。默认情况下不会启动新协程。每个发送 (emit) 的值都会经过从上游到下游的所有中端操作符的处理,最后传递给末端操作符处理。
请看以下示例,该示例对偶数进行过滤并将它们映射为字符串:
请点击右上角运行按钮,查看运行结果,了解流的具体执行过程。
流上下文
流的收集始终在调用协程的上下文中进行。
例如,如果有一个 simple 流,那么下面的代码会在指定的上下文 context 中运行,而不受 simple 流实现细节的影响:
1 | withContext(context) { |
流的这种属性称为上下文保留 (context preservation) 。
默认情况下,flow { ... } 构建器中的代码是运行在相应流的收集器提供的上下文中的。让我们来看一个示例:
但 collect { ... } 中的代码是允许切换上下文的。
切换上下文的常见陷阱
对于那些需要长时间运行且消耗 CPU 的代码,可能会在 Dispatchers.Default 上下文中执行,而更新 UI 的代码可能需要在 Dispatchers.Main 上下文中执行。通常情况下,可以使用 withContext 在 Kotlin 协程中切换上下文,但是在 flow { ... } 构建器中的代码遵循上下文保留属性,因此在 flow { ... } 构建器中的代码是不允许从不同的上下文发送数据的。
以下示例中的 simple() 流并不会完成切换上下文操作,相反还会产生异常:
flowOn 操作符
切换流运行的上下文(即 flow { ... } 构建器中的代码)的正确方法是使用 flowOn 函数。具体用法请看下面的示例:
本例中需要注意的是,flow { ... } 中的代码运行在后台线程,而收集操作则运行在主线程。
还有一点值得留意,flowOn 操作符改变了流的默认顺序性质 (default sequential nature),而默认情况下,流数据的产生 (emit) 和处理 (collect) 是发生在相同协程中的(详见之前的示例)。收集操作发生在(”coroutine#1”)中,而发送操作发生在另一个协程(”coroutine#2”)中,后者运行在与收集协程所在线程并发运行的另一个线程中。当使用 flowOn 操作符切换 CoroutineDispatcher 时,它为上游流创建了一个新的协程。
缓冲(buffer)
从整体上来讲,为了减小收集流所花费的时间,将流的不同部分在不同的协程中运行是一个行之有效的办法,特别是涉及到长时间运行的异步操作时。先来看一个示例:
在上例中,每产生一个数据耗时约 100ms,每处理一个数据耗时约 300ms。对于 3 个数据,流的总体运行时间约为 1200ms = 3 * (100ms + 300ms) 左右。
这里我们可以引入 buffer 操作符,改变流的运行顺序,让流的发送代码与收集代码并行运行。来看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1 |
让我们来分析一下执行过程:
在这个示例中,由于我们使用了 buffer 操作符,因此流的运行顺序不再像默认情况那样,需要等待收集处理完数据后,才可以再次发送数据。使用 buffer 操作符后,流的运行顺序变为发送数据无需等待收集处理结束,便可以直接开始下一次的数据发送。buffer 操作符让流的发送部分与收集部分并行运行了。
所以上例中,发送第一个数据前等待了 100ms,之后不需要等待 emit(1) 的收集处理完成(也就是无需等待 collect 处理所需要的 300ms ),便可直接开始下一次循环。也就是说在大约 300ms 时间,3 个数据就可以全部发送出去。而收集部分每处理一个数据还是需要花费 300ms 的。
由于发送与收集现在是并行的,在发送方 100ms 后发送第一个数据开始,收集方就开始处理数据(处理花费 300ms),与此同时发送方无需等待继续开始下一次的数据发送。因此流的总体运行时间大约为 100ms + 3 * 300ms = 1000ms。
我们可以用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
注意,当我们使用 flowOn 操作符切换 CoroutineDispatcher 时,flowOn 操作符会使用相同的缓冲机制,只不过 buffer 操作符不会改变执行的上下文。
合并(conflate)
有时我们可能不需要在 collect 时处理每个被发送过来的值,而只需要处理最近较新的值(only most recent ones)。在这种情况下,可以使用 conflate 操作符,在收集器处理速度较慢时跳过中间值。让我们基于之前的示例进行演示,新代码如下:
1 | import kotlinx.coroutines.* |
这个示例中,仅把 buffer() 替换成成了 conflate(),让我们看一下运行结果:
1 | 1 |
可以看到,在第一条数据处理过程中,已经产生了第二条和第三条数据,因此当第一条数据处理完之后,已经有了两条数据待处理,因此当继续收集数据时,第二个数据就成了被合并 (conflated) 的数据,只有最新的数据(第三条数据)发送给收集器进行处理。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
最后让我们看一下 conflate 函数的实现:
1 | public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED) |
可以看到 conflate 的本质还是 buffer,只不过将 buffer 的第一个参数 capacity 设置成了 CONFLATED 而已。而 buffer 的 capacity 参数默认值是 BUFFERED。
处理最新值(xxxLatest)
当发射器和收集器的处理都较慢时,合并(Conflation)是加快处理速度的一种方法,它通过丢弃发出的值来实现。另一种方法是在每次发出新值时取消较慢的收集器并重新启动它。Flow API 中有一系列的 xxxLatest 操作符,其执行与 xxx 操作符相同的基本逻辑,但会在有新值时取消其代码块的执行。
让我们尝试将上一个示例中的 conflate 替换成 collectLatest,新示例如下:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | Collecting 1 |
由于 collectLatest 的函数体需要花费 300 毫秒,但是每 100 毫秒才发送一次值,所以我们看到每个发送过来的值虽然都可以进入 collectLatest 代码块,但是只有最后一个值可以完整的被收集器处理。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┸──────────┘ |
最后,以 collectLatest 为例,让我们看一下它的函数实现:
1 | public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) { |
可以看到,collectLatest 内部依然使用了 buffer,并将 capacity 参数的值设置成了 0。
组合多个流
组合多个流有很多种方式。
Zip
就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样, 流同样拥有一个 zip 操作符,用于组合两个流中的相关值。请看如下示例:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1 -> one |
Combine
当 Flow 代表变量或操作的最近较新值时 (most recent value)(参见 conflate 部分),可能需要执行一个计算操作,该计算依赖于相应 Flow 的最近较新值,并在任何上游 Flow 发出值时重新计算它。对应的操作符系列 (corresponding family of operators) 被称为 combine。
对于先前那个例子,如果 nums 每 300 毫秒更新一次,但 strs 每 400 毫秒更新一次,然后再使用 zip 操作符合并它们,仍会产生相同的结果,只不过结果每 400 毫秒才打印一次。修改后的代码如下:
在这个示例中,我们使用了
onEach中端操作符来延时发送每个元素,这会让代码更加简洁明了。
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1 -> one at 445 ms from start |
如果我们再将例子中的 zip 替换成 combine 的话,结果又会如何呢?修改后的代码如下:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1 -> one at 444 ms from start |
可以看到,我们得到了完全不同的结果。nums 或 strs 流中的每次数据发送,都会打印一行结果。
让我们再用一个时间轴来直观的看一下程序随着时间运行的过程(理想情况,不考虑每条命令自身花费时间):
1 | └──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┸──────┘ |
展平流 (Flattening flows)
流代表异步接收的值序列,因此很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。例如,以下函数会返回两个字符串的流,返回间隔 500 毫秒:
1 | fun requestFlow(i: Int): Flow<String> = flow { |
现在,如果我们有一个包含三个整数的流,并对每个整数调用 requestFlow,如下所示:
1 | val flowWithMap: Flow<Flow<String>> = (1..3).asFlow().map { requestFlow(it) } |
这样我们就得到了一个包含流的流 (Flow<Flow<String>>),若需要对其做进一步处理的话,需要先将其展平 (flatten) 为单一流。集合和序列都有 flatten 与 flatMap 操作符来实现这一点。然而,由于流的异步性质,我们需要不同的展平模式,因此就需要一系列应用在流上的展平操作符。
flatMapConcat
将流的流串联由 flatMapConcat 与 flattenConcat 操作符提供。它们是相应序列操作符的最直接类比。它们在等待内部流完成之后才开始收集下一个值,请看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1: First at 121 ms from start |
通过输出结果,我们可以清晰地看到 flatMapConcat 的顺序特性。
flatMapMerge
另一种展平操作是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发出值。 它由 flatMapMerge 与 flattenMerge 操作符实现。它们都接受一个可选的并发参数 concurrency(默认情况下,它等于 DEFAULT_CONCURRENCY),该参数限制了同时收集的流的数量。
来看下面的示例:
1 | import kotlinx.coroutines.* |
运行结果如下:
1 | 1: First at 173 ms from start |
可以明显的看到 flatMapMerge 的同步特性。
请注意,
flatMapMerge虽然是按顺序调用其代码块(即本例中的{ requestFlow(it) }),但会同时并发收集结果流,相当于执行顺序是首先执行map { requestFlow(it) },然后在其返回结果上调用flattenMerge。
flatMapLatest
与 collectLatest 操作符类似(在“处理最新值”一节中已经描述过),同样存在相对应的“最新” (Latest) 展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest 操作符来实现。
请看下面的示例:
1 | import kotlinx.coroutines.* |
执行结果如下:
1 | 1: First at 164 ms from start |
上面这个例子非常好的展示了 flatMapLatest 的工作方式。
请注意,
flatMapLatest在收到一个新值时,会取消块中的所有代码 (即本例中的{ requestFlow(it) })。在上面这个特定示例中可能看不出有什么影响,因为调用requestFlow自身是快速的,非挂起的,且不能被取消。然而,如果我们在requestFlow中使用类似delay这样的挂起函数的话,那么输出结果的差异将会显现出来。
Android 上的 Kotlin 数据流
数据流 Flow 包含三个重要角色:
- 数据提供方:生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
- 中介(可选):可修改发送到数据流的值,或修正数据流本身。
- 数据使用方:使用数据流中的值。
StateFlow
StateFlow 是一个状态容器可观察数据流,可向其收集器发出当前状态更新和新状态更新。也可以通过其 value 属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow 类的 value 属性分配一个新值。
在 Android 中,StateFlow 非常适合需要让可变状态保持可观察的类。
该文章中有如下一段话,很好的总结了什么时候应该用 StateFlow:
When exposing UI state to a view, use StateFlow. It’s a safe and efficient observer designed to hold UI state.
如需将任何数据流转换为 StateFlow,请使用 stateIn 中间操作符。
利用 shareIn 使冷数据流变为热数据流
StateFlow 是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn 操作符将冷数据流变为热数据流。
SharedFlow
shareIn 函数会返回一个热数据流 SharedFlow,此数据流会向从其中收集值的所有使用方发出数据。SharedFlow 是 StateFlow 的可配置性极高的泛化数据流。
您无需使用 shareIn 即可创建 SharedFlow。
Flow 常用 API
stateInshareInflatMapLatest(switchMaphas been deprecated)transformLatestcollectAsStateWithLifecyclecombine- combineTransform
- zip
filtermaponEachflowOntransform
数据流收集被中止原因
数据流收集可能会由于以下原因而停止:
- 收集数据的协程被取消,此操作也会让 数据提供方 停止活动。
- 数据提供方完成发出数据项。在这种情况下,数据流将关闭,调用
collect的协程则继续执行。
捕获异常
如需处理异常,可以使用 catch 操作符,如:
1 | fun getNewsData() { |