Kotlin | 深入理解协程

  • Post author:
  • Post category:其他


本文已同步发表于我的微信公众号,搜索

代码说

即可关注,欢迎与我沟通交流。



Kotlin协程


协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快

我们知道线程是

CPU

调度的基本单位,而协程不能独立存在,必须依赖于线程。在

Kotlin

中,

Dispatcher(内部是线程池或者Android主线程)

是调度器,可以调度协程运行在

一个或多个线程

之中。

一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻

。进程、线程、协程三者的关系:

进程、线程、协程


  • 进程:线程 = 1:N

  • 线程:协程 = 1:N


协程优势:


  • 更安全的代码



    kotlin

    中提供了许多语言功能,避免

    Java

    中最常见的

    null

    空指针等异常

  • 语法简洁、富有表现力

    :相比于

    Java



    kotlin

    可以使用更少的代码实现更多的功能。

  • 可互操作

    :与

    Java

    语言无缝互通。即可以在

    kotlin

    代码中调用

    Java

    代码,同时也可以在

    Java

    代码中调用

    kotlin

    代码。

    kotlin

    代码本质上也是通过

    kotlin

    编译器编译后生成

    VM

    能识别的字节码。

  • 结构化并发



    使用看似阻塞式的写法来实现异步功能

    。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。



非阻塞式挂起


协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务

。在

invoke(或call)



return

之外,协程添加了

suspend

非阻塞式挂起 和

resume

恢复:


  • suspend

    用于暂停执行当前协程,并保存所有局部变量。

  • resume

    用于让已挂起的协程从挂起处恢复执行。

注意

suspend

函数不能在普通函数中调用,否则会报

Suspend function 'xxx' should be called only from a coroutine or another suspend function

的提示;如需调用

suspend

函数,只能从其他

suspend

函数进行调用,或通过使用协程构建器(例如

launch

)来启动新的协程。



suspend

函数相比于普通函数内部多了一个

Continuation

续体实例(Kotlin转成Java代码之后即可看到),

suspend

函数中可以调用普通函数,但普通函数却不能调用

suspend

函数

 suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}

suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }


suspend

修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于

Java

中的

Thread.sleep()

会阻塞当前线程,


suspend

修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但主线程依然可以去执行协程之外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了

。整体流程如下:

执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停(主线程依然是活跃的) -> 子线程执行完毕 -> 协程从挂起点恢复

如在上面的示例中,

get()

在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,

get

会恢复已挂起的协程,继续执行

show(result)

方法。这里注意一下,

挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起


Kotlin

使用

堆栈帧

管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。



CPS变换 + Continuation续体 + 状态机


CPS变换 (Continuation-Passing-Style Transformation)

是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递

Continuation

来控制异步调用流程的:

将函数挂起之后执行的逻辑包装起成一个

Continuation

, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行


Continuation

意为续体,只存在于挂起函数中。

Kotlin

中当一个普通函数加上

suspend

关键字之后,就成为了挂起函数,如:

private suspend fun suspendFuc() {}

我们对此函数进行反编译,查看对应的Java代码为:

private final Object suspendFuc(Continuation $completion) {
      return Unit.INSTANCE;
}

可以看到

Java

代码中系统帮我们多生成了一个

Continuation

类型的参数,看下这个参数内部:

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}


Continuation

是一个接口类型,表示在返回

T类型值的挂起点

之后的延续,其中类型T代表的是原来函数的返回值类型。

resumeWith

恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。


  • 状态机

上述函数中加入一个

delay

函数,

delay

函数本身也是一个挂起函数:

    private suspend fun suspendFuc(): String {
        delay(1000)
        return "value"
    }

经过

Tools -> Kotlin -> Show Kotlin Bytecode

反编译查看:

   private final Object suspendFuc(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }
          
          //将要执行的Continuation逻辑传入ContinuationImpl中
         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

             //invokeSuspend()会在恢复协程挂起点时调用
            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineBaseFragment.this.suspendFuc(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      //返回此状态意味着函数要被挂起
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      
      // 状态机逻辑,通过label进行分块执行
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
      return "value";
   }


suspend

挂起函数经过

Kotlin

编译器编译之后会进行

CPS

变换,并且函数里的逻辑会进行分块执行:

分块逻辑数量 = 挂起函数数量 + 1

,上述函数逻辑中,通过

switch(label){}

状态机将逻辑分块执行,首先

label = 0

,此时会先将

label

置为1,接着调用了

delay

挂起函数,返回

Intrinsics.COROUTINE_SUSPENDED

