public fun <T> flow(@BuilderInference block:suspend FlowCollector<T>.() -> Unit):
Flow<T> = SafeFlow(block)
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
会将flow传入的方法封装成一个FlowCollector的扩展函数,因此在flow代码块中使用emit是自然地。
第二个代码块中观察到FlowCollector的泛型是通过emit来推导的,这也就是为什么emit方法传入不同的类型flow所构造的FlowCollector的类型也不同。
总结:flow方法只是将传入的方法扩展成了一个FlowCollector的扩展函数,并可共享已有的emit方法。
public suspend inline fun <T> Flow<T>.collect(crossinline action:
suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
可以看到调用Flow的扩展函数collect时会手动构建一个FlowCollector,并重写emit方法的逻辑为执行collect中的代码块。
接下来看Flow的collect成员方法接收到FlowCollector对象后做什么处理。
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
收集collect的具体行为默认是通过具体的flow构建时构造出来的。如默认上文构造出来的是SafeFlow,collect收集行为被封装在AbstractFlow中。
先看下AbstractFlow:
abstract class AbstractFlow<T> : Flow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
可以看到构建了一个SafeCollector调用collectSafely将改参数传进去。
SafeCollector会保存协程上下文(为了之后防止再次创建续体导致的浪费)和collect方法传进来的FlowCollector。
重写collectSafely方法的类也就是最上面的SafeFlow做的事情:
SafeFlow:
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
可以看到最后是用SafeCollector调用的flow里面的代码块
注:这个collector就是collect中创建的SafeCollector而block方法则是之前flow中创建的扩展函数(也就是flow代码块)。
所以这就是为什么是冷流的原因,只有调用collect才会构建这个SafeCollector对象并调用flow传进来的方法(flow代码块会添加到FlowCollector的扩展函数中,为了之后SafaCollector调用block)
到此flow代码块开始运行了,flow中的调用者this即为collect中创建的SafeCollector对象
接下来看看SafeCollector中的emit方法:
override suspend fun emit(value: T) {
//该方法中会获取当前协程的续体,当执行不需要挂起时(不返回SUSPEND关键字),
//会直接运行resumeWith并且不会执行拦截器,这也是为什么不能够在
//flow中切换上下文的原因,不会执行intercept切换线程
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
//调用第二个emit方法
emit(uCont, value)
} catch (e: Throwable) {
// Save the fact that exception from emit (or even check context) has been thrown
lastEmissionContext = DownstreamExceptionElement(e)
//emit
throw e
}
}
}
//带续体参数的emit方法:
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on happy path.
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
//最终会调用到emitfun中。
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
再来看下emitfun:
读者先记下这三个变量具体含义:
collect为FlowCollector对象(该对象的emit方法实现是执行collect代码块),value为emit的参数,最后一个代表续体
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
这里科普下Kotlin中的::语法 和 表达式:
Class::函数名/属性名 代表获取这个class的函数引用和属性引用。属性引用还好理解可以针对这个属性进行指定赋值/取值操作(静态属性不需要具体对象,成员属性需要具体的对象)。函数引用代表持有了这个方法的引用可以调用这个函数,由于是函数不区分对象。
=表达式 对应于上面的表达式则为emitfun返回了一个经强转后的方法。也就是emitfun返回的是一个方法,当调用表达式时该方法的invoke会自动执行(后面就会看到了)
可以看到将这个FlowCollector<Any?>的emit方法强转为了Function3<FlowCollector<Any?>, Any?, Continuation, Any?>方法,最直观的感受就是将原来只接受一个value的emit方法强转成了一个FlowCollector,value,和续体的方法。
看下反编译代码
private static final Function3 emitFun = (Function3)TypeIntrinsics.beforeCheckcastToFunctionOfArity(new Function3() {
// $FF: synthetic method
// $FF: bridge method
public Object invoke(Object var1, Object var2, Object var3) {
return this.invoke((FlowCollector)var1, var2, (Continuation)var3);
}
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
InlineMarker.mark(0);
Object var10000 = p1.emit(p2, continuation);
InlineMarker.mark(2);
InlineMarker.mark(1);
return var10000;
}
public final KDeclarationContainer getOwner() {
return Reflection.getOrCreateKotlinClass(FlowCollector.class);
}
public final String getName() {
return "emit";
}
public final String getSignature() {
return "emit(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;";
}
}, 3);
讲到这里读者应该已经猜到了
1.p1:对应于调用collect方法所构建的FlowCollector对象,重写emit方法执行collect代码块内容
2.p2:emit方法的参数,对应于发送的值
3.continuation:当前协程的续体
1.当调用emifun表达式时,表达式所构建的Function3方法的invoke方法将会被调用
2.会调用到emit方法,该方法最终会调用collect代码块的内容也就是action方法,并把emit的参数传入。
至此,emit每调用一次,都会执行一次collect方法。
flow方法的主要作用是将传入的方法参数变成FlowCollector的扩展函数。
collect方法首先会创建一个FlowCollector对象并重写其emit方法的逻辑用于执行传入的方法参数;
接着还会创建一个SafaCollector对象保存刚创建的重写emit方法的FlowCollector对象;
由于flow中的代码块其实是FlowCollector的扩展函数,所以会利用SafaCollector去调用扩展函数,从这里面可以得出一个结论flow中的this即为SafaCollector对象。
emit方法在SafaCollector中最终会执行collect函数