当前位置: 首页 > 知识库问答 >
问题:

暂停并恢复基于RxJava 2中的布尔门的可观察对象。十、

吕亮
2023-03-14

假设我有一个处理器,每当按下一个按钮时,它都会发出一个布尔值,可以将此视为切换。

    boolean gateValue = true;
    PublishProcessor<Boolean> gate = PublishProcessor.create();
    view.onButtonClicked()
            .subscribe(new Action1<Void>() {
                @Override
                public void call(final Void aVoid) {
                    gate.onNext(gateValue = !gateValue);
                }
            }));

我想做的是使用gate的值暂停并恢复一个可观察的序列,在暂停时缓冲发出的值。

我已经读了很多,虽然在其他语言的反应式扩展中似乎是可能的,但RxJava似乎不支持它。

这是我想要实现的一个示例,它只是每秒输出一个增量值。当我按下按钮时,我希望输出停止,直到我再次按下它,这将输出两次按下按钮之间发出的每个项目:

Flowable.interval(1, TimeUnit.SECONDS)
                .bufferWhile(gate)
                .flatMapIterable(longs -> longs)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(final Long aLong) throws Exception {
                        view.displayTime(aLong);
                    }
                });

有没有人知道如何实现这样的目标?

编辑我写了一篇关于如何实现这个的博客文章https://medium.com/@scottalancooper/暂停和恢复-a-stream-in-rxjava-988a0977b771#. gj7fsi1xk

共有3个答案

上官英哲
2023-03-14

只需使用现成的可观察的。获取一个可观察的

左丘源
2023-03-14

您可以使用默认的RxJava运算符来实现这一点:

    final Random random = new Random();
    final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
            .map(it -> random.nextBoolean())
            .startWith(false)
            .doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
            .take(100);
    final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
            .doOnNext(it -> System.out.println("New value " + it))
            .take(100);

    gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
            innerData.window(
                    innerGates.distinctUntilChanged().filter(it -> it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
            ).flatMap(it -> it),
            innerData.buffer(
                    innerGates.distinctUntilChanged().filter(it -> !it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> it))
            )
                    .flatMap(Observable::from)
    )))
            .subscribe(it -> System.out.println("On next " + it));
上官波鸿
2023-03-14

现在RxJava2扩展库中有一个运算符阀()来执行请求的行为。

 类似资料:
  • :) 我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。 首先这可能吗? 我一直在尝试这样的事情: 但是我得到了这个: 任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢

  • 问题内容: 我注意到,有很多主题是有关使用暂停/恢复MP3的,因此为了帮助所有人,我专门为此设计了整个课堂!请参阅下面的答案。 注意:这是供我个人使用的,因此它可能不如某些人希望的那样健壮。但是由于其简单性,进行简单的修改并不难。 问题答案: 播放器的一个非常简单的实现,实际上是暂停播放。它通过使用单独的线程播放流并告诉播放器线程是否/何时暂停和继续工作来工作。

  • 我相信答案是否定的,但是Twilio提供暂停/恢复录音的能力吗?用例是记录一个呼叫,但在收集敏感信息时暂停记录。从REST文档来看,它似乎不是一个受支持的功能。我想有人可能已经为这个要求找到了一些选择。

  • 我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出

  • 问题内容: 我一直在寻找新的rx java 2,但我不确定我是否已经明白了这个主意… 我知道我们所拥有的并没有支持。 因此,基于例如,可以说我有有: 在大约128个值之后,这将崩溃,这很明显我消耗的速度比获取项目要慢。 但是,我们有相同的 即使我延迟使用它,它仍然完全不会崩溃。为了工作,可以说我放了一个运算符,崩溃已经消失了,但并不是所有值都被发出。 因此,我目前在脑海中找不到答案的基本问题是,为

  • 我一直在看新的rx java 2,我不太确定我是否理解了< code >背压的概念... 我知道我们有没有支持的和有背压。 因此,基于示例,假设我有与: 这将在大约128个值之后崩溃,很明显,我的消费速度比获取物品要慢。 但是< code>Observable也是如此 这将不会崩溃,即使我把一些消费延迟,它仍然工作。为了使< code >可流动工作,假设我将< code>onBackpressur