Kotlin 學習筆記(六)—— Flow 數據流學習實踐指北(二)StateFlow 與 SharedFlow

語言: CN / TW / HK

要説最近圈內大事件,那就非 chatGPT 莫屬了!人工智能領域最新的大突破了吧?很可能引發下一場的技術革命,因為大家都懂的原因現在還不能在中國大陸使用,不過國內的度廠正在積極跟進了,預計3月份能面世,且期待一下吧~

上節主要講述了 Flow 的組成、Flow 常用操作符以及冷流的具體使用。這節自然就要介紹熱流了。先來温習下:

冷流(Cold Flow):在數據被消費者訂閲後,即調用 collect 方法之後,生產者才開始執行發送數據流的代碼,通常是調用 emit 方法。即不消費,不生產,多次消費才會多次生產。消費者和生產者是一對一的關係。

上次説的例子不太直觀,所以這次換了個更直觀的對比例子,先來看第一個: kotlin //code 1 val coldFlow = flow { println("coldFlow begin emitting") emit(40) println("coldFlow 40 is emitted") emit(50) println("coldFlow 50 is emitted") } binding.btn2.setOnClickListener { lifecycleScope.launch { coldFlow.collect { println("coldFlow = $it") } } } 只有當點擊按鈕時,才會如圖打印出信息,即冷流只有調用了 collect 方法收集流後,emit 才會開始執行。 圖1 冷流特點日誌圖

熱流(Hot Flow)就不一樣了,無論有無消費者,生產者都會生產數據。它不像冷流,Flow 必須在調用末端操作符之後才會去執行;而是可以自己控制是否發送或者生產數據流。並且熱流可以有多個訂閲者;而冷流只有一個。再來看看熱流的例子: ```kotlin //code 2 val hotFlow = MutableStateFlow(0) lifecycleScope.launch { println("hotFlow begin emitting") hotFlow.emit(40) println("hotFlow 40 is emitted")

hotFlow.emit(50)
println("hotFlow 50 is emitted")

} binding.btn2.setOnClickListener { lifecycleScope.launch { hotFlow.collect { println("hotFlow collects $it") } } } ``` MutableStateFlow 就是熱流中的一種,當沒有點擊按鈕時,便會輸出下圖中的前三行信息。 圖2 熱流特點日誌圖 當點擊兩下按鈕後,就會依次輸出如圖第 4,5 行的信息,至於為什麼只會接收到 50,這跟 MutableStateFlow 的特性有關,後面再説。

通過這兩個例子就可清楚地知道冷熱流之間的區別。熱流有兩種對象,分別是 StateFlow 和 SharedFlow。

1. SharedFlow

先來看看 SharedFlow,它是一個 subscriber 訂閲者的角色,當一個 SharedFlow 調用了 collect 方法後,它就不會正常地結束完成;但可以 cancel 掉 collect 所在的協程,這樣就可以取消掉訂閲了。SharedFlow 在每次 emit 時都會去 check 一下所在協程是否已經取消。絕大多數的終端操作符,例如 Flow.toList() 都不會使得 SharedFlow 結束完成,但 Flow.take() 之類的截斷操作符是例外,它們是可以強制完成一個 SharedFlow 的。

SharedFlow 的簡單使用樣例: ```kotlin //code 3 class EventBus { private val _events = MutableSharedFlow() // private mutable shared flow val events = _events.asSharedFlow() // publicly exposed as read-only shared flow

suspend fun produceEvent(event: Event) {
    _events.emit(event) // suspends until all subscribers receive it
}

} ``` 與 LiveData 相似的使用方式。但 SharedFlow 的功能更為強大,它有 replay cache 和 buffer 機制。

1.1 Replay cache

