kotlin coroutine源码解析之suspend挂起函数原理

  • Post author:
  • Post category:其他




suspend挂起函数

在idea中写某些协程函数的时候,会有一个绿色箭头图标的出现,如下图:

在这里插入图片描述

而且这些方法不放在协程里面写的话,idea编辑器还会报错,如下图:

在这里插入图片描述

上面所说的这些方法就是挂起函数,挂起函数必须要在协程中调用,或者在挂起函数中调用;放在挂起函数中调用挂起函数调用,那么说明还是间接在协程中被调用,也就是挂起函数调用需要协程环境。

在说挂起函数原理之前,先复习下

launch启动过程

所说的三类continuation:

  1. DispatchedContinuation用于分发continuation到指定的线程池中;
  2. ContinuationImpl用于包装launch的lambda代码块作为业务代码代理类;
  3. StandAloneCoroutine协程管理类管理Job生命周期以及协程的状态父子Job关系维护等等。



join原理

启动一个launch的时候,返回一个Job对象,这个Job对象对应着上面三个continuation实例,其中我们的业务代码在ContinuationImpl中,例如下面的代码:

  val myScope = CoroutineScope(CoroutineName("name") + IO + Job())

  val job = myScope.launch(CoroutineName("job")) {

      var job2 : Job? = launch(CoroutineName("job2")) {
          Thread.sleep(20000)
      }

      job2?.join()

      printlnM("job has run to end")
  }

  printlnM("job has launch")

这段代码编译后的代码如下:

BuildersKt.launch$default(
   myScope,
   (CoroutineContext)(new CoroutineName("job")),
   (CoroutineStart)null,
   (Function2)(new Function2((Continuation)null) {
       // $FF: synthetic field
       private Object L$0;
       int label;

       public final Object invokeSuspend(@NotNull Object $result) {
           Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
           switch(this.label) {
               case 0:
                   ResultKt.throwOnFailure($result);
                   CoroutineScope $this$launch = (CoroutineScope)this.L$0;
                   //创建job2
                   Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)
                   
                   this.label = 1;
                   //调用join方法
                   if (job2.join(this) == var5) {
                       return var5;
                   }
                   break;

               case 1:
                   ResultKt.throwOnFailure($result);
                   break;

               default:
                   throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
           }

           MainActivity.Companion.printlnM("job has run to end");
           return Unit.INSTANCE;
       }
	    //省略。。。
   }), 
   2, (Object)null);


Job job2 = BuildersKt.launch$default(/*省略。。。省略部分后面贴出*/)

job2的省略代码后面贴出,不影响代码流程。

可以看到,我们的业务代码被编译到invokeSuspend()函数中了,是通过switch case语句来运行对应的代码逻辑,

第一步:

case 0

执行的是

Job job2 = BuildersKt.launch$default(/*省略。。。*/)

也就是逻辑代码里面的创建

job2

那句代码,然后

this.label = 1;

,label设置为1,接着调用

job2.join()

方法,join方法一般都是返回

COROUTINE_SUSPENDED

的,那么在这里就返回结束了,除了立马就有返回值的挂起函数返回值,这个if判断就不成立了,直接走下面第三步;

第二步:label为1了,那么

case 1

的情况成立,入参设置一下异常,这个入参是上一步传递进来的,没有异常的话这里是空的,break;

第三步:调用

printlnM("job has run to end");

打印语句。

我们可以看到,遇到有函数返回COROUTINE_SUSPENDED的,那么invokeSuspend函数会立刻return结束掉。按照之前

launch启动过程

分析的,launch的lambda逻辑代码被调用的调用链是:

DispatchedContinuation -> ContinuationImpl(内部调用了invokeSuspend) -> StandAloneCoroutine

如果invokeSuspend函数因为返回了COROUTINE_SUSPENDED直接结束掉的话,岂不是这个Job2协程的代码还没有跑完,就结束了?显然是不行的,仔细观察发现,只需要多次调用invokeSuspend方法,label会随着每次调用都会递增或者变动,那么对应的case一定会让所有的代码都执行一遍的。

可以把这种switch case语句当成是人们常说的

状态机模式

,在安卓开发中,常用的handler.sendMessage就很类似状态机模式,根据不同的what去处理不同的逻辑,只是协程会将顺序代码,根据case分成一个接着一个连续的代码段。

这里我们直接去跟踪

join

源码看是怎么实现的:

job2.join(this)

