该函数的目标是创建一个周期性地发出值的流,直到遇到与谓词匹配的值。
下面是我提出的一些框架代码:
class Watcher<T : Any>(
/**
* Emits the data associated with the provided id
*/
private val callable: (id: String) -> T,
/**
* Checks if the provided value marks the observable as complete
*/
private val predicate: (id: String, value: T) -> Boolean
) {
private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()
fun watch(id: String): Observable<T> {
// reuse obesrvable if exists
val existing = watchPool[id]
if (existing != null)
return existing
val value = callable(id)
if (predicate(id, value)) return Observable.just(value)
// create new observable to fetch until complete,
// then remove from the map once complete
val observable = Observable.fromCallable<T> {
callable(id)
}.repeatWhen { /* What to put here? */ }.doOnComplete {
watchPool.remove(id)
}.distinctUntilChanged()
watchPool[id] = observable
return observable
}
}
enum class Stage {
CREATED, PROCESSING, DELIVERING, FINISHED
}
和一些将检索正确阶段的callable,我应该能够传递callable和谓词检查是否stage==finished
,并轮询直到获得finished
事件。
我遇到的问题是,当收到的事件不是最终事件时,生成一个可观察的事件。在这种情况下,observable应该继续轮询事件,直到它接收到与谓词匹配的事件,或者直到它没有更多的订阅服务器。
这一点可以观察到:
最后,我只使用TakeToil
,并使用Observal的interval方法进行轮询。
abstract class RxWatcher<in T : Any, V : Any> {
/**
* Emits the data associated with the provided id
* At a reasonable point, emissions should return a value that returns true with [isCompleted]
* This method should be thread safe, and the output should not depend on the number of times this method is called
*/
abstract fun emit(id: T): V
/**
* Checks if the provided value marks the observable as complete
* Must be thread safe
*/
abstract fun isCompleted(id: T, value: V): Boolean
/**
* Polling interval in ms
*/
open val pollingInterval: Long = 1000
/**
* Duration between events in ms for which the observable should time out
* If this is less than or equal to [pollingInterval], it will be ignored
*/
open val timeoutDuration: Long = 5 * 60 * 1000
private val watchPool: MutableMap<T, Observable<V>> = ConcurrentHashMap()
/**
* Returns an observable that will emit items every [pollingInterval] ms until it [isCompleted]
*
* The observable will be reused if there is polling, so the frequency remains constant regardless of the number of
* subscribers
*/
fun watch(id: T): Observable<V> {
// reuse observable if exists
val existing = watchPool[id]
if (existing != null)
return existing
val value = emit(id)
if (isCompleted(id, value)) return Observable.just(value)
// create new observable to fetch until complete,
// then remove from the map once complete
val observable = Observable.interval(pollingInterval, TimeUnit.MILLISECONDS, Schedulers.io()).map {
emit(id)
}.takeUntil {
isCompleted(id, it)
}.doOnComplete {
watchPool.remove(id)
}.distinctUntilChanged().run {
if (timeoutDuration > pollingInterval) timeout(timeoutDuration, TimeUnit.MILLISECONDS)
else this
}
watchPool[id] = observable
return observable
}
/**
* Clears the observables from the watch pool
* Note that existing subscribers will not be affected
*/
fun clear() {
watchPool.clear()
}
}
我想在找到第一个值时终止流的执行。然而,当我运行下面的代码时,它显示,即使第一个方法中存在值,也会调用两个方法。 我登记了文件: 表示 如果存在值,则返回{@code true},否则返回{@code false}。 返回描述此流的第一个元素的{@link Optional},如果流为空,则返回空的{@code Optional}。如果流没有遭遇顺序,那么可以返回任何元素。 所以它满足了第一个条件
想改进这个问题吗?通过编辑这篇文章添加细节并澄清问题。 我所拥有的是, 我想要的是, 其中,
在 scala 中,您将如何编写一个函数来获取期货序列,运行所有函数,不断重试任何失败的函数,并返回结果? 例如,签名可以是: 加分为一个可配置的超时,在这一点上的功能失败,被调用者可以处理这种情况。< br >如果错误案例处理程序可以接收失败的期货列表,则奖励加分。 谢谢!
我是一名学习使用jsp和Servlet构建Web应用程序的学生。一个月以来,我的Web应用程序项目一直运行良好,但今天它的行为突然变得奇怪了。当我提交jsp页面时,它无法找到我的servlet。我已经使用servlet注释来映射请求。 以下是我的JSP:- 以下是我的servlet:- 以下是我的控制台日志:-
我在运行Java应用程序时遇到了一个小问题(当我通过Maven运行它时,它工作得很好)。我得到一个错误: 模块位于my文件夹中,我的文件夹结构如下: 对于- 对于我的类 奇怪的是,当我在文件它再次开始工作,但在几次运行后中断。 编辑: 将位置更改为修复了该问题,但是,它破坏了我的Maven install命令并引发以下错误:
我有一个像下面这样的Python脚本: 我以以下方式执行: 然后我检查是否有任何初始输出,但没有: 我至少希望在几秒钟后看到“开始”。即使等了几分钟,然后杀了它,我也没有输出。如果可能,我该如何解决这个问题?