可以理解為是一個粘性事件的緩存。每個新的訂閲者會首先收到 replay cache 中之前發出並接收到的事件,再才會收到新的發射出的值。可以在 MutableSharedFlow 的構造函數中設置 cache 的大小,不能為負數,默認為 0. kotlin //code 4 public fun <T> MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) replay 重播之前最新的 n 個事件,見字知義。下面是例子: ```kotlin //code 5 private fun testSharedFlow() { val sharedFlow = MutableSharedFlow(replay = 2) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") } }

    launch {
        (1..3).forEach{
            sharedFlow.emit(it)
        }
    }

    delay(200)
    launch {
        sharedFlow.collect {
            println("++++ sharedFlow2 collected $it")
        }
    }
}

} 結果為:kotlin com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 2 com.example.myapplication I/System.out: ++++ sharedFlow2 collected 3 `` 在emit` 發射數據前後分別設置了一個訂閲者,後面還延時了 200ms 才進行訂閲。第一個訂閲者 1、2、3都收到了;而第二個訂閲者卻只收到了 2 和 3. 這是因為在第二個訂閲者開始訂閲時,數據已經都發射完了,而 SharedFlow 的重播 replay 為 2,就可將最近發射的兩個數據再依次發送一遍,這就可以收到 2 和 3 了。

1.2 extraBufferCapacity

SharedFlow 構造函數的第二個參數 extraBufferCapacity 的作用是,在 replay cache 之外還能額外設置的緩存。常用於當生產者生產數據的速度 > 消費者消費數據的速度時的情況,可以有效提升吞吐量。

所以,若 replay = m,extraBufferCapacity = n,那麼這個 SharedFlow 總共的 BufferSize = m + n. replay 會存儲最近發射的數據,如果滿了就會往 extraBuffer 中存。接下來看一個例子: ```kotlin //code 6 private fun coroutineStudy() { val sharedFlow = MutableSharedFlow(replay = 1, extraBufferCapacity = 1) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") delay(6000) } }

    launch {
        (1..4).forEach{
            sharedFlow.emit(it)
            println("+++emit $it")
            delay(1000)
        }
    }

    delay(4000)
    launch {
        sharedFlow.collect {
            println("++++ sharedFlow2 collected $it")
            delay(20000)
        }
    }
}

} 運行結果為:kotlin 17:32:09.283 28184-28184 System.out com.wen.testdemo I +++emit 1 17:32:09.284 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 1 17:32:10.285 28184-28184 System.out com.wen.testdemo I +++emit 2 17:32:11.289 28184-28184 System.out com.wen.testdemo I +++emit 3 17:32:13.286 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 3 17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2 17:32:21.301 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 3 17:32:27.311 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 4 17:32:33.292 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow2 collected 4 `` 打印結果可能會有點懵,對照着時序圖更容易理解(此圖來自於參考文獻3,感謝 fundroid 大佬的輸出~): ![圖 3 SharedFlow緩存時序圖](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/470efed5448e453b80194a8e2dfdec79~tplv-k3u1fbpfcp-zoom-1.image) 1)Emitter 發送 1,因為 Subscriber1 在 Emitter 發送數據前就已開始訂閲,所以 Subscriber1 可馬上接收;此時replay存儲 1; 2)Emitter 發送 2,Subscriber1 還在處理中處於掛起態,此時replay存儲 2; 3)Emitter 發送 3,此時還沒有任何消費者能消費,則replay存儲 3,將 2 放入extra中; 4)Emitter 想要發送 4,但發現 SharedFlow 的 Buffer 已滿,則按照默認的策略進行掛起等待(默認策略就是 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND); 5)Subscriber2 開始訂閲,接收到replay中的 3,此時 Subscriber1 還是掛起態,Buffer 中數據沒變化,即replay存儲 3,extra存儲 2; 6)Subscriber1 處理完 1 後,依次處理 Buffer 中 的下一個數據,即消費extra中的 2,這時 Buffer 終於有空間了,Emitter 結束掛起,發送 4,replay存儲 4,將 3 放入extra中; 7)Subscriber1 消費完 2 後接着再消費extra` 中的 3,此時 Buffer 中就只有 4 了。後面的就不用多説了

比較繞,需要多看幾次思考一下。需要注意的是,代碼運行結果中下面兩行輸出到底誰先誰後的問題: kotlin 17:32:15.292 28184-28184 System.out com.wen.testdemo I +++emit 4 17:32:15.293 28184-28184 System.out com.wen.testdemo I ++++ sharedFlow1 collected 2 打印出的時間戳幾乎是一樣的,若嚴格按照 log 打印的時間戳順序,應該是 Emitter 先發送的 4,Subscriber1 再才接收到的 2,但根據反覆實踐的結果來看,實際上是 Subscriber1 先接收緩衝區中的 2,等緩衝區有剩餘空間後,Emitter 才結束掛起繼續發送 4. 把上面的例子簡化一下,再改改數據: ```kotlin //code 7 private fun coroutineStudy() { val sharedFlow = MutableSharedFlow(replay = 1, extraBufferCapacity = 1) lifecycleScope.launch { launch { sharedFlow.collect { println("++++ sharedFlow1 collected $it") delay(10000) } }

    launch {
        (1..4).forEach{
            sharedFlow.emit(it)
            println("+++emit $it")
            delay(1000)
        }
    }
}

} 打印結果如下所示,因為把 sharedFlow delay 的時長設置為 10s,所以很明顯地看到 Emitter 在發送 1、2、3 時時間間隔均是 1s,發送 4 時足足過了 8s,這段時間就是 Emitter 被掛起了,一直等到 sharedFlow1 接收到 2 之後,4 才被 Emitter 發送,而 sharedFlow1 的每次接收都是間隔 10s,所以是先接收的 2,再結束掛起發送的 4.kotlin 00:25:52.481 29483-29483/com.example.myapplication I/System.out: +++emit 1 00:25:52.482 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 1 00:25:53.483 29483-29483/com.example.myapplication I/System.out: +++emit 2 00:25:54.486 29483-29483/com.example.myapplication I/System.out: +++emit 3 00:26:02.487 29483-29483/com.example.myapplication I/System.out: +++emit 4 00:26:02.488 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 2 00:26:12.497 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 3 00:26:22.516 29483-29483/com.example.myapplication I/System.out: ++++ sharedFlow1 collected 4 通過源碼也可看出這個結論,從 `collect` 方法進入,最終可以找到實際上是調用了 SharedFlowImpl 中的 `collect` 方法:kotlin //code 8 override suspend fun collect(collector: FlowCollector) { val slot = allocateSlot() try { if (collector is SubscribedFlowCollector) collector.onSubscription() val collectorJob = currentCoroutineContext()[Job] while (true) { var newValue: Any? while (true) { newValue = tryTakeValue(slot) //首先嚐試直接獲取值 if (newValue !== NO_VALUE) break awaitValue(slot) //沒獲取到則只能掛起等待新值到來 } collectorJob?.ensureActive() collector.emit(newValue as T) } } finally { freeSlot(slot) } } `` 在內層while循環中,首先是通過tryTakeValue方法直接取值,如果沒取到則通過awaitValue方法掛起等待新值,awaitValue是個掛起函數。取到新值之後,才會跳出內層while循環,並執行collector.emit(newValue as T),而這一段代碼,實際上就是調用的 code 7 中的sharedFlow.emit(it)` 代碼。