public final override suspend fun join() {
    if (!joinInternal()) { // fast-path no wait
        coroutineContext.checkCompletion()
        return // do not suspend
    }
    return joinSuspend() // slow-path wait
}

private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}

主要看

joinSuspend

函数,里面有几个重要的点:


  1. suspendCancellableCoroutine<Unit> { cont ->

    让当前协程在调用该函数处挂起,给当前协程的lambda代码块提供一个CancellableContinuation。
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        /*
         * For non-atomic cancellation we setup parent-child relationship immediately
         * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
         * properly supports cancellation.
         */
        cancellable.initCancellability()
        block(cancellable)
        cancellable.getResult()
    }

首先创建一个CancellableContinuationImpl类型的continuation,入参是uCont.intercepted(),这个uCont是调用的协程,那么就是

val job

这个协程,uCont.intercepted()也就是

launch启动过程

那章分析的DispatchedContinuation对象,由于

val job = myScope.launch(CoroutineName("job"))

这句话已经将DispatchedContinuation生成了,所有会复用之前的对象,代码如下:

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

那么cancellable对象持有的就是

val job

的DispatchedContinuation,

接着在调用

cancellable.initCancellability()

private fun setupCancellation() {
    //省略
    val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
    parent.start() // make sure the parent is started
    val handle = parent.invokeOnCompletion(
        onCancelling = true,
        handler = ChildContinuation(parent, this).asHandler
    )
    parentHandle = handle
	//省略
}

看代码,

val handle = parent.invokeOnCompletion

之前分析过,是parent和当前job组成父子关系的作用,parent代表

val job

,当前job就是

cancellable

对象,意思就是父子Job可以相互取消对方,和之前父子Job关联的分析是差不多的,那么

val job



cancellable

组成了父子关系。

在调用

block(cancellable)

,这个就进入下面2了。


  1. invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler)

    ,这个方法是将入参ResumeOnCompletion和当前的JobSupport关联起来,加入到当前Jobsupport.state.list中,asHandler返回一个DisposableHandler然后将这个handler加入到cont的state.list中去,cont也是上面1中变量

    cancellable

    ,是CancellableContinuationImpl类型的继承自continuation,这样

    val Job2

    和新创建的

    cancellable

    也组成了父子关系了。

  2. ResumeOnCompletion类型,传入的this,和cont参数,用来恢复cont协程的作用:

private class ResumeOnCompletion(
    job: Job,
    private val continuation: Continuation<Unit>
) : JobNode<Job>(job)  {
    override fun invoke(cause: Throwable?) = continuation.resume(Unit)
    override fun toString() = "ResumeOnCompletion[$continuation]"
}

调用了

invoke -> continuation.resume(Unit)

,这样continuation就会继续执行了。

经过上面的分析之后,可以看出整个Job树的结构如下:

在这里插入图片描述

看图:其中cancellable是可以被job取消的,取消之后,会将cancellable移除掉,cancellable被取消后,会遍历自己的state.list列表调用invoke方法,那么就会调用DIsposableHandle的invoke方法,将自己从job2中移除掉,cancellable就失去了作用。

结合一下job2的被编译后的代码,Job2编译后的代码如下:

BuildersKt.launch$default(
	$this$launch, 
	(CoroutineContext)(new CoroutineName("job2")),
 	(CoroutineStart)null, 
	(Function2)(new Function2((Continuation)null) {
      int label;
      @Nullable
      public final Object invokeSuspend(@NotNull Object var1) {
          Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
          switch(this.label) {
              case 0:
                  ResultKt.throwOnFailure(var1);
                  Thread.sleep(1000L);
                  return Unit.INSTANCE;
              default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
          }
      }
	  //省略。。。
  }), 
  2, (Object)null);

invokeSuspend方法被调用后,休眠了1s中,之后结束了,就会继续调用completion的resume方法,这样就会调用到job2对应的

AbstractCoroutine.resumeWith -> makeCompletingOnce -> 
notifyHandlers(list: NodeList, cause: Throwable?) -> 
(ResumeOnCompletion) node.invoke(cause) -> continuation.resume(Unit) ->

进入到CancellableContinuationImpl类中,

CancellableContinuationImpl.resumeWith -> resumeImpl -> 
dispatchResume -> dispatch -> dispatcher.dispatch(context, this)

上面已经分析了

dispatcher.dispatch(context, this)

dispatcher这个对象是根

val job