意味着函数要被挂起;

当函数从协程挂起点恢复执行时,会调用

$continuation#invokeSuspend(Object $result)

方法恢复执行,

$result

是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。

此时

label变为1



ResultKt.throwOnFailure($result)

中通过

if (value is Result.Failure) throw value.exception

判断上面的分块结果是否有异常,如果有直接抛异常;否则直接

break

,执行到最后的

return "value"

逻辑,这样整个方法的流程就运行完毕了。


总结:

suspend

挂起函数的执行流程就是通过

CPS变换 + Continuation + 状态机

来运转的



CoroutineContext

我们知道

Android

中的

Context

(子类有

Application、Activity、Service

)可以获取应用资源,可以启动

Activity



Service

等。

CoroutineContext

意为协程上下文,其作用可以类比

Context

,通过它可以控制协程

在哪个线程中执行、捕获协程异常、设置协程名称

等。

public interface CoroutineContext {
    //从该上下文中返回具有给定[key]的元素 或 null 
    public operator fun <E : Element> get(key: Key<E>): E?

    //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    //删除带有指定[key]的元素。
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>

    //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

协程总是会运行在

CoroutineContext

类型的上下文中,其中

plus

操作符被重写,多个

CoroutineContext

相加时,会生成一个

CombinedContext

( 内部可以理解成一个

Element

都不同的链表);同时

CombinedContext

拥有

map

索引能力,集合中的每个元素都有一个唯一的

Key



继承关系

协程上下文


Element

定义在

CoroutineContext

内部,是它的内部接口。

CoroutineContext

使用以下元素集定义协程的行为:


  • Job

    :控制协程的生命周期。

  • CoroutineDispatcher

    :将工作分派到适当的线程。

  • CoroutineName

    :协程的名称,可用于调试。

  • CoroutineExceptionHandler

    :处理未捕获的异常。

下面就来详细看看几种元素的使用。



CoroutineContext几种具体实现



1、Job & SupervisorJob



Job

是协程的句柄。使用

launch



async

创建的协程都会返回一个

Job

实例,该实例是对应协程的唯一标识并可以管理协程的生命周期

。如:

val job1 = GlobalScope.launch { }
val job2 = MainScope().launch { }

override fun onDestroy() {
    super.onDestroy()
    job1.cancel()
    job2.cancel()
}

注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在

ViewModel

中使用

viewModelScope



LifecycleOwner

中使用

lifecycleScope

,可以在各自的生命周期中自动关闭协程。


Job

常用的

API

  val job = GlobalScope.launch(
      context = Job() + Dispatchers.Main,
      start = CoroutineStart.LAZY) { // 逻辑部分
   }
    
  -  job.start()  // 对应 start = CoroutineStart.LAZY
  -  job.cancelAndJoin()  //cancel() + join()
  -  job.cancel() // 取消
  -  job.isActive // 协程是否存活
  -  job.isCancelled // 协程是否被取消
  -  job.isCompleted //协程是否已经执行完毕


Job

有以下状态:

State isActive isCompleted isCancelled
New (optional initial state) false false false
Active (default initial state) true false false
Completing (transient state) true false false
Cancelling (transient state) false false true
Cancelled (final state) false true true
Completed (final state) false true false

状态流转:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+


SupervisorJob的使用

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }

GlobalScope.launch(exceptionHandler) {
   //子Job1
    launch(SupervisorJob()) {
        delay(200)
        throw NullPointerException()
     }
   //子Job2
    launch {
        delay(300)
        log("child2 execute successfully")
    }
    //子Job3
    launch {
        delay(400)
        log("child3 execute successfully")
    }
    log("parent execute successfully")
}

注意子

Job1

传入的是

SupervisorJob

,当发生异常时,兄弟协程(

job2/job3

)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException
2022-08-30 23:31:30.446  E/TTT: child2 execute successfully
2022-08-30 23:31:30.545  E/TTT: child3 execute successfully

如果将

job1

中的更换为默认时,执行结果为:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully
2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException

可以看到

job1

中使用默认设置时,当

job1

发生了异常,兄弟协程也会被取消。注意这里

job2/job3

可以被取消是因为

delay()

中有对取消状态的判断,

通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算

。所以如果改成以下代码,即使

job1

使用默认配置,

job3

也不会被取消:

//子Job3
 launch(Dispatchers.IO) {
     Thread.sleep(400)
     log("child3 execute successfully")
 }


job3

中没有了对协程状态的判断,所以即使

job3

