Kotlin 协程的取消机制超详细解读

语言: CN / TW / HK

在 Java 语言中提供了线程中断的能力,但并不是所有的线程都可以中断的,因为 interrupt 方法并不是真正的终止线程,而是将一个标志位标记为中断状态,当运行到下一次中断标志位检查时,才能触发终止线程。

但无论如何,终止线程是一个糟糕的方案,因为在线程的销毁和重建,是要消耗系统资源的,造成了不必要的开销。Kotlin 协程提供了更优雅的取消机制,这也是协程比较核心的功能之一。

协程的状态

在了解取消机制之前我们需要知道一些关于 Job 状态的内容:

| State | isActive(是否活跃) | isCompleted(是否完成) | isCancelled(是否取消) | | ---------------- | -------------- | ----------------- | ----------------- | | New (可选初始状态) | false | false | false | | Active (默认初始状态) | true | false | false | | Completing (短暂态) | true | false | false | | Cancelling (短暂态) | false | false | true | | Cancelled (完成态) | false | true | true | | Completed (完成态) | false | true | false |

可以看出,在完成和取消的过程中,会经过一个短暂的进行中的状态,然后才变成已完成/已取消。

在这里只关注一下取消相关的状态:

  • Cancelling

    抛出异常的 Job 会导致其进入 Cancelling 状态,也可以使用 cancel 方法来随时取消 Job 使其立即转换为 Cancelling 状态。

  • Cancelled

    当它递归取消子项,并等待所有的子项都取消后,该 Job 会进入 Cancelled 状态。

取消协程的用法

协程在代码中抽象的类型是 Job , 下面是一个官方的代码示例,用来展示如何取消协程的执行:

suspend fun main(): Unit = coroutineScope {     val job = launch {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancel() // cancels the job     job.join() // waits for job's completion      println("main: Now I can quit.") }

它的输出是:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.

一旦 mian 方法中调用了 job.cancel() ,我们就看不到其他协程的任何输出,因为它已被取消了。

协程取消的有效性

协程代码必须通过与挂起函数的配合才能被取消。kotlinx.coroutines 中所有挂起函数(带有 suspend 关键字函数)都是可以被取消的。suspend 函数会检查协程是否需要取消并在取消时抛出 CancellationException

但是,如果协程在运行过程中没有挂起点,则不能取消协程,如下例所示:

suspend fun main(): Unit = coroutineScope {     val startTime = System.currentTimeMillis()     val job = launch(Dispatchers.Default) {         var nextPrintTime = startTime         var i = 0         while (i < 5) { // computation loop, just wastes CPU             // print a message twice a second             if (System.currentTimeMillis() >= nextPrintTime) {                 println("job: I'm sleeping ${i++} ...")                 nextPrintTime += 500L             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

在这个 job 中,并没有执行任何 suspend 函数,所以在执行过程中并没有对协程是否需要取消进行检查,自然也就无法触发取消。

同样的问题也可以在通过 捕获 CancellationException 并且不抛出的情况下 观察到:

suspend fun main(): Unit = coroutineScope {     val job = launch(Dispatchers.Default) {         repeat(5) { i ->             try {                 // print a message twice a second                 println("job: I'm sleeping $i ...")                 delay(500)             } catch (e: Exception) {                 // log the exception                 println(e)             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

打印结果是:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 job: I'm sleeping 3 ... kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 job: I'm sleeping 4 ... kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 main: Now I can quit.

从打印结果来看,循环 5 次全部执行了,好像取消并没有起到作用。但实际上不是这样的,为了便于观察加上时间戳:

1665217217682: job: I'm sleeping 0 ... 1665217218196: job: I'm sleeping 1 ... 1665217218697: job: I'm sleeping 2 ... 1665217218996: main: I'm tired of waiting! 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219000: job: I'm sleeping 3 ... 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219000: job: I'm sleeping 4 ... 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219001: main: Now I can quit.

加上时间可以看出,抛出第一次异常后的两次循环和异常捕获都是在同一瞬间完成的。这说明了捕获到异常后,仍然会执行代码,但是所有的 delay 方法都没有生效,即该 Job 的所有子 Job 都失效了。但该 Job 仍在继续循环打印。原因是,父 Job 会等所有子 Job 处理结束后才能完成取消。

而如果我们不使用 try-catch 呢?

suspend fun main(): Unit = coroutineScope {     val job = launch(Dispatchers.Default) {         repeat(5) { i ->             // print a message twice a second             println("job: I'm sleeping $i ...")             delay(500)         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

打印结果:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.

很顺利的取消了,这是因为协程抛出 Exception 直接终止了。

注意协程抛出 CancellationException 并不会导致 App Crash 。

使用 try-catch 来捕获 CancellationException 时需要注意,在挂起函数前的代码逻辑仍会多次执行,从而导致这部分代码仿佛没有被取消一样。

如何写出可以取消的代码

有两种方法可以使代码是可取消的。第一种方法是定期调用挂起函数,检查是否取消,就是上面的例子中的方法;另一个是显式检查取消状态:

suspend fun main(): Unit = coroutineScope {     val startTime = System.currentTimeMillis()     val job = launch(Dispatchers.Default) {         var nextPrintTime = startTime         var i = 0         while (isActive) { // cancellable computation loop             // print a message twice a second             if (System.currentTimeMillis() >= nextPrintTime) {                 println("job: I'm sleeping ${i++} ...")                 nextPrintTime += 500L             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

将上面的循环 5 次通过使用 while (isActive) 进行替换,实现显示检查取消的代码。isActive 是通过 CoroutineScope 对象在协程内部可用的扩展属性。

在 finally 中释放资源

在前面的例子中我们使用 try-catch 捕获 CancellationException 发现会产生父协程等待所有子协程完成后才能完成,所以建议不用 try-catch 而是 try{…} finally{…} ,让父协程在被取消时正常执行终结操作:

val job = launch {     try {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     } finally {         println("job: I'm running finally")     } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")

join 和 cancelAndJoin 都要等待所有终结操作完成,所以上面的例子产生了以下输出:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! job: I'm running finally main: Now I can quit.

使用不可取消的 block

如果在在上面的示例的 finally 代码块中使用 suspend 函数,会导致抛出 CancellationException 。

因为运行这些代码的协程已经被取消了。通常情况下这不会有任何问题,然而,在极少数情况下,如果你需要在 finally 中使用一个挂起函数,你可以通过使用 withContext(NonCancellable) { ... }

val job = launch {     try {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     } finally {         withContext(NonCancellable) {             println("job: I'm running finally")             delay(1000L)             println("job: And I've just delayed for 1 sec because I'm non-cancellable")         }     } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")

CancellationException

在上面的内容中,我们知道协程的取消是通过抛出 CancellationException 来进行的,神奇的是抛出 Exception 并没有导致应用程序 Crash 。

CancellationException 的真实实现是 j.u.c. 中的 CancellationException :

public actual typealias CancellationException = java.util.concurrent.CancellationException

如果协程的 Job 被取消,则由可取消的挂起函数抛出 CancellationException 。它表示协程的正常取消。在默认的 CoroutineExceptionHandler 下,它不会打印到控制台/日志。

上面引用了这个类的注释,看来处理抛出异常的逻辑在 CoroutineExceptionHandler 中:

``` public interface CoroutineExceptionHandler : CoroutineContext.Element {     /*      * Key for [CoroutineExceptionHandler] instance in the coroutine context.      /     public companion object Key : CoroutineContext.Key

/*      * Handles uncaught [exception] in the given [context]. It is invoked      * if coroutine has an uncaught exception.      /     public fun handleException(context: CoroutineContext, exception: Throwable) } ```

通常,未捕获的 Exception 只能由使用协程构建器的根协程产生。所有子协程都将异常的处理委托给他们的父协程,父协程也委托给它自身的父协程,直到委托给根协程处理。所以在子协程中的 CoroutineExceptionHandler 永远不会被使用。

使用 SupervisorJob 运行的协程不会将异常传递给它们的父协程,SupervisorJob 被视为根协程。

使用 async 创建的协程总是捕获它的所有异常通过结果 Deferred 对象回调出去,因此它不能导致未捕获的异常。

CoroutineExceptionHandler 用于记录异常、显示某种类型的错误消息、终止和/或重新启动应用程序。

如果需要在代码的特定部分处理异常,建议在协程中的相应代码周围使用 try-catch。通过这种方式,您可以阻止异常协程的完成(异常现在被捕获),重试操作,和/或采取其他任意操作。 这也就是我们前面论证的在协程中使用 try-catch 导致的取消失效。

默认情况下,如果协程没有配置用于处理异常的 Handler ,未捕获的异常将按以下方式处理:

  • 如果 exception 是 CancellationException ,那么它将被忽略(因为这是取消正在运行的协程的假定机制)。

  • 其他情况:

    • 如果上下文中有一个 Job,那么调用 job.cancel()
    • 否则,通过 ServiceLoader 找到的 CoroutineExceptionHandler 的所有实例并调用当前线程的 Thread.uncaughtExceptionHandler 来处理异常。

超时取消

取消协程执行的最合适的应用场景是它的执行时间超过了规定的最大时间时自动取消任务。在 Kotlin 协程库中提供了 withTimeout 方法来实现这个功能:

withTimeout(1300L) {     repeat(1000) { i ->         println("I'm sleeping $i ...")         delay(500L)     } }

执行结果:

I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

TimeoutCancellationException 是 CancellationException 的子类,TimeoutCancellationException 通过 withTimeout 函数抛出。

在本例中,我们在main函数中使用了withTimeout ,运行过程中会导致 Crash 。

有两种解决办法,就是使用 try{…} catch (e: TimeoutCancellationException){…} 代码块;另一种办法是使用在超时的情况下不是抛出异常而是返回 null 的 withTimeoutOrNull 函数:

val result = withTimeoutOrNull(1300L) {     repeat(1000) { i ->         println("I'm sleeping $i ...")         delay(500L)     }     "Done" // will get cancelled before it produces this result } println("Result is $result")

打印结果:

I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Result is null

异步的超时和资源

withTimeout 中的超时事件相对于在其块中运行的代码是异步的,并且可能在任何时间发生,甚至在从超时块内部返回之前。如果你在块内部打开或获取一些资源,需要关闭或释放到块外部。

例如,在这里,我们用 Resource 类模拟一个可关闭资源,它只是通过对获得的计数器递增,并对该计数器从其关闭函数递减来跟踪创建次数。让我们用小超时运行大量的协程,尝试在一段延迟后从withTimeout块内部获取这个资源,并从外部释放它。

``` var acquired = 0

class Resource {     init { acquired++ } // Acquire the resource     fun close() { acquired-- } // Release the resource }

fun main() {     runBlocking {         repeat(100_000) { // Launch 100K coroutines             launch {                  val resource = withTimeout(60) { // Timeout of 60 ms                     delay(50) // Delay for 50 ms                     Resource() // Acquire a resource and return it from withTimeout block                      }                 resource.close() // Release the resource             }         }     }     // Outside of runBlocking all coroutines have completed     println(acquired) // Print the number of resources still acquired } ```

如果运行上面的代码,您将看到它并不总是打印 0,尽管它可能取决于您的机器的时间,在本例中您可能需要调整超时以实际看到非零值。

要解决这个问题,可以在变量中存储对资源的引用,而不是从withTimeout块返回它。

fun main() {     runBlocking {         repeat(100_000) { // Launch 100K coroutines             launch {                 var resource: Resource? = null // Not acquired yet                 try {                     withTimeout(60) { // Timeout of 60 ms                         delay(50) // Delay for 50 ms                         resource = Resource() // Store a resource to the variable if acquired                     }                     // We can do something else with the resource here                 } finally {                     resource?.close() // Release the resource if it was acquired                 }             }         }     } // Outside of runBlocking all coroutines have completed     println(acquired) // Print the number of resources still acquired }

这样这个例子总是输出0。资源不会泄漏。

取消检查的底层原理

在探索协程取消的有效性时,我们知道协程代码必须通过与挂起函数的配合才能被取消。

kotlinx.coroutines 中所有挂起函数(带有 suspend 关键字函数)都是可以被取消的。suspend 函数会检查协程是否需要取消并在取消时抛出 CancellationException 。

关于协程的取消机制,很明显和 suspend 关键字有关。为了测试 suspend 关键字的作用,实现下面的代码:

class Solution {     suspend fun func(): String {         return "测试 suspend 关键字"     } }

作为对照组,另一个是不加 suspend 关键字的 func 方法:

class Solution {     fun func(): String {         return "测试 suspend 关键字"     } }

两者反编译成 Java :

``` // 普通的方法 public final class Solution {     public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();

@NotNull     public final String func() {         return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();     } }

// 带有 suspend 关键字的方法 public final class Solution {     public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();

@Nullable     public final Object func(@NotNull Continuation<? super String> $completion) {         return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();     } } ```

suspend 关键字修饰的方法反编译后默认生成了带有 Continuation 参数的方法。说明 suspend 关键字的玄机在 Continuation 类中。

Continuation 是 Kotlin 协程的核心思想 Continuation-Passing Style 的实现。原理参考简述协程的底层实现原理

通过在普通函数的参数中增加一个 Continuation 参数,这个 continuation 的性质类似于一个 lambda 对象,将方法的返回值类型传递到这个 lambda 代码块中。

什么意思呢?就是本来这个方法的返回类型直接 return 出来的:

val a: String = func() print(a)

而经过 suspend 修饰,代码变成了这个样子:

func { a ->     print(a) }

Kotlin 协程就是通过这样的包装,将比如 launch 方法,实际上是 launch 最后一个参数接收的是 lambda 参数。也就是把外部逻辑传递给函数内部执行。

回过头来再来理解 suspend 关键字,我们知道带有 suspend 关键字的方法会对协程的取消进行检查,从而取消协程的执行。从这个能力上来看,我理解他应该会自动生成类似下面的逻辑代码:

生成的函数 {     if(!当前协程.isActive) {         throw CancellationException()     }     // ... 这里是函数真实逻辑 }

suspend 修饰的函数,会自动生成一个挂起点,来检查协程是否应该被挂起。

显然 Continuation 中声明的函数也证实了挂起的功能:

``` public interface Continuation {     /*      * The context of the coroutine that corresponds to this continuation.      /     public val context: CoroutineContext

/*      * 恢复相应协程的执行,将成功或失败的结果作为最后一个挂起点的返回值传递。      /     public fun resumeWith(result: Result) } ```

协程本质上是产生了一个 switch 语句,每个挂起点之间的逻辑都是一个 case 分支的逻辑。参考 协程是如何实现的 中的例子:

```         Function1 lambda = (Function1)(new Function1((Continuation)null) {             int label;

@Nullable             public final Object invokeSuspend(@NotNull Object $result) {                 byte text;                 @BlockTag1: {                     Object result;                     @BlockTag2: {                         result = IntrinsicsKt.getCOROUTINE_SUSPENDED();                         switch(this.label) {                             case 0:                                 ResultKt.throwOnFailure($result);                                 this.label = 1;                                 if (SuspendTestKt.dummy(this) == result) {                                     return result;                                 }                                 break;                             case 1:                                 ResultKt.throwOnFailure($result);                                 break;                             case 2:                                 ResultKt.throwOnFailure($result);                                 break @BlockTag2;                             case 3:                                 ResultKt.throwOnFailure($result);                                 break @BlockTag1;                             default:                                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");                         }

text = 1;                         System.out.println(text);                         this.label = 2;                         if (SuspendTestKt.dummy(this) == result) {                             return result;                         }                     }

text = 2;                     System.out.println(text);                     this.label = 3;                     if (SuspendTestKt.dummy(this) == result) {                         return result;                     }                 }                 text = 3;                 System.out.println(text);                 return Unit.INSTANCE;             }

@NotNull             public final Continuation create(@NotNull Continuation completion) {                 Intrinsics.checkNotNullParameter(completion, "completion");                 Function1 funcation = new (completion);                 return funcation;             }

public final Object invoke(Object object) {                 return (()this.create((Continuation)object)).invokeSuspend(Unit.INSTANCE);             }         }); ```

可以看出,在每个分支都会执行一次 ResultKt.throwOnFailure($result);,从名字上就知道,这就是检查是否需要取消并抛出异常的代码所在:

@PublishedApi @SinceKotlin("1.3") internal fun Result<*>.throwOnFailure() {     if (value is Result.Failure) throw value.exception }

这里的 Result 类是一个包装类,它将成功的结果封装为类型 T 的值,或将失败的结果封装为带有任意Throwable异常的值。

```         @Suppress("INAPPLICABLE_JVM_NAME")         @InlineOnly         @JvmName("success")         public inline fun  success(value: T): Result =             Result(value)

/*          * Returns an instance that encapsulates the given [Throwable] [exception] as failure.          /         @Suppress("INAPPLICABLE_JVM_NAME")         @InlineOnly         @JvmName("failure")         public inline fun  failure(exception: Throwable): Result =             Result(createFailure(exception)) ```

成功和失败的方法类型是不一样的,证实了这一点,success 方法接收类型为 T 的参数;failure 接收 Throwable 类型的参数。

到这里 suspend 方法挂起的原理就明了了:在协程的状态机中,通过挂起点会分割出不同的状态,对每一个状态,会先进行挂起结果的检查。 这会导致以下结果:

  • 协程的取消机制是通过挂起函数的挂起点检查来进行取消检查的。证实了为什么如果没有 suspend 函数(本质是挂起点),协程的取消就不会生效。
  • 协程的取消机制是需要函数合作的,就是通过 suspend 函数来增加取消检查的时机。
  • 父协程会执行完所有的子协程(挂起函数),因为代码的本质是一个循环执行 switch 语句,当一个子协程(或挂起函数)执行结束,会继续执行到下一个分支。但是最后一个挂起点后续的代码并不会被执行,因为最后一个挂起点检查到失败,不会继续跳到最后的 label 分支。