的dispatcher,在

val job

laucnh的时候已经创建好了的,此时正在挂起中,所以这里调用dispatch就有下面的调用链:

BaseContinuationImpl..resumeWith() ->   val outcome = invokeSuspend(param)  ->
completion.resumeWith(outcome)

job之前在挂起点结束了,现在又再一次被CancellableContinuationImpl类型的continuation调用,那么job就从这个挂起点被唤醒了。这样就从case 0,运行到case 1了,然后job就结束了。



await原理

将上面代码的Join换成await方法,代码如下:

  val myScope = CoroutineScope(CoroutineName("name") + IO + Job())

  val job = myScope.launch(CoroutineName("job")) {

       var deferred = async (CoroutineName("job2")) {
          Thread.sleep(20000)
      }

      job2?.await()

      printlnM("job has run to end")
  }

  printlnM("job has launch")

原理其实和Join方法差不多,流程是一样的,只是其中的某些节点的类型不一样,代码如下:

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    val cont = AwaitContinuation(uCont.intercepted(), this)
    
    cont.initCancellability()
 	cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
    cont.getResult()
}
  1. 创建一个AwaitContinuation类型的continuation,
  2. 然后调用initCancellability方法,这个在上一节讲过,是cont节点和自己的父节点产生父子关系关联,这个父节点也就是uCont,intercepted() 对象,也就是

    val job

    的协程,
  3. 接着
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))

这句话在join方法里面看到了,是job2和AwaitContinuation产生父子关系的功能。

那么我们看下ResumeAwaitOnCompletion类型的节点是怎么实现的吧,

private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        if (state is CompletedExceptionally) {
            continuation.resumeWithException(state.cause)
        } else {
            continuation.resume(state.unboxState() as T)
        }
    }
}

和ResumeOnCompletion节点其实差不多,只不过调用resume的时候参数值是await的返回值,所以在挂起点恢复的时候,还带有挂起函数执行完成的返回值;

而且在出现异常的时候,还可以将异常抛出去给父Job处理,这一点好像比Job功能更完善。

可以画个图描述一下await的Job树:

在这里插入图片描述

job2是通过AwaitContinuation完成挂起点恢复的,这个类也是继承子 CancellableContinuationImpl的,只是覆盖

fun getContinuationCancellationCause(parent: Job): Throwable

用于获取异常信息的接口。

其他流程完全和join方法一模一样、



delay函数

跟踪一下delay函数:

delay(1000)
    
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

和join,await差不多,首先利用delay调用处,父协程作用域的continuation,创建一个CancellableContinuation,让cancellable节点和父协程产生父子关联,该处返回的挂起值

COROUTINE_SUSPENDED

让父协程挂起,接着调用cancellable的上下文集合context.

delay.scheduleResumeAfterDelay

方法,该context是父协程上下文集合,意思就是delay对象是父协程的dispatcher元素,这个元素的成员函数scheduleResumeAfterDelay实现的delay功能。

为了简单理解,展示下HandlerDIspatcher的实现:

//HandlerDIspatcher
 override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
     val block = Runnable {
         with(continuation) { resumeUndispatched(Unit) }
     }
     handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
     continuation.invokeOnCancellation { handler.removeCallbacks(block) }
 }

就是使用线程自带的Handler延迟发送一个消息,这个消息的runnable的实现是,调用

CancellableContinuation



resumeUndispatched

方法,跟踪之后,调用链条是:

resumeUndispatched -> resumeImpl -> dispatchResume -> dispatch -> resumeUnconfined -> 
后面调用父协程的invokeSuspend方法了,同join方法是一样的。

delay的实质,就是父协程挂起后,延迟delay的一段时间(延迟这段时间,是任务加入队列,等待指定的时间后,线程取出任务然后执行,所以不存在阻塞线程的问题),使用父协程的dispatcher恢复父协程继续执行,有如下图所示:

在这里插入图片描述



suspend函数

通过上面的Join和Await的分析,感觉挂起协程之后,似乎需要恢复协程才可以让协程执行完成未完成的代码的。如果Join和Await方法不创建CancellableContinuationImpl这个continuation节点的话,其实

val job

协程挂起后就结束了,剩下的代码是不会完成,后面的打印语句是不会执行的。这个直觉是正确的,Suspend函数就是有这个作用的,挂起协程之后,需要逻辑代码在挂起函数内部,主动去调用resume和resumeWithException()方法,用于恢复协程。

