Kotlin 之 协程(三)Flow异步流

语言: CN / TW / HK

开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第6天,点击查看活动详情

flow介绍

挂起函数可以异步返回单个值,那如何异步多次返回多个值呢? 使用flow,flow的特点: - flow{...}块中的代码可以挂起 - 使用flow,suspend修饰符可以省略 - 流使用emit函数发射值 - 流使用collect的函数收集值 - flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。 - 流的连续性:流收集都是按顺序收集的 - flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行 - 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。

c //使用flow,suspend修饰符可以省略 fun doflow() = flow<Int> { for (i in 1..5) { //这里是挂起,不是阻塞 delay(500) emit(i) } }.flowOn(Dispatchers.IO) //调用 runBlocking { doflow().collect { log("value=$it") } } 打印(多次返回多个值) com.z.zjetpack V/zx: value=1 com.z.zjetpack V/zx: value=2 com.z.zjetpack V/zx: value=3 com.z.zjetpack V/zx: value=4 com.z.zjetpack V/zx: value=5

flow的应用场景

文件下载场景

```c //正在下载(文件总大小为5) fun doflow() = flow { for (i in 1..5) { delay(500) emit(i.toDouble()) } //flowOn来指定在IO线程中下载 }.flowOn(Dispatchers.IO) //读取进度 runBlocking { doflow().collect { log("当前下载=${it / 5 * 100}%") } }

打印: com.z.zjetpack V/zx: 当前下载=20.0% com.z.zjetpack V/zx: 当前下载=40.0% com.z.zjetpack V/zx: 当前下载=60.0% com.z.zjetpack V/zx: 当前下载=80.0% com.z.zjetpack V/zx: 当前下载=100.0% ```

流构建器

flowof 和asflow

```c runBlocking { flowOf(1, 2, 3) .onEach { delay(500) } .collect { log("value = $it") }

        (5..8).asFlow()
            .onEach { delay(500) }
            .collect {
                log("value = $it")
            }
    }

```

使用launchin替换collect在单独的协程中启动收集流。

```c fun event() = (1..3) .asFlow() .onEach { delay(500) }.flowOn(Dispatchers.IO)

//调用

    runBlocking {
        val job =   event().onEach {
            log("value = $it")
        }.launchIn(CoroutineScope(Dispatchers.IO))
        //主线程可用this
        //.launchIn(this)

        job.join()
    }

```

流的取消

超时的时候取消

```c fun cancelFlow() = flow { for (i in 1..5) { delay(1000) emit(i) } }

//调用 runBlocking { //超时的时候取消流 withTimeoutOrNull(2500) { cancelFlow().collect { log("value = $it") } } }

打印:在2.5秒的时候超时了,取消了 com.z.zjetpack V/zx: value = 1 com.z.zjetpack V/zx: value = 2 ```

直接取消

```c runBlocking { cancelFlow().collect { log("value = $it") if(it == 3){ cancel() }

            }
    }

```

繁忙的任务是不能直接取消的,需要检测取消(cancellable)

c runBlocking { (1..5).asFlow().cancellable().collect { if(it == 3) { cancel() } } } 背压:生产者效率 > 消费者效率 在这里插入图片描述 使用缓冲和flowon来处理背压

buffer():并发运行流中发射元素的代码 conflate():合并发射项,不对每个值处理 collectLatest():取消并重新发送最后一个值

模拟背压代码: ```c fun preFlow() = flow { for (i in 1..5) { delay(100) emit(i) log("发送$i") } }

//调用 //100ms发送一次,300ms接收一次就产生了背压 runBlocking { val time = measureTimeMillis { preFlow() //buffer可以增加缓冲,提高效率 //.buffer(100) //flowOn自带缓冲功能 //.flowOn(Dispatchers.IO) //conflate不对每个值处理 //.conflate() //.collect //取消并重新发送最后一个值 .collectLatest { delay(300) log("接收到:$it") } } log("总耗时 $time")

    }

打印: com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 总耗时 2033

使用buffer后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1634

使用flowOn后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1639

使用conflate后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1034

使用collectLatest后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 843 ```

操作符

转换操作符:map ,transform 限长操作符:取指定数量,take 末端操作符:末端操作符用于启动流收集的挂起函数,collect,tolist,toset,reduce,fold 组合操作符:zip 展平操作符:flatMapConcat(连接),flatMapMerge(合并),flatMapLatest(最新)

map

```c suspend fun perRequest(req: Int): String { delay(1000) return "转换 $req" }

    runBlocking {
        (1..3).asFlow().map {
            perRequest(it)
        }.collect {
            log(it)
        }
  }

打印: com.z.zjetpack V/zx: 转换 1 com.z.zjetpack V/zx: 转换 2 com.z.zjetpack V/zx: 转换 3 ```

transform

