当前位置: 首页 > 工具软件 > collect > 使用案例 >

flow函数和collect函数浅析

翟学文
2023-12-01

flow方法:


public fun <T> flow(@BuilderInference block:suspend FlowCollector<T>.() -> Unit):
Flow<T> = SafeFlow(block)
public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

会将flow传入的方法封装成一个FlowCollector的扩展函数,因此在flow代码块中使用emit是自然地。

第二个代码块中观察到FlowCollector的泛型是通过emit来推导的,这也就是为什么emit方法传入不同的类型flow所构造的FlowCollector的类型也不同。

总结:flow方法只是将传入的方法扩展成了一个FlowCollector的扩展函数,并可共享已有的emit方法。

collect方法:

public suspend inline fun <T> Flow<T>.collect(crossinline action: 
                suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

可以看到调用Flow的扩展函数collect时会手动构建一个FlowCollector,并重写emit方法的逻辑为执行collect中的代码块。

接下来看Flow的collect成员方法接收到FlowCollector对象后做什么处理。

public interface Flow<out T> {
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

收集collect的具体行为默认是通过具体的flow构建时构造出来的。如默认上文构造出来的是SafeFlow,collect收集行为被封装在AbstractFlow中。

先看下AbstractFlow:

 abstract class AbstractFlow<T> : Flow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)

可以看到构建了一个SafeCollector调用collectSafely将改参数传进去。

SafeCollector会保存协程上下文(为了之后防止再次创建续体导致的浪费)和collect方法传进来的FlowCollector。

重写collectSafely方法的类也就是最上面的SafeFlow做的事情:

SafeFlow:

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

可以看到最后是用SafeCollector调用的flow里面的代码块

注:这个collector就是collect中创建的SafeCollector而block方法则是之前flow中创建的扩展函数(也就是flow代码块)

所以这就是为什么是冷流的原因,只有调用collect才会构建这个SafeCollector对象并调用flow传进来的方法(flow代码块会添加到FlowCollector的扩展函数中,为了之后SafaCollector调用block)

到此flow代码块开始运行了,flow中的调用者this即为collect中创建的SafeCollector对象

SafeCollector中的emit方法

接下来看看SafeCollector中的emit方法:

 override suspend fun emit(value: T) {
 //该方法中会获取当前协程的续体,当执行不需要挂起时(不返回SUSPEND关键字),
 //会直接运行resumeWith并且不会执行拦截器,这也是为什么不能够在
 //flow中切换上下文的原因,不会执行intercept切换线程
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //调用第二个emit方法
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that exception from emit (or even check context) has been thrown
                lastEmissionContext = DownstreamExceptionElement(e)
                //emit
                throw e
            }
        }
    }
     
//带续体参数的emit方法:
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //最终会调用到emitfun中。 
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }
    

再来看下emitfun

读者先记下这三个变量具体含义:

collect为FlowCollector对象(该对象的emit方法实现是执行collect代码块),value为emit的参数,最后一个代表续体

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

这里科普下Kotlin中的::语法 和 表达式

Class::函数名/属性名 代表获取这个class的函数引用和属性引用。属性引用还好理解可以针对这个属性进行指定赋值/取值操作(静态属性不需要具体对象,成员属性需要具体的对象)。函数引用代表持有了这个方法的引用可以调用这个函数,由于是函数不区分对象。

=表达式 对应于上面的表达式则为emitfun返回了一个经强转后的方法。也就是emitfun返回的是一个方法,当调用表达式时该方法的invoke会自动执行(后面就会看到了)

可以看到将这个FlowCollector<Any?>的emit方法强转为了Function3<FlowCollector<Any?>, Any?, Continuation, Any?>方法,最直观的感受就是将原来只接受一个value的emit方法强转成了一个FlowCollector,value,和续体的方法。

看下反编译代码

   private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
      // $FF: synthetic method
      // $FF: bridge method
      public Object invoke(Object var1, Object var2, Object var3) {
         return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
      }

      @Nullable
      public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
         InlineMarker.mark(0);
         Object var10000 = p1.emit(p2, continuation);
         InlineMarker.mark(2);
         InlineMarker.mark(1);
         return var10000;
      }

      public final KDeclarationContainer getOwner() {
         return Reflection.getOrCreateKotlinClass(FlowCollector.class);
      }

      public final String getName() {
         return "emit";
      }

      public final String getSignature() {
         return "emit(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;";
      }
   }, 3);

讲到这里读者应该已经猜到了

参数讲解:

1.p1:对应于调用collect方法所构建的FlowCollector对象,重写emit方法执行collect代码块内容

2.p2:emit方法的参数,对应于发送的值

3.continuation:当前协程的续体

流程讲解:

1.当调用emifun表达式时,表达式所构建的Function3方法的invoke方法将会被调用

2.会调用到emit方法,该方法最终会调用collect代码块的内容也就是action方法,并把emit的参数传入。

至此,emit每调用一次,都会执行一次collect方法。

总结

flow方法的主要作用是将传入的方法参数变成FlowCollector的扩展函数。

collect方法首先会创建一个FlowCollector对象并重写其emit方法的逻辑用于执行传入的方法参数;

接着还会创建一个SafaCollector对象保存刚创建的重写emit方法的FlowCollector对象;

由于flow中的代码块其实是FlowCollector的扩展函数,所以会利用SafaCollector去调用扩展函数,从这里面可以得出一个结论flow中的this即为SafaCollector对象。

emit方法在SafaCollector中最终会执行collect函数

 类似资料: