当前位置: 首页 > 知识库问答 >
问题:

如何将流动转换为流动?

孙梓
2023-03-14

我刚刚补充道

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"

对这个项目。我有<code>suspend fun foo():Flow

我需要获得可流动


共有1个答案

祁坚壁
2023-03-14

你必须偷偷取出返回的< code>Foo

// SomeSuspendAPI.kt
// -----------------

// the method to convert
suspend fun <T> Flow<T>.foo() : Flow<Int> {
    return flow { emit(0) }
}

@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
    val self = this
    val future = CompletableFuture<Flowable<Int>>()
    GlobalScope.launch {
        try {
            future.complete(self.foo().asFlowable())
        } catch (ex: Throwable) {
            future.completeExceptionally(ex);
        }
    }
    return future
}

// Demo purposes
fun <T> just(v: T) = flow { emit(v) }

然后,您可以在 Java 中使用它:

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx(
                SomeSuspendAPIKt.just(1)
        )
        .thenAccept(flowable -> flowable.subscribe(System.out::println))
        .join();
    }
}

编辑 1:

当然,您可以将一些代码移回kotlin端:

fun <T> Flow<T>.fooRx2() : Flowable<Int> {
    val self = this
    val subject = SingleSubject.create<Flowable<Int>>()
    GlobalScope.launch {
        try {
            subject.onSuccess(self.foo().asFlowable())
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

然后

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
                .blockingSubscribe(System.out::println);
    }
}

编辑2:

您可以通过在 Kotlin 端使用变换来概括这一点,该变换会为您提供一个要传递的延续对象:

fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
    val self = this
    val subject = SingleSubject.create<Flowable<R>>()
    GlobalScope.launch {
        try {
            val r = fn(self).asFlowable();
            subject.onSuccess(r)
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}
public class UseFoo {
    public static void main(String[] args) throws Exception {

        SomeSuspendAPIKt.transformAsync(
                SomeSuspendAPIKt.just(1),
                (source, cont) -> SomeSuspendAPIKt.foo(source, cont)
        )
        .blockingSubscribe(System.out::println);
    }
}
 类似资料:
  • 我想用可选的。由于只能连接流,我有以下问题: 如何将可选 转换为流 ? 示例:

  • 我在处理流时遇到了一些麻烦。本质上,我有一个

  • 问题内容: //或将多部分文件保存到数据库的任何其他解决方案。我尝试用这种方式,但出现错误。 问题答案:

  • 问题内容: 我有一个第三方图书馆给我一个图书馆。我想像Java 8那样懒惰地使用该枚举,并调用诸如此类的东西。 有没有现成的图书馆?我已经在引用Guava和Apache Commons,所以如果其中任何一个都有理想的解决方案。 另外,在保留所有内容的懒惰性质的同时将a 变成最佳/最简单的方法是什么? 问题答案: 这个答案已经提供了一个解决方案,可以解决以下问题: 应当强调的是,由此而来 的 懒任何

  • 我已经了解了一些关于流的知识,并且知道它们可以用来代替循环。对于这个玩具示例,我使用一个图形数据库来存储一组字符串。数据库将它们存储为顶点。我想检索这些顶点,并将它们转换为字符串,而是使用流。每个顶点都有一组性质;我给它一个键,它返回一个值。如果一个顶点具有我正在寻找的属性,我将它添加到列表中。如果没有,我存储顶点ID。 我有一个for循环,但我不确定如何使用流来代替。代码如下:

  • 我正在寻找一种简洁的方法来将转换为或者更具体地说,将迭代器作为流“查看”。 出于性能原因,我希望避免在新列表中出现迭代器的副本: 基于评论中的一些建议,我还尝试使用: 但是,我得到一个(因为没有调用) 我查看了和,但没有找到任何东西。