c runBlocking { (5..6).asFlow().transform { emit("s $it") emit(perRequest(it)) emit("e $it") } //.take(4) .collect { log(it) } } 打印: com.z.zjetpack V/zx: s 5 com.z.zjetpack V/zx: 转换 5 com.z.zjetpack V/zx: e 5 com.z.zjetpack V/zx: s 6 com.z.zjetpack V/zx: 转换 6 com.z.zjetpack V/zx: e 6

take

c 加上take之后 com.z.zjetpack V/zx: s 5 com.z.zjetpack V/zx: 转换 5 com.z.zjetpack V/zx: e 5 com.z.zjetpack V/zx: s 6

末端操作符:collect,tolist,toset,reduce,fold

c runBlocking { val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b } log("sum = $sum") val nList = (1..5).asFlow().toList() log("nList = $nList") val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet() log("nSet = $nSet") } 打印: com.z.zjetpack V/zx: sum = 55 com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5] com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]

展平操作符

只使用map的时候

```c //返回值是一个flow fun reqFlow(i: Int) = flow { emit("start $i") delay(500) emit("end $i") }

runBlocking { (0..1).asFlow().map { reqFlow(it) }.collect { log("首次collect = $it") it.collect { log("二次 = $it") } } } 打印:由于返回是flow所以需要collect 两次才能拿到值,Flow> com.z.zjetpack V/zx: 首次collect = [email protected] com.z.zjetpack V/zx: 二次 = start 0 com.z.zjetpack V/zx: 二次 = end 0 com.z.zjetpack V/zx: 首次collect = [email protected] com.z.zjetpack V/zx: 二次 = start 1 com.z.zjetpack V/zx: 二次 = end 1 ```

flatMapConcat

c runBlocking { (0..1).asFlow().flatMapConcat { reqFlow(it) }.collect { log("首次collect = $it") } } 打印:直接展开了 com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = end 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 1

c runBlocking { (0..1).asFlow().flatMapMerge { reqFlow(it) }.collect { log("首次collect = $it") } } 打印: com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 0 com.z.zjetpack V/zx: 首次collect = end 1

flatMapLatest

```c runBlocking { (0..1).asFlow().flatMapLatest { reqFlow(it) }.collect { log("首次collect = $it") } }

打印: com.z.zjetpack V/zx: 首次collect = start 0 com.z.zjetpack V/zx: 首次collect = start 1 com.z.zjetpack V/zx: 首次collect = end 1 ```

流的异常处理

catch函数 和 try catch

