日常思考,目前Kotlin協程能完全取代Rxjava嗎

語言: CN / TW / HK

theme: channing-cyan highlight: arduino-light


前言

自從jetbrains公司提出Kotlin協程用來解決異步線程問題,並且衍生出來了Flow作為響應式框架,引來了大量Android開發者的青睞;而目前比較穩定的響應式庫當屬Rxjava,這樣以來目的就很明顯了,旨在用Kotlin協程來逐步替代掉Rxjava;

仔細思考下,真的可以完全替代掉Rxjava麼,它的複雜性和多樣化的操作符,而協程的許多API仍然是實驗性的,目前為止,隨着kt不斷地進行版本迭代,越來越趨於穩定,對此我不能妄下斷言;當然Rxjava無疑也是一個非常優秀的框架,值得我們不斷深入思考,但是隨着協程的出現,就個人而言我會更喜歡使用協程來作為滿足日常開發的異步解決方案。

協程的本質和Rxjava是截然不同的,所以直接拿它們進行對比是比較棘手的;換一種思路,本文我們從日常開發中的異步問題出發,分別觀察協程與Rxjava是如何提供相應的解決方案,依次來進行比對,探討下Kotlin協程是否真的足以取代Rxjava這個話題吧

流類型的比較

現在我們來看下Rxjava提供的流類型有哪些,我們可以使用的基本流類型操作符如下圖所示

Rxjava流類型@2x.png

它們的基本實現在下文會提及到,這裏我們簡單來討論下在協程中是怎麼定義這些流操作符的

  • Single<T>其實就是一個返回不可空值的suspend函數

  • Maybe<T>恰好相反,是一個返回可空的supspend函數

  • Completable不會發送事件,所以在協程中就是一個不返回任何東西的簡單掛起函數

  • 對於ObservableFlowable,兩者都可以發射多個事件,不同在於前者是沒有背壓管理的,後者才有,而他們在協程中我們可以直接使用Flow來完成,在異步數據流中按順序發出值,所以只需要一個返回當前Data數據類型的Flow<T>

    值得注意的是,該函數本身是不需要supsend修飾符的,由於Flow是冷流,在進行收集\訂閲之前是不會發射數據,只要在collect的時候才需要協程作用域中執行。為什麼説Flow足以替代ObservableFlowable原因在與它處理背壓(backpressure)的方式。這自然而然來源於協程中的設計與理念,不需要一些巧妙設計的解決方案來處理顯示背壓,Flow中所有Api基本上都帶有suspend修復符,它也成為了解決背壓的關鍵先生。其目的就是在不阻塞線程的情況下暫停調用者的執行,因此,當Flow<T>在同一個協程中發射和收集的時候,如果收集器跟不上數據流,它可以簡單地暫停元素的發射,直到它準備好接收更多。

流類型比較的基本實現

好的小夥伴們,上文我們簡單用協程寫出Rxjava的幾個基本流類型,現在讓我們用幾個詳細的實例來看看他們的不同之處吧

Completable ---- 異步任務完成沒有結果,可能會拋出錯誤

Rxjava中,我們使用Completable.create去創建,裏面的CompletableEmitter中有onComplete表示完成的方法和一個onError傳遞異常的方法,如下代碼所示

//completable in Rxjava    fun completableRequest(): Completable {        return Completable.create { emitter->            try {                emitter.onComplete()           }catch (e:Exception) {                emitter.onError(e)           }       }   }    fun main() {        completableRequest()           .subscribe {                println("I,am done")                println()           }   }

在協程當中,我們對應的就是調用一個不返回任何內容的掛起函數(returns Unit),就類似於我們調用一個普通函數一樣

fun completableCoroutine() = runBlocking {        try {            delay(500L)            println("I am done")       } catch (e: Exception) {            println("Got an exception")       }   }

注意不要在生產環境代碼使用runBlocking,你應該有一個合適的CoroutineScope,由於是測試代碼本文都將使用runBlocking來輔助説明測試場景

Single ---- 必須返回或拋出錯誤的異步任務

RxJava 中,我們使用一個Single ,它裏面有一個onSuccess傳遞返回值的方法和一個onError傳遞異常的方法。

`kotlin /* * Single in RxJava / fun main() {    singleResult()       .subscribe(           { result -> println(result) },           { println("Got an exception") }       ) }

fun singleResult(): Single {    return Single.create { emitter ->        try {            // process a request            emitter.onSuccess("Some result")       } catch (e: Exception) {            emitter.onError(e)       }   } ​ ``` ````

而在協程中,我們調用一個返回非空值的掛起函數:

``` /* * Single equivalent in coroutines / fun main() = runBlocking {    try {        val result = getResult()        println(result)   } catch (e: Exception) {        println("Got an exception")   } }

suspend fun getResult(): String {    // process a request    delay(100)    return "Some result" } ```

Maybe --- 可能返回結果或拋出錯誤的異步任務

RxJava 中,我們使用一個Maybe. 它裏面有一個onSuccess傳遞返回值的方法onComplete,一個在沒有值的情況下發出完成信號的方法,以及一個onError傳遞異常的方法。

``` /* * Maybe in RxJava / fun main() {    maybeResult()       .subscribe(           { result -> println(result) },           { println("Got an exception") },           { println("Completed without a value!") }       ) }

fun maybeResult(): Maybe {    return Maybe.create { emitter ->        try {            // process a request            if (Random.nextBoolean()) {                emitter.onSuccess("Some value")           } else {                emitter.onComplete()           }       } catch (e: Exception) {            emitter.onError(e)       }   } } ```

在協程中,我們調用一個返回可空值得掛起函數

``` /* * Maybe equivalent in coroutines / fun main() = runBlocking {    try {        val result = getNullableResult()        if (result != null) {            println(result)       } else {            println("Completed without a value!")       }   } catch (e: Exception) {        println("Got an exception")   } }

suspend fun getNullableResult(): String? {    // process a request    delay(100)    return if (Random.nextBoolean()) {        "Some value"   } else {        null   } } ```

0..N事件的異步流

由於在Rxjava中,FlowableObservable都是屬於0..N事件的異步流,但是Observable幾乎沒有做相應的背壓管理,所以這裏我們主要以Flowable為例子,onNext發出下一個流值的方法,一個onComplete表示流完成的方法,以及一個onError傳遞異常的方法。

``` /* * Flowable in RxJava / fun main() {    flowableValues()       .subscribe(           { value -> println(value) },           { println("Got an exception") },           { println("I'm done") }       ) }

fun flowableValues(): Flowable {    val flowableEmitter = { emitter: FlowableEmitter ->        try {            for (i in 1..10) {                emitter.onNext(i)           }       } catch (e: Exception) {            emitter.onError(e)       } finally {            emitter.onComplete()       }   }

return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER) } ```

在協程中,我們只是創建一個Flow就可以完成這個方法

``` /* * Flow in Kotlin / fun main() = runBlocking {    try {        eventFlow().collect { value ->            println(value)       }        println("I'm done")   } catch (e: Exception) {        println("Got an exception")   } }

fun eventFlow() = flow {    for (i in 1..10) {        emit(i)   } } ```

在慣用的 Kotlin 中,創建上述流程的方法之一是:fun eventFlow() = (1..10).asFlow()

如上面這些代碼所見,我們基本可以使用協程涵蓋Rxjava所有的主要基本用法,此外,協程的設計允許我們使用所有標準的Kotlin功能編寫典型的順序代碼 ,它還消除了對onCompleteonError回調的需要。我們可以像在普通代碼中那樣捕獲錯誤或設置協程異常處理程序。並且,考慮到當掛起函數完成時,協程繼續按順序執行,我們可以在下一行繼續編寫我們的“完成邏輯”。

值得注意的是,當我們進行調用collect收集的時候也是如此,在收集完所有元素後才會執行下一行代碼

eventFlow().collect { value ->    println(value) } println("I'm done")

Flow收集完所有元素後,才會調用打印I'm done

操作符的比較

總所周知,Rxjava的主要優勢在於它擁有非常多的操作符,基本上可以應對日常開發中出現的各種情況,由於它種類特別繁多又比較難記憶,這裏我只簡單舉些常見的操作符進行比較

COMPLETABLE,SINGLE, MAYBE

這裏需要強調的是,在RxjavaCompletable,SingleMaybe都有許多相同的操作符,然而在協程中任何類型的操作符其實都是多餘的,我們以Single中的map()簡單操作符為例來看下:

/** * Maps Single<String> to * Single<User> synchronously */ fun main() {    getUsername()       .map { username ->            User(username)       }       .subscribe(           { user -> println(user) },           { println("Got an exception") }       ) }

map作為Rxjava中最常用的操作符,獲取一個值並將其轉換為另一個值,但是在協程中我們不需要.map()操作符就可以實現這種操作