suspend函数挂起代码实例:

fun SuspendFunc() {
    val myScope = CoroutineScope(Dispatchers.IO)

    myScope.launch {
        printlnM("suspend func run before")

        testRemoteRequest()

        printlnM("suspend func run after")
    }
}

suspend fun testRemoteRequest():String {
    return suspendCancellableCoroutine { cancellableContinuation ->
        getUserInfo(object : Callback {
            override fun success() {
                printlnM("成功请求用户信息")
                cancellableContinuation.resume("成功")
            }
            override fun failed() {
                printlnM("请求用户信息失败")
                cancellableContinuation.resumeWithException(Exception("失败"))
            }
        })
    }
}

interface Callback{
    fun success()
    fun failed()
}

fun getUserInfo(callback: Callback?) {
    try {
        printlnM("getUserInfo")
        Thread.sleep(3000)
        callback?.success()
    } catch (e:Exception) {
        callback?.failed()
        e.printStackTrace()
    }
}

调用挂起函数,创建一个挂起点

suspendCancellableCoroutine

,我们的逻辑代码写在

suspendCancellableCoroutine

的lambda代码块中,内部调用模拟的网络请求,三秒后,主动调用resume方法,让挂起协程恢复执行。

打印结果如下:

2022-11-17 02:59:44.336  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
2022-11-17 02:59:44.337  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
2022-11-17 02:59:47.338  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
2022-11-17 02:59:47.339  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run after

可以看到协程运行到getUserInfo,中间过了三秒后才继续执行,说明Job被挂起后恢复执行了。

现在修改代码,让网络请求失败,看看异常是谁来处理的:

val myScope = CoroutineScope(Dispatchers.IO + CoroutineExceptionHandler { c,e ->
    printlnM("scopeExceptionHandler : " + e.message)
})

getUserInfo(object : Callback {
    override fun success() {
        printlnM("成功请求用户信息")
        cancellableContinuation.resumeWithException(Exception("失败"))
    }
    override fun failed() {
        printlnM("请求用户信息失败")
        cancellableContinuation.resumeWithException(Exception("失败"))
    }
})

打印日志如下:

2022-11-17 03:10:07.952  E/MainActivity: DefaultDispatcher-worker-1 : suspend func run before
2022-11-17 03:10:07.953  E/MainActivity: DefaultDispatcher-worker-1 : getUserInfo
2022-11-17 03:10:10.954  E/MainActivity: DefaultDispatcher-worker-1 : 成功请求用户信息
2022-11-17 03:10:10.955  E/MainActivity: DefaultDispatcher-worker-1 : scopeExceptionHandler : 失败

可以看到,协程挂起后,三秒后,网路请求失败,挂起点抛出的异常,被根部的CoroutineExceptionHandler处理了,也就是说挂起函数的异常,也是遵循

Job异常处理

所说的链路传播。

根据上面的分析,可以给挂起流程画个时序图,用于展示线程是怎么切换的,怎么恢复协程的,如下图所示:

在这里插入图片描述

其中带颜色的方框,代表的是DisptacherContinuation,它持有的线程池执行它持有的的协程,所以可以知道挂起点恢复之后,协程所执行的线程池是保持一致的,对于不同的协程之间,由它继承的Disptahcer决定,或者通过launch传递disptahcer参数覆盖,这样就可以修改协程运行所在的线程了。



总结


1. 父JobA的lambda表达式中有挂起函数,协程会在父JobA的挂起点处创建一个CancellableContinuationImpl类型的continuation,这个Cancellable会和父JobA进行父子关联;

如果挂起函数本身是某个JobB的挂起函数,那么Cancellable还会和JobB组成父子关系,JobB在结束自己的时候,会通知Cancellable自己完成了,Cancellable又会继续通知JobA继续执行lambda的代码块,这样JobA就从挂起点恢复过来了。

如果挂起函数是由suspendCancellableCoroutine函数完成的,那么需要在lambda代码块中,收到调用resume函数去主动唤起协程。


2. suspendCancellableCoroutine是挂起函数的重点,这个函数才是挂起函数的实现成为可能。


3. 由于JobA恢复执行的dispatcher不变,所以JobA的lambda代码在挂起点前后所执行的线程池是一样的,如果是单线程的话,那么线程前后是一样的。而挂起点continuation本身的逻辑代码执行线程由挂起点自己决定。



版权声明:本文为u012345683原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。