当前位置: 首页 > 知识库问答 >
问题:

如何使用RxJava区间运算符

严兴言
2023-03-14

我正在学习RxJava运算符,我发现下面的这些代码没有打印任何东西:

public static void main(String[] args) {

    Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long l) {
            System.out.println("onNext -> " + l);
        }
    });
}

作为ReactiveX,间隔

创建一个可观察对象,该对象发出一系列以特定时间间隔隔开的整数

我是不是搞错了或者忘了什么?

共有3个答案

方永贞
2023-03-14

正如他们告诉您的那样,间隔工作是异步的,因此您必须等待所有事件完成。

您可以在订阅后获得订阅,然后使用作为reactiveX平台一部分的TestSubcriber,它将为您提供等待所有事件终止的功能。

       @Test
public void testObservableInterval() throws InterruptedException {
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
              .map(time-> "item emitted")
              .subscribe(System.out::print,
                         item -> System.out.print("final:" + item));
    new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

如果您需要,我的github中有更多示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

庄飞
2023-03-14

放置<代码>线程。在订阅之后睡眠(1000000),您将看到它正在工作<代码>可观察。默认情况下,间隔在调度程序上运行。computation()因此流是在主线程以外的线程上运行的。

端木令雪
2023-03-14

您必须阻止直到可观察对象被消耗:

public static void main(String[] args) throws Exception {

    CountDownLatch latch = new CountDownLatch(1);

    Observable
    .interval(1, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
            // make sure to complete only when observable is done
            latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError -> " + e.getMessage());
        }

        @Override
        public void onNext(Long l) {
            System.out.println("onNext -> " + l);
        }
    });

    // wait for observable to complete (never in this case...)
    latch.await();
}

例如,您可以添加. get(10)以查看可观察的完整。

 类似资料:
  • 根据RxJava文档,区间操作符“创建一个可观察对象,该对象发出一个由给定时间间隔隔开的整数序列”。我编写了下面的程序,但未调用subscribe方法。我错过什么了吗? 我正在使用io。反应性X。rxjava2版本2.2.6

  • 我看到的是一个rxjava操作符,它等待另一个observable发出一个条目来观察一个条目。我可以用flatMap和map运算符来完成,但我只是想知道是否有一个运算符可以完成这项工作。我在找takeUntil操作员的对立面。我还想让它在等待其他可观察的项目时缓冲项目。

  • 问题内容: 假设我正在Swift中实现一个根类,我声明该根类采用了协议(我希望能够知道我类型的数组是否包含给定的实例)。 *在此特定情况下,将协议的所需运算符实现为之间有 *什么区别( 如果有的话): …而不是仅仅这样做: 作为参考,这是文档中所说的: 类实例或元类型的唯一标识符。在Swift中,只有类实例和元类型才具有唯一标识。对于结构,枚举,函数或元组,没有身份的概念。 ......这是什么的

  • 主要内容:RxJava 连接运算符 介绍,RxJava 连接运算符 示例RxJava 连接运算符 介绍 以下是 Observable 的连接运算符。 运算符 描述 Connect 指示可连接的 Observable 向其订阅者发送项目。 Publish 将 Observable 转换为可连接的 Observable。 RefCount 将可连接的 Observable 转换为普通的 Observable。 Replay 确保每个订阅者都可以看到相同的发出项目序列,即使

  • 主要内容:RxJava 数学运算符 介绍,RxJava 数学运算符 示例RxJava 数学运算符 介绍 以下是 Observable 的数学运算符。 运算符 描述 Average 评估所有项目的平均值并发出结果。 Concat 不交错地从多个 Observable 发出所有项目。 Count 计算所有项目并发出结果。 Max 评估所有项目的最大值项目并发出结果。 Min 评估所有项目的最小值项目并发出结果。 Reduce 对每个项目应用一个函数并返回结果。 Sum 评

  • 主要内容:RxJava 条件运算符 介绍,RxJava 条件运算符 示例RxJava 条件运算符 介绍 以下是用于 Observable 的条件运算符。 运算符 描述 All 评估发出的所有项目以满足给定标准。 Amb 仅在给定多个 Observable 的情况下从第一个 Observable 发出所有项目。 Contains 检查 Observable 是否发出特定项目。 DefaultIfEmpty 如果 Observable 不发出任何内容,则发出默认项。 Se