Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc); 流 - 冷的异步流,具有流生成器和全面的操作符集(过滤器、map等)。
Flow有以下特点:
冷数据流,不消费则不生产,这一点与Channel正相反:Channel的发送端并不依赖于接收端。
Flow通过flowOn改变数据发射的线程,数据消费线程则由协程所在线程决定
与RxJava类似,支持通过catch捕获异常,通过onCompletion回调完成
Flow没有提供取消方法,可以通过取消Flow所在协程的方式来取消
创建 flow 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 lifecycleScope.launch(Dispatchers.IO) { flow { emit(1 ) emit(2 ) emit(3 ) }.onCompletion { Log.d(TAG, "flow onCompletion" ) }.collect { Log.d(TAG, "flow.collect: $it " ) } } ------------------------------------------ TestFlowActivity: flow.collect: 1 TestFlowActivity: flow.collect: 2 TestFlowActivity: flow.collect: 3 TestFlowActivity: flow onCompletion ------------------------------------------ lifecycleScope.launch(Dispatchers.IO) { flowOf(1 , 2 , 3 ).onCompletion { Log.d(TAG, "flowOf onCompletion" ) }.collect { Log.d(TAG, "flowOf collect: $it " ) } listOf(1 ,2 ,3 ).asFlow().onCompletion { Log.d(TAG, "asFlow onCompletion" ) }.collect { Log.d(TAG, "asFlow collect: $it " ) } } ------------------------------------------ TestFlowActivity: flowOf collect: 1 TestFlowActivity: flowOf collect: 2 TestFlowActivity: flowOf collect: 3 TestFlowActivity: flowOf onCompletion TestFlowActivity: asFlow collect: 1 TestFlowActivity: asFlow collect: 2 TestFlowActivity: asFlow collect: 3 TestFlowActivity: asFlow onCompletion ------------------------------------------
切换线程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 lifecycleScope.launch(Dispatchers.Main) { flow { Log.d(TAG, "emit " + Thread.currentThread().name) emit(1 ) }.flowOn(Dispatchers.IO).map { Log.d(TAG, "map " + Thread.currentThread().name) it*2 }.onCompletion { Log.d(TAG, "onCompletion " + Thread.currentThread().name) }.collect { Log.d(TAG, "collect " + Thread.currentThread().name) } } ------------------------------------------ TestFlowActivity: emit DefaultDispatcher-worker-1 TestFlowActivity: map main TestFlowActivity: collect main TestFlowActivity: onCompletion main ------------------------------------------ lifecycleScope.launch(Dispatchers.Main) { flow { Log.d(TAG, "emit " + Thread.currentThread().name) emit(1 ) }.map { Log.d(TAG, "map " + Thread.currentThread().name) it * 2 }.flowOn(Dispatchers.IO).onCompletion { Log.d(TAG, "onCompletion " + Thread.currentThread().name) }.collect { Log.d(TAG, "collect " + Thread.currentThread().name) } } ------------------------------------------ TestFlowActivity: emit DefaultDispatcher-worker-1 TestFlowActivity: map DefaultDispatcher-worker-1 TestFlowActivity: collect main TestFlowActivity: onCompletion main ------------------------------------------
可以通过flowOn()改变的是Flow函数内部发射数据时的线程,而在collect收集数据时会自动切回创建Flow时的线程。 Flow的调度器 API 中看似只有flowOn与subscribeOn对应,其实collect所在协程的调度器也与observeOn指定的调度器对应。
对比类型
Flow
RxJava
改变数据发送的线程
flowOn
subscribeOn
改变消费数据的线程
自动切回协程的线程
observeOn
注意点:(和RxJava一致) 发射数据和处理数据在一个线程内,发射一次消费一次
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 lifecycleScope.launch(Dispatchers.IO) { flow { Log.d(TAG, "emit: 1" ) emit(1 ) Log.d(TAG, "emit: 2" ) emit(2 ) Log.d(TAG, "emit: 3" ) emit(3 ) }.onCompletion { Log.d(TAG, "flow onCompletion" ) }.collect { Log.d(TAG, "flow.collect: $it " ) } } ------------------------------------------ TestFlowActivity: emit: 1 TestFlowActivity: flow.collect: 1 TestFlowActivity: emit: 2 TestFlowActivity: flow.collect: 2 TestFlowActivity: emit: 3 TestFlowActivity: flow.collect: 3 TestFlowActivity: flow onCompletion ------------------------------------------
发射数据和处理数据不在一个线程内,发射所有数据后,再消费所有数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 lifecycleScope.launch(Dispatchers.IO) { flow { Log.d(TAG, "emit: 1" ) emit(1 ) Log.d(TAG, "emit: 2" ) emit(2 ) Log.d(TAG, "emit: 3" ) emit(3 ) }.flowOn(Dispatchers.Main).onCompletion { Log.d(TAG, "flow onCompletion" ) }.collect { Log.d(TAG, "flow.collect: $it " ) } } ------------------------------------------ TestFlowActivity: emit: 1 TestFlowActivity: emit: 2 TestFlowActivity: emit: 3 TestFlowActivity: flow.collect: 1 TestFlowActivity: flow.collect: 2 TestFlowActivity: flow.collect: 3 TestFlowActivity: flow onCompletion ------------------------------------------
flow异常处理 Flow中的catch对应着 RxJava 中的 onError。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 lifecycleScope.launch(Dispatchers.IO) { flow { emit(1 ) throw NullPointerException() }.catch { Log.d(TAG, "catch $it " ) }.collect { Log.d(TAG, "collect " + Thread.currentThread().name) } } ------------------------------------------ TestFlowActivity: collect DefaultDispatcher-worker-1 TestFlowActivity: catch java.lang.NullPointerException ------------------------------------------
flow 取消 flow 在一个挂起函数内被挂起了, flow 才能被取消。 ????????
如果这里使用withTimeout,然后超时collect。贼不会返回,最后的logd不会执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 lifecycleScope.launch(Dispatchers.IO) { val f = flow { emit(1 ) delay(1000 ) emit(2 ) delay(1000 ) emit(3 ) delay(1000 ) } val a = withTimeoutOrNull(2000 ) { f.collect { Log.d(TAG, "f.collect: $it " ) } } Log.d(TAG, "flow end $a " ) } ------------------------------------------ TestFlowActivity: f.collect: 1 TestFlowActivity: f.collect: 2 TestFlowActivity: flow end null ------------------------------------------
Flow Flow 是在消费者消费时才会生产数据,且冷流和消费者是一对一的关系,即当有多个消费者消费时,生产者是重新生产发送数据的。
StateFlow 和 SharedFlow 是热流,生产数据不依赖消费者消费,热流与消费者是一对多的关系,当有多个消费者时,它们之间的数据都是同一份。
SharedFlow(侧重事件,自定义粘性数量)
没有初始值
保留历史数据
可以传入一个 replay 参数,它表示可以对新订阅者重新发送 replay 个历史数据,默认值为 0, 即非粘性
StateFlow(侧重状态,粘性数量默认1)
必须有初始值
只保留最新数据
可以看成是一个 replay = 1 且没有缓冲区的 SharedFlow
每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值
参考文章:https://www.jianshu.com/p/b1e7e00da971