fun main() = runBlocking {    try {        val username = getUsername() // suspend fun        val user = User(username)        println(user)   } catch (e: Exception) {        println("Got an exception")   } }

使用suspend掛起函數可以掛起當前函數,當執行完畢後在按順序執行接下來的代碼

Flow操作符與Rxjava操作符

現在讓我們看看Flow中有哪些操作符,它們與Rxjava相比有什麼不同,由於篇幅原因,這裏我簡單比較下日常開發中最常用的操作符

map()

對於map操作符,Flow中也具有相同的操作符

/** * Maps Flow<String> to Flow<User> */ fun main() = runBlocking {    usernameFlow()       .map { username ->            User(username)       }       .collect { user ->            println(user)       } }

Flow中的map操作符 相當於Rxjava做了一定的簡化處理,這是它的一個主要優勢,可以看下它的源碼

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {    collect { value -> emit(transform(value)) } }

是不是非常簡單,只是重新創建一個新的flow,它從從上游收集值transform並在當前函數應用後發出這些值;事實上大多數Flow的操作符都是這樣工作的,不需要遵循嚴格的協議;對於大多數應用場景,標準Flow操作符就已經足夠了,當然編寫自定義操作符也是非常簡單容易的;相對於Rxjava,如果想要編寫自定義操作符,你必須非常瞭解Rxjava

Reactive Streams協議

flatmap()

另外,在Rxjava中我們經常使用的操作符還有flatmap(),同時還有很多種變體,例如.flatMapSingle()flatMapObservable(),flatMapIterable()等,簡單來説,在Rxjava中我們如果需要對一個值進行同步轉換,就使用map,進行異步轉換的時候就需要使用flatMap();對此,Flow進行同步或者異步轉換的時候不需要不同的操作符,僅僅使用map就足夠了,由於它們都有supsend掛起函數進行修飾,不用擔心同步性

可以看下在Rxjava中的示例

fun compareFlatMap() {    getUsernames() //Flowable<String>       .flatMapSingle { username ->            getUserFromNetwork(username) // Single<User>       }       .subscribe(           { user -> println(user) },           { println("Got an exception") }       ) }

好的,我們使用Flow來轉換下上述的這一段代碼,只需要使用map就可以以任何方式進行轉換值,如下代碼所示:

runBlocking {        flow {            emit(User("Jacky"))       }.map {            getUserFromName(it) //suspend       }.collect {            println(it)       }   } ​    suspend fun getUserFromName(user: User): String {        return user.userName   }

實際上使用Flow中的map操作符,就可以將上游流發出的值轉換為新流,然後將所有流扁平化為一個,這和flatMap的功能幾乎可以達到同樣的效果

filter()

對於filter操作符,我們在Rxjava中並沒有直接的方法進行異步過濾,這需要我們自己編寫代碼來進行過濾判斷,如下所示

fun getUsernames(): Flowable<String> {    val flowableEmitter = { emitter: FlowableEmitter<String> ->        emitter.onNext("Jacky")   }    return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER) } ​ fun isCorrectUserName(userName: String): Single<Boolean> {    return Single.create { emitter ->        runCatching {            //名字判斷....            if (userName.isNotEmpty()) {                emitter.onSuccess(true)           } else {                emitter.onSuccess(false)           }       }.onFailure {            emitter.onError(it)       }   } } ​ fun compareFilter() {    getUsernames()//Flowable<String>       .flatMapSingle { userName ->            isCorrectUserName(userName)               .flatMap { isCorrect ->                    if (isCorrect) {                        Single.just(userName)                   } else {                        Single.never()                   }               }       }.subscribe {            println(it)       } ​ }

乍一看,是不是感覺有點麻煩,事實上這確實需要我們使用些小手段才能達到目的;而在Flow中,我們能夠輕鬆地根據同步和異步調用過濾流

runBlocking {        userNameFlow().filter { user ->            isCorrectName(user.userName)       }.collect { user->            println(user)       }   } ​ suspend fun isCorrectName(userName: String): Boolean {    return userName.isNotEmpty() } ​

結語

由於篇幅原因,Rxjava和協程都是一個非常龐大的思考話題,它們之間的不同比較可以永遠進行下去;事實上,在Kotlin協程被廣泛使用之前,Rxjava作為項目中主要的異步解決方案,以至於到現在工作上還有很多項目用着Rxjava, 所以即使切換到Kotlin協程之後,還有相當長一段時間還在用着Rxjava;這並不代表Rxjava不夠好,而是協程讓代碼變得更易讀,更易於使用;

暫時先吿一段落了,事實上證明協程確實能夠滿足我們日常開發的主要需求,下次將會對Rxjava中的背壓和之前所討論的Flow背壓問題進行比較探討,還有非常多的東西要學,共勉!!!!

本文主要內容譯至 -> http://www.javaadvent.com/2021/12/are-kotlin-coroutines-enough-to-replace-rxjava.html