我正在为一个流创建一个'take untilsignal'运算符--一个扩展方法,当另一个流生成输出时,它会取消一个流。
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>
我最初的努力是尝试在与主要流收集相同的协同范围内启动信号流的收集,并取消协同范围:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
kotlinx.coroutines.withContext(coroutineContext) {
launch {
signal.take(1).collect()
println("signalled")
cancel()
}
collect {
emit(it)
}
}
}
编辑:我拼凑了以下这些讨厌的东西,它不太符合定义(结果流只会在第一次从主流发射后取消),我觉得有一个更好的方法:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
combine(
signal.map { it as Any? }.onStart { emit(null) }
) { x, y -> x to y }
.takeWhile { it.second == null }
.map { it.first }
edit2再试一次,使用ChannelFlow:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
channelFlow {
launch {
signal.take(1).collect()
println("hello!")
close()
}
collect { send(it) }
close()
}
使用couroutineScope
并在内部启动新的coroutine:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
try {
coroutineScope {
launch {
signal.take(1).collect()
println("signalled")
this@coroutineScope.cancel()
}
collect {
emit(it)
}
}
} catch (e: CancellationException) {
//ignore
}
}
试图从其他人那里得到答案 假设我有一个简单的控制器,如下所示: 我希望处理忽略“取消”信号,这通常在客户端断开连接时出现。试图通过nginx的
我想测试我的ViewModel的一个收集流的方法。在收集器内部,一个LiveData对象发生了变化,我希望最后检查它。以下是设置的大致情况: 当我现在在单元测试中调用方法时,测试会在收集流之前完成。这是测试可能会出现的情况: 我正在通过这个Junit5扩展使用TestCoroutineDispatcher,还使用LiveData的即时执行器扩展:
我在转换器上创建了一个定时标点器,并将其定期运行(使用kafka v2.1.0)。每次我接受一个特定的密钥时,我都会创建一个这样的新密钥 我的问题是,我创建的所有这些标点符号都经常运行,我找不到取消它们的方法。我在互联网上找到了一个片段来使用 但不幸的是,这似乎只取消了最新创建的标点符号。 我编辑我的帖子只是为了进一步了解我的方法,以及这与沃兹尼亚的评论之间的关系。所以我的方法非常类似,只是使用一
奇怪的问题 我试图关闭我的Java应用程序正确接收信号,要么通过杀死手动发送。我试图杀死SIGTERM、SIGHUP、SIGINT等。每次JVM停止而不调用运行时关闭钩子时,最终阻止或信号陷阱在Java代码中创建。 并在shutdownHook不起作用时添加了处理程序 我在Ubuntu 12.04上运行,Java版本为“1.6.0_24” OpenJDK运行时环境(IcedTea6 1.11.5)
有几个消费者线程在等待由单个生产者线程提供的异步到达的数据的信号量。如果消费者已经获取了信号量,生产者如何获取信号量来通知他们呢?在这里,它被阻塞了,不能执行notifyAll() 当然,删除“synchronized”会导致 同样,生产者线程被阻塞,无法获取信号量来通知消费者。 更新:我应该提到,这在第二次数据准备就绪时失败。第一次数据准备就绪时,生产者没有阻塞。下一次产生数据时,生产者阻塞了。
我正在尝试用kafka绑定构建一个简单的云流应用程序。让我描述一下设置。1、我有一位制作人正在制作主题1 2。有一个流活页夹,经过一些处理后将主题1绑定到主题2。 我是这些技术的新手。无法弄清楚这里出了什么问题。有人能帮忙吗?