当前位置: 首页 > 知识库问答 >
问题:

RxObservable,它一直重复到找到期望的值

云俊名
2023-03-14

该函数的目标是创建一个周期性地发出值的流,直到遇到与谓词匹配的值。

下面是我提出的一些框架代码:

class Watcher<T : Any>(
        /**
         * Emits the data associated with the provided id
         */
        private val callable: (id: String) -> T,
        /**
         * Checks if the provided value marks the observable as complete
         */
        private val predicate: (id: String, value: T) -> Boolean
) {

    private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()

    fun watch(id: String): Observable<T> {
        // reuse obesrvable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = callable(id)
        if (predicate(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.fromCallable<T> {
            callable(id)
        }.repeatWhen { /* What to put here? */ }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged()
        watchPool[id] = observable
        return observable
    }

}
enum class Stage {
    CREATED, PROCESSING, DELIVERING, FINISHED
}

和一些将检索正确阶段的callable,我应该能够传递callable和谓词检查是否stage==finished,并轮询直到获得finished事件。

我遇到的问题是,当收到的事件不是最终事件时,生成一个可观察的事件。在这种情况下,observable应该继续轮询事件,直到它接收到与谓词匹配的事件,或者直到它没有更多的订阅服务器。

这一点可以观察到:

    null

共有1个答案

戚阳曜
2023-03-14

最后,我只使用TakeToil,并使用Observal的interval方法进行轮询

abstract class RxWatcher<in T : Any, V : Any> {

    /**
     * Emits the data associated with the provided id
     * At a reasonable point, emissions should return a value that returns true with [isCompleted]
     * This method should be thread safe, and the output should not depend on the number of times this method is called
     */
    abstract fun emit(id: T): V

    /**
     * Checks if the provided value marks the observable as complete
     * Must be thread safe
     */
    abstract fun isCompleted(id: T, value: V): Boolean

    /**
     * Polling interval in ms
     */
    open val pollingInterval: Long = 1000

    /**
     * Duration between events in ms for which the observable should time out
     * If this is less than or equal to [pollingInterval], it will be ignored
     */
    open val timeoutDuration: Long = 5 * 60 * 1000

    private val watchPool: MutableMap<T, Observable<V>> = ConcurrentHashMap()

    /**
     * Returns an observable that will emit items every [pollingInterval] ms until it [isCompleted]
     *
     * The observable will be reused if there is polling, so the frequency remains constant regardless of the number of
     * subscribers
     */
    fun watch(id: T): Observable<V> {
        // reuse observable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = emit(id)
        if (isCompleted(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.interval(pollingInterval, TimeUnit.MILLISECONDS, Schedulers.io()).map {
            emit(id)
        }.takeUntil {
            isCompleted(id, it)
        }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged().run {
            if (timeoutDuration > pollingInterval) timeout(timeoutDuration, TimeUnit.MILLISECONDS)
            else this
        }
        watchPool[id] = observable
        return observable
    }

    /**
     * Clears the observables from the watch pool
     * Note that existing subscribers will not be affected
     */
    fun clear() {
        watchPool.clear()
    }

}
 类似资料:
  • 我想在找到第一个值时终止流的执行。然而,当我运行下面的代码时,它显示,即使第一个方法中存在值,也会调用两个方法。 我登记了文件: 表示 如果存在值,则返回{@code true},否则返回{@code false}。 返回描述此流的第一个元素的{@link Optional},如果流为空,则返回空的{@code Optional}。如果流没有遭遇顺序,那么可以返回任何元素。 所以它满足了第一个条件

  • 想改进这个问题吗?通过编辑这篇文章添加细节并澄清问题。 我所拥有的是, 我想要的是, 其中,

  • 在 scala 中,您将如何编写一个函数来获取期货序列,运行所有函数,不断重试任何失败的函数,并返回结果? 例如,签名可以是: 加分为一个可配置的超时,在这一点上的功能失败,被调用者可以处理这种情况。< br >如果错误案例处理程序可以接收失败的期货列表,则奖励加分。 谢谢!

  • 我是一名学习使用jsp和Servlet构建Web应用程序的学生。一个月以来,我的Web应用程序项目一直运行良好,但今天它的行为突然变得奇怪了。当我提交jsp页面时,它无法找到我的servlet。我已经使用servlet注释来映射请求。 以下是我的JSP:- 以下是我的servlet:- 以下是我的控制台日志:-

  • 我在运行Java应用程序时遇到了一个小问题(当我通过Maven运行它时,它工作得很好)。我得到一个错误: 模块位于my文件夹中,我的文件夹结构如下: 对于- 对于我的类 奇怪的是,当我在文件它再次开始工作,但在几次运行后中断。 编辑: 将位置更改为修复了该问题,但是,它破坏了我的Maven install命令并引发以下错误:

  • 我有一个像下面这样的Python脚本: 我以以下方式执行: 然后我检查是否有任何初始输出,但没有: 我至少希望在几秒钟后看到“开始”。即使等了几分钟,然后杀了它,我也没有输出。如果可能,我该如何解决这个问题?