此處源代碼還可以看出,SharedFlow 每次在 emit 之前,確實都會查看所在協程是否還在運行;且它確實是不會停止的,哪怕沒有接收到新值,也會一直處於掛起等待的狀態,想要結束則得使用截斷類型的操作符。

1.3 onBufferOverflow

SharedFlow 構造函數的第三個參數就是設置超過 Buffer 之後的策略,默認是將生產者掛起暫時不再發送數據,即 BufferOverflow.SUSPEND。

還有另外兩個數據丟棄策略:
1)BufferOverflow.DROP_LATEST 丟棄最新數據; 圖 4 BufferOverflow.DROP_LATEST 策略 Emitter 在發送 4 時,因為 Buffer 已滿,所以只能按照策略將最新的數據 4 丟棄。而在發送 3 時,由於 1 已經被消費過,所以可以從 Buffer 中移除,從而騰出存儲空間緩存 3。

2)BufferOverflow.DROP_OLDEST 丟棄最老數據: 圖 5 BufferOverflow.DROP_OLDEST 策略 這個策略就比較簡單,Buffer 中只會存儲最新的數據。不管較老的數據是否被消費,當 Buffer 已滿而又有新的數據到達時,老數據都會從 Buffer 中移除,騰出空間讓給新數據。

注意點:當 replay、extra 都為 0,即沒有 Buffer 的時候,那麼 onBufferOverflow 只能是 BufferOverflow.SUSPEND。丟棄策略啟動的前提是 SharedFlow 至少有 Buffer 且 Buffer 已滿。

1.4 emit 與 tryEmit