被通知要取消协程了,依然会继续执行直到结束。


SuperviorJob源码浅析

public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

当子协程

job1/job2/job3

启动时,会和协程本身的

Job

形成父子关系。而当子协程抛异常时,

父Job



childCancelled()

方法会被执行,且默认返回的是

true

,表示取消

父Job

及其所有

子Job

;如果

子Job

中使用的是

SuperviorJob



childCancelled()

返回的是

false

,表示

父Job

及其未发生异常的

子Job

都不会受影响。



2、CoroutineDispatcher
public actual object Dispatchers {
   @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

将工作任务分派到适当的线程。


  • Dispatchers.Main

    :运行在

    UI

    线程中。

    Dispatchers.Main.immediate

    : 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过

    Handler

    转发到主线程

  • Dispatchers.IO

    : 执行

    IO密集型

    任务,最多提交

    max(64, CPU核心数)

    个任务执行。

  • Dispatchers.DEFAULT

    :执行

    CPU密集

    型任务,

    CoroutineScheduler

    最多有

    corePoolSize

    个线程被创建,

    corePoolSize

    的取值为

    max(2, CPU核心数)

    ,即它会尽量的等于

    CPU

    核心数

  • Dispatchers.Unconfined

    :不给协程指定运行的线程,由启动协程的线程决定;但当被挂起后, 会由恢复协程的线程继续执行。内部通过

    ThreadLocal

    保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。


withContext


withContext()

可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用

withContext()

来确保每个函数都是主线程安全的,这意味着,可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。



3、CoroutineName

协程的名称,可用于调试。



4、CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {
  
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    //当有未处理的异常发生时,该方法就会执行
    public fun handleException(context: CoroutineContext, exception: Throwable)
}


CoroutineExceptionHandler

当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的

crash

,可以通过自定义

CoroutineExceptionHandler

来拦截异常,如下:

val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }
lifecycleScope.launch(exceptionHandler) { // do something }              

自定义

CoroutineExceptionHandler

调用的是如下方法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =
    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {
        override fun handleException(context: CoroutineContext, exception: Throwable) =
            handler.invoke(context, exception)
    }

所以当发生异常时,会在

handleException()

中执行我们传入的

handler(CoroutineContext, Throwable)

方法。


CoroutineExceptionHandler使用示例

 val exceptionHandler =
                CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") }
 lifecycleScope.launch(exceptionHandler) {
     log("parent execute start")
     val childExHandler =
                    CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }
     //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理
     val childJob = launch(childExHandler) {
         delay(1000)
         log("child execute")
         throw  IllegalArgumentException("error occur")
     }
     childJob.join()
     log("parent execute end")
 }

执行结果:

2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start
2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute
2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur

可见虽然在子协程中也自定义了

CoroutineExceptionHandler

,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上

SupervisorJob()、Job()

,则异常会在子协程中处理而不会往上传播:

 //其他内容不变
 val childJob = launch(childExHandler + SupervisorJob()) {
     //其他内容不变
 }

执行结果:

2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start
2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute
2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur
2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end

可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。




:如果启动协程的是

async

,则

CoroutineExceptionHandler

中的

handler

方法并不会马上执行,必须调用

deffered.await()

时才会执行。



CoroutineScope 协程作用域


CoroutineScope

会跟踪它使用

launch



async

创建的所有协程。

CoroutineScope

本身并不运行协程,但是通过

CoroutineScope

可以保证对协程进行统一管理,避免发生内存泄漏等,

常见的

CoroutineScope


  • GlobalScope

    : 全局协程作用域,如果不主动通过

    job.cancel()

    关闭,其生命周期与

    Application

    一致。

  • MainScope

    :

    MainScope

    的内部:

    public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

    ,可以看到

    MainScope

    其实是通过

    SupervisorJob() + Dispatchers.Main

    自定义的协程作用域,其内部运行在

    UI线程

    且内部异常不会往上传播。

  • runBlocking

    : 顶层函数,和

    GlobalScope

    不一样,它会阻塞当前线程直到其内部所有子协程执行结束;

  • ktx扩展库中的自定义GlobalScope

    :典型应用就是在

    LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)

    使用的

    lifecycleScope

    ,以及

    ViewModel

    中使用的

    ViewModelScope

    ,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。



参考

【1】

官网:利用 Kotlin 协程提升应用性能


【2】

官网:Android 上的 Kotlin 协程


【3】

Kotlin挂起函数原理


【4】

揭秘Kotlin协程中的CoroutineContext



版权声明:本文为u013700502原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。