kotlin之Flow

Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
流 - 冷的异步流,具有流生成器和全面的操作符集(过滤器、map等)。


Flow有以下特点:
  • 冷数据流,不消费则不生产,这一点与Channel正相反:Channel的发送端并不依赖于接收端。

  • Flow通过flowOn改变数据发射的线程,数据消费线程则由协程所在线程决定

  • 与RxJava类似,支持通过catch捕获异常,通过onCompletion回调完成

  • Flow没有提供取消方法,可以通过取消Flow所在协程的方式来取消

创建 flow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
lifecycleScope.launch(Dispatchers.IO) {
flow {
emit(1)
emit(2)
emit(3)
}.onCompletion {
Log.d(TAG, "flow onCompletion")
}.collect {
Log.d(TAG, "flow.collect: $it")
}
}

------------------------------------------
TestFlowActivity: flow.collect: 1
TestFlowActivity: flow.collect: 2
TestFlowActivity: flow.collect: 3
TestFlowActivity: flow onCompletion
------------------------------------------

lifecycleScope.launch(Dispatchers.IO) {
flowOf(1, 2, 3).onCompletion {
Log.d(TAG, "flowOf onCompletion")
}.collect {
Log.d(TAG, "flowOf collect: $it")
}

listOf(1,2,3).asFlow().onCompletion {
Log.d(TAG, "asFlow onCompletion")
}.collect {
Log.d(TAG, "asFlow collect: $it")
}
}

------------------------------------------
TestFlowActivity: flowOf collect: 1
TestFlowActivity: flowOf collect: 2
TestFlowActivity: flowOf collect: 3
TestFlowActivity: flowOf onCompletion
TestFlowActivity: asFlow collect: 1
TestFlowActivity: asFlow collect: 2
TestFlowActivity: asFlow collect: 3
TestFlowActivity: asFlow onCompletion
------------------------------------------

切换线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
lifecycleScope.launch(Dispatchers.Main) {
flow {
Log.d(TAG, "emit " + Thread.currentThread().name)
emit(1)
}.flowOn(Dispatchers.IO).map {
Log.d(TAG, "map " + Thread.currentThread().name)
it*2
}.onCompletion {
Log.d(TAG, "onCompletion " + Thread.currentThread().name)
}.collect {
Log.d(TAG, "collect " + Thread.currentThread().name)
}
}

------------------------------------------
TestFlowActivity: emit DefaultDispatcher-worker-1
TestFlowActivity: map main
TestFlowActivity: collect main
TestFlowActivity: onCompletion main
------------------------------------------

lifecycleScope.launch(Dispatchers.Main) {
flow {
Log.d(TAG, "emit " + Thread.currentThread().name)
emit(1)
}.map {
Log.d(TAG, "map " + Thread.currentThread().name)
it * 2
}.flowOn(Dispatchers.IO).onCompletion {
Log.d(TAG, "onCompletion " + Thread.currentThread().name)
}.collect {
Log.d(TAG, "collect " + Thread.currentThread().name)
}
}

------------------------------------------
TestFlowActivity: emit DefaultDispatcher-worker-1
TestFlowActivity: map DefaultDispatcher-worker-1
TestFlowActivity: collect main
TestFlowActivity: onCompletion main
------------------------------------------

可以通过flowOn()改变的是Flow函数内部发射数据时的线程,而在collect收集数据时会自动切回创建Flow时的线程。
Flow的调度器 API 中看似只有flowOn与subscribeOn对应,其实collect所在协程的调度器也与observeOn指定的调度器对应。

对比类型 Flow RxJava
改变数据发送的线程 flowOn subscribeOn
改变消费数据的线程 自动切回协程的线程 observeOn


注意点:(和RxJava一致)

发射数据和处理数据在一个线程内,发射一次消费一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
lifecycleScope.launch(Dispatchers.IO) {
flow {
Log.d(TAG, "emit: 1")
emit(1)
Log.d(TAG, "emit: 2")
emit(2)
Log.d(TAG, "emit: 3")
emit(3)
}.onCompletion {
Log.d(TAG, "flow onCompletion")
}.collect {
Log.d(TAG, "flow.collect: $it")
}
}

------------------------------------------
TestFlowActivity: emit: 1
TestFlowActivity: flow.collect: 1
TestFlowActivity: emit: 2
TestFlowActivity: flow.collect: 2
TestFlowActivity: emit: 3
TestFlowActivity: flow.collect: 3
TestFlowActivity: flow onCompletion
------------------------------------------

发射数据和处理数据不在一个线程内,发射所有数据后,再消费所有数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
lifecycleScope.launch(Dispatchers.IO) {
flow {
Log.d(TAG, "emit: 1")
emit(1)
Log.d(TAG, "emit: 2")
emit(2)
Log.d(TAG, "emit: 3")
emit(3)
}.flowOn(Dispatchers.Main).onCompletion {
Log.d(TAG, "flow onCompletion")
}.collect {
Log.d(TAG, "flow.collect: $it")
}
}

------------------------------------------
TestFlowActivity: emit: 1
TestFlowActivity: emit: 2
TestFlowActivity: emit: 3
TestFlowActivity: flow.collect: 1
TestFlowActivity: flow.collect: 2
TestFlowActivity: flow.collect: 3
TestFlowActivity: flow onCompletion
------------------------------------------

flow异常处理

Flow中的catch对应着 RxJava 中的 onError。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
lifecycleScope.launch(Dispatchers.IO) {
flow {
emit(1)
throw NullPointerException()
}.catch {
Log.d(TAG, "catch $it")
}.collect {
Log.d(TAG, "collect " + Thread.currentThread().name)
}
}

------------------------------------------
TestFlowActivity: collect DefaultDispatcher-worker-1
TestFlowActivity: catch java.lang.NullPointerException
------------------------------------------

flow 取消

flow 在一个挂起函数内被挂起了, flow 才能被取消。 ????????

如果这里使用withTimeout,然后超时collect。贼不会返回,最后的logd不会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
lifecycleScope.launch(Dispatchers.IO) {
val f = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
}

val a = withTimeoutOrNull(2000) {
f.collect {
Log.d(TAG, "f.collect: $it")
}
}

Log.d(TAG, "flow end $a")
}

------------------------------------------
TestFlowActivity: f.collect: 1
TestFlowActivity: f.collect: 2
TestFlowActivity: flow end null
------------------------------------------


Flow
Flow 是在消费者消费时才会生产数据,且冷流和消费者是一对一的关系,即当有多个消费者消费时,生产者是重新生产发送数据的。

StateFlow 和 SharedFlow 是热流,生产数据不依赖消费者消费,热流与消费者是一对多的关系,当有多个消费者时,它们之间的数据都是同一份。

SharedFlow(侧重事件,自定义粘性数量)

  1. 没有初始值
  2. 保留历史数据
  3. 可以传入一个 replay 参数,它表示可以对新订阅者重新发送 replay 个历史数据,默认值为 0, 即非粘性

StateFlow(侧重状态,粘性数量默认1)

  1. 必须有初始值
  2. 只保留最新数据
  3. 可以看成是一个 replay = 1 且没有缓冲区的 SharedFlow 
  4. 每次更新数据都会和旧数据做一次比较,只有不同时候才会更新数值

参考文章:
https://www.jianshu.com/p/b1e7e00da971