由前一節可知,當 SharedFlow 的 Buffer 已滿且 onBufferOverflow 為 BufferOverflow.SUSPEND 的時候,emit 會被掛起(emit 是個掛起函數),但這會影響到 Emitter 的速度。如果不想在發送數據的時候被掛起,除了設置 onBufferOverflow 丟棄策略外,還可以使用 tryEmit 方法。 ```kotlin //code 9 override fun tryEmit(value: T): Boolean { var resumes: Array?> = EMPTY_RESUMES val emitted = synchronized(this) { if (tryEmitLocked(value)) { resumes = findSlotsToResumeLocked(resumes) true } else { false } } for (cont in resumes) cont?.resume(Unit) return emitted }

@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
    // Fast path without collectors -> no buffering
    // 1.沒有訂閲者時,直接返回 true,因為沒有人接收,發了也沒用,也不用緩存
    if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
    // With collectors we'll have to buffer
    // 2.有訂閲者,就得考慮緩存發送的值了
    // cannot emit now if buffer is full & blocked by slow collectors
    // 3.如果緩存空間已滿,且訂閲者還在掛起處理上次的數據,則不能 emit
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // will suspend
            BufferOverflow.DROP_LATEST -> return true // just drop incoming
            BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
        }
    }
    // 4.代碼能走到這裏,説明緩存還有空間或丟棄策略為DROP_OLDEST
    enqueueLocked(value)
    bufferSize++ // value was added to buffer
    // drop oldest from the buffer if it became more than bufferCapacity
    if (bufferSize > bufferCapacity) dropOldestLocked()
    // keep replaySize not larger that needed
    if (replaySize > replay) { // increment replayIndex by one
        updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
    }
    return true
}

`` 由代碼可見tryEmit不是一個掛起函數,它有返回值,如果返回 true 則説明發送數據成功了;如果返回 false,則説明這時發送數據需要被掛起等待。其中最主要的就是tryEmitLocked` 方法。

tryEmitLocked 方法主要邏輯已在註釋中説明,需要額外説明的是,bufferCapacity 就是 replay + extraBufferCapacity 的大小;replayIndex 指的是最近開始訂閲的訂閲者在 replay cache 緩存數組中需要重播的最小 index。所以當使用默認構造的 SharedFlow 時,replayextraBufferCapacity 都為 0,如果這時再使用 tryEmit 方法進行發送,則會使得 if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) 判斷為 true,默認的丟棄策略又是 BufferOverflow.SUSPEND,就會導致這裏會直接返回 false,永遠都不會發送出值。所以,在使用默認構造的 SharedFlow 時,不能使用 tryEmit 發送值,否則無法發送。 一般使用 emit 即可。

在 SharedFlow 具體實現中,emit 方法就是先嚐試使用 tryEmit 來發送值,如果不能馬上發送再使用掛起函數 emitSuspend 方法: kotlin //code 10 class SharedFlowImpl override suspend fun emit(value: T) { if (tryEmit(value)) return // fast-path emitSuspend(value) }

2. StateFlow

看完 SharedFlow 再來看 StateFlow 的話就比較簡單了。因為 StateFlow 就是 SharedFlow 的一種特殊子類,特點有三:
1)它的 replay cache 容量為 1;即可緩存最近的一次粘性事件;
2)初始化時必須給它設置一個初始值;
3)每次發送數據都會與上次緩存的數據作比較,如果不一樣才會發送,自動過濾掉沒有發生變化的數據。
它還可直接訪問它自己的 value 參數獲取當前結果值,總體來説,在使用上與 LiveData 相似,下面是它倆的異同點對比。

2.1 與 LiveData 比較的相同點

  1. 均提供了 可讀可寫 和 僅可讀 兩個版本:MutableStateFlow、StateFlow 與 MutableLiveData、LiveData;
  2. 允許被多個觀察者觀察,即生產者對消費者可以為一對多的關係;
  3. 都只會把最新的值給到觀察者,即使沒有觀察者,也會更新自己的值;
  4. 都會產生粘性事件問題;
  5. 都可能產生丟失值的問題;

粘性事件問題:因為 StateFlow 初始化時必須給定初始值,且 replay 為 1,所以每個觀察者進行觀察時,都會收到最近一次的回播數據。如果想避免粘性事件問題,換用 SharedFlow 即可,replay 使用默認值 0 。

值丟失問題:出現在消費者處理數據比生產者生產數據慢的情況,消費者來不及處理數據,就會把之前生產者發送的舊數據丟棄掉,看個例子: ```kotlin //code 11 private fun stateFlowDemo1() { val stateFlow = MutableStateFlow(0) CoroutineScope(Dispatchers.Default).launch { var count = 1 while (true) { val tmp = count++ delay(1000) println("+++++ tmp = $tmp") stateFlow.value = tmp } }

    CoroutineScope(Dispatchers.Default).launch {
        stateFlow.collect{
            println("++++ count = $it")
            delay(5000)  //模擬耗時操作
        }
    }
}

``` 圖 6 StateFlow丟失值log 可以從打印結果看出,StateFlow 會丟棄掉生產者之前發送的值,其實 MutableStateFlow 的丟棄策略就是設置的 BufferOverflow.DROP_OLDEST。