c flow { emit(1) throw NullPointerException() //catch函数只捕获上游的异常 }.catch { log("exception $it") //在异常后恢复 emit(20) }.flowOn(Dispatchers.IO) .collect { log("msg $it") } 打印: com.z.zjetpack V/zx: exception java.lang.NullPointerException com.z.zjetpack V/zx: msg 1 com.z.zjetpack V/zx: msg 20

c //不建议通过这种方式捕获上游的异常,违反了flow原则,这种适合捕获下游的异常 try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") } 打印: com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1

流的完成

finally 和 onCompletion

```c try { (1..3).asFlow().collect { check(it > 2) { "ex $it" } } } catch (e: Exception) { log("异常 $e") } finally { log("流已完成") }

        //发生异常onCompletion可以拿到异常信息,但不会捕获
        try {
            (1..3).asFlow().onCompletion {
                log("onCompletion $it")
            }.collect {
                check(it > 2) {
                    "ex $it"
                }
            }
        } catch (e: Exception) {
            log("异常 $e")
        }

打印: com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 流已完成 com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1 com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1 ```

StateFlow

StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。 1. StateFlow使用 第一步:创建 MutableStateFlow 并设置初始化的值。

c class MainViewModel : ViewModel() { val selected = MutableStateFlow<Boolean>(false) }

第二步:同 Flow 一样,使用 collect 方法:

c lifecycleScope.launch { viewModel.selected.collect { // ... 引起UI发生的变化 // 比如 某个按钮是否选中状态 } }

第三步:可以给 selected设置值,从而引起 Ui 层的变化:

c class MainViewModel : ViewModel() { val selected = MutableStateFlow<Boolean>(false) fun doSomeThing(value: Boolean) { selected.value = value } }

普通的 Flow,是不具备 selected.value = value 这种能力的

StateFlow 和 LiveData 有什么区别? 有两点区别:

第一点,StateFlow 必须有初始值,LiveData 不需要。 第二点,当 View 变为 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据则不会取消注册使用方。 对于 StateFlow 在界面销毁的时仍处于活跃状态,有两种解决方法:

使用 ktx 将 Flow 转换为 LiveData。 在界面销毁的时候,手动取消(这很容易被遗忘)。

```c class LatestNewsActivity : AppCompatActivity() { ... // Coroutine listening for UI states private var uiStateJob: Job? = null

override fun onStart() {
    super.onStart()
    // Start collecting when the View is visible
    uiStateJob = lifecycleScope.launch {
        latestNewsViewModel.uiState.collect { uiState -> ... }
    }
}

override fun onStop() {
    // Stop collecting when the View goes to the background
    uiStateJob?.cancel()
    super.onStop()
}

} ```

SharedFlow

SharedFlow:数据共享,有点类似广播 和 StateFlow 一样,SharedFlow 也是热流,它可以将已发送过的数据发送给新的订阅者,并且具有高的配置性。

  1. SharedFlow使用场景 总的来说,SharedFlow 和 StateFlow 类似,他们都是热流,都可以用来存储状态,但 SharedFlow 配置灵活。

当你有如下场景时,需要使用 SharedFlow:

发生订阅时,需要将过去已经更新的n个值,同步给新的订阅者。 配置缓存策略。 2. SharedFlow的使用 简单写一个 Demo吧。

第一步:创建一个 MutableSharedFlow,对应的参数解释在注释中

c class MainViewModel : ViewModel() { val sharedFlow = MutableSharedFlow<Int>( 5 // 参数一:当新的订阅者Collect时,发送几个已经发送过的数据给它 , 3 // 参数二:减去replay,MutableSharedFlow还缓存多少数据 , BufferOverflow.DROP_OLDEST // 参数三:缓存策略,三种 丢掉最新值、丢掉最旧值和挂起 ) }

第二步:使用emit或者tryEmit方法

```c class MainViewModel : ViewModel() { val sharedFlow = MutableSharedFlow( // .... )

// 初始化时调用
init {
    for (i in 0..10) {
        sharedFlow.tryEmit(i)
    }
}

// 在按钮中调用
fun doAsClick() {
    for (i in 11..20) {
        sharedFlow.tryEmit(i)
    }
}

} ```

当 MutableSharedFlow 中缓存数据量超过阈值时,emit 方法和 tryEmit 方法的处理方式会有不同:

emit 方法:当缓存策略为 BufferOverflow.SUSPEND 时,emit 方法会挂起,直到有新的缓存空间。 tryEmit 方法:tryEmit 会返回一个 Boolean 值,true 代表传递成功,false 代表会产生一个回调,让这次数据发射挂起,直到有新的缓存空间。 第三步:接收数据 接收数据的方式,跟普通的 Flow 没什么区别。

下面是我的全部代码:

```c class MainActivity : AppCompatActivity() {

private lateinit var viewModel: MainViewModel

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)

    viewModel = ViewModelProvider(this).get(com.example.coroutinedemo.viewmodel.MainViewModel::class.java)

    val tvContent = findViewById<TextView>(R.id.tv_content)
    // 启动第一个协程,接收初始化的数据
    lifecycleScope.launch {
        val sb = StringBuffer()
        viewModel.sharedFlow.collect {
            sb.append("<<${it}")
            tvContent.text = sb
        }
    }

    val btnGo = findViewById<Button>(R.id.btn_go)
    val tvTwo = findViewById<TextView>(R.id.tv_2)
    btnGo.setOnClickListener {
        // 发送新的数据
        viewModel.doAsClick()
        // 发送新的数据以后,启动第二个协程
        lifecycleScope.launch {
            val sb = StringBuffer()
            viewModel.sharedFlow.collect {
                sb.append("<<${it}")
                tvTwo.text = sb.toString()
            }
        }
    }
}

} ```

  1. 将冷流转化为SharedFlow 直接使用官网的代码,方法是使用 Flow 的扩展方法 shareIn:

c class NewsRemoteDataSource(..., private val externalScope: CoroutineScope, ) { val latestNews: Flow<List<ArticleHeadline>> = flow { ... }.shareIn( externalScope, replay = 1, started = SharingStarted.WhileSubscribed() // 启动政策 ) }

重点是参数三,分别提供了三个启动策略:

SharingStarted.WhileSubscribed():存在订阅者时,将使上游提供方保持活跃状态。 SharingStarted.Eagerly:立即启动提供方。 SharingStarted.Lazily:在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态。 总结 Flow 给我的感觉就像古老的印刷术,版面定了就不可更改,不过,该版面可印刷多张内容;StateFlow 给我的感觉就像活字印刷,可以不停的更改版面,也可以使用同一个版面印刷很多内容。

如果你要使用 Flow 记录数据的状态,StateFlow 和 SharedFlow 会是一个不错的选择。StateFlow 和 SharedFlow 提供了在 Flow 中使用 LiveData 式更新数据的能力,但是如果要在 UI 层使用,需要注意生命周期的问题。

StateFlow 和 SharedFlow 相比,StateFlow 需要提供初始值,SharedFlow 配置灵活,可提供旧数据同步和缓存配置的功能。 协程进阶技巧 - StateFlow和SharedFlow