2.2 與 LiveData 比較的不同點

  1. StateFlow 必須在構建的時候傳入初始值,LiveData 不需要;
  2. StateFlow 默認是防抖的,LiveData 默認不防抖;
  3. 對於 Android 來説 StateFlow 默認沒有和生命週期綁定,直接使用會有問題;

StateFlow 默認防抖:即如果發送的值與上次相同,則生產者並不會真正發送。在源碼中也有説明,具體在 StateFlow.kt -> class StateFlowImpl -> private fun updateState -> if (oldState == newState) return true 感興趣的可以自行查閲,我看的版本是 1.5.0.

與 LiveData 相比,沒有和 Activity 的生命週期綁定恐怕是使用 StateFlow 最不方便的地方了。當 View 進入 STOPPED 狀態時,LiveData.observe() 會自動取消註冊使用方,這樣就不會再接收到數據了,也符合常理。因為用户此時已經離開頁面,再接收數據已沒有意義,如果繼續處理後續邏輯可能還會出 bug。

而如果使用的是 StateFlow 或其他數據流,在 View 進入 STOPPED 狀態時,收集數據的操作並不會自動停止。如需實現相同的行為,則需要從 Lifecycle.repeatOnLifecycle 塊收集數據流。如下是來自官方文檔的例子: kotlin //code 12 class LatestNewsActivity : AppCompatActivity() { private val latestNewsViewModel = // getViewModel() override fun onCreate(savedInstanceState: Bundle?) { ... // Start a coroutine in the lifecycle scope lifecycleScope.launch { // repeatOnLifecycle launches the block in a new coroutine every time the // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED. repeatOnLifecycle(Lifecycle.State.STARTED) { // Trigger the flow and start listening for values. // Note that this happens when lifecycle is STARTED and stops // collecting when the lifecycle is STOPPED latestNewsViewModel.uiState.collect { uiState -> // New value received when (uiState) { is LatestNewsUiState.Success -> showFavoriteNews(uiState.news) is LatestNewsUiState.Error -> showError(uiState.exception) } } } } } } //注意:repeatOnLifecycle API 僅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 庫及更高版本中提供。 英文部分註釋説的比較明確了,repeatOnLifecycle(Lifecycle.State.STARTED) 的作用就是每次進入 STARTED 可見狀態時都會重新觀察並收集數據;而在 STOPPED 狀態時就會 cancel 掉 StateFlow 收集流所在的協程從而停止收集。

總結

最後總結一下 Flow 第二小節的內容吧:
1)熱流有無消費者都可發送數據,生產者和消費者的關係可以是一對多;
2)SharedFlow 可構建熱流,可設置 replay 重播數據量及 extraBufferCapacity 緩衝區大小,以及 onBufferOverflow 緩衝區滿的策略;
3)emittryEmit 發送方法的異同,前者是掛起函數,注意在使用默認構造的 SharedFlow 時不要使用 tryEmit
4)StateFlow 是 SharedFlow 的一個子類,replay = 1,必須給定初始值,自帶防抖;
5)使用 StateFlow 或 SharedFlow 收集值時,記得在 repeatOnLifecycle(Lifecycle.State.STARTED) 方法中,防止出現崩潰等問題。

更多內容,歡迎關注公眾號:修之竹

贊人玫瑰,手留餘香!歡迎點贊、轉發~ 轉發請註明出處~

參考文獻

  1. Reactive Streams on Kotlin: SharedFlow and StateFlow; Ricardo Costeira; http://www.raywenderlich.com/22030171-reactive-streams-on-kotlin-sharedflow-and-stateflow
  2. Kotlin中 Flow、SharedFlow與StateFlow區別;五問;http://juejin.cn/post/7142038525997744141
  3. 一看就懂!圖解 Kotlin SharedFlow 緩存系統;fundroid;http://juejin.cn/post/7156408785886511111
  4. Kotlin:深入理解StateFlow與SharedFlow,StateFlow和LiveData使用差異區分,SharedFlow實現源碼解析; pumpkin的玄學; http://blog.csdn.net/weixin_44235109/article/details/121594988?spm=1001.2014.3001.5502
  5. StateFlow 和 SharedFlow 官方文檔 http://developer.android.google.cn/kotlin/flow/stateflow-and-sharedflow?hl=zh-cn