当前位置: 首页 > 知识库问答 >
问题:

用RxJava生成无限自然数序列

洪弘亮
2023-03-14

我试图用RxJava编写一个简单的程序来生成无限自然数序列。到目前为止,我已经找到了使用observable.timer()和observable.interval()生成数字序列的两种方法。我不确定这些函数是否是解决这个问题的正确方法。我期待一个简单的函数,就像Java8中的函数一样,可以生成无限自然数。

import rx.Observable;
import rx.functions.Action1;

import java.util.stream.IntStream;

public class NaturalNumbers {

    public static void main(String[] args) {
        Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
            IntStream stream = IntStream.iterate(1, val -> val + 1);
            stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
        });

        Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
        Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
        Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
        naturalNumbers.subscribe(first);
        naturalNumbers.subscribe(second);
        naturalNumbers.subscribe(third);

    }
}

共有1个答案

孙辰阳
2023-03-14

问题是您实现的onnaturalnumbers.subscribe(first);onsubscribe调用,并且您正在对无限流执行foreach,因此程序从未终止。

处理它的一种方法是在不同的线程上异步订阅它们。为了方便地看到结果,我必须在流处理中引入睡眠:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    stream.peek(i -> {
        try {
            // Added to visibly see printing
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }).forEach(subscriber::onNext);
});

final Subscription subscribe1 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(first);
final Subscription subscribe2 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(second);
final Subscription subscribe3 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(third);

Thread.sleep(1000);

System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
 类似资料:
  • 问题内容: 我正在探索反应式编程和RxJava。这很有趣,但是我陷入了无法找到答案的问题。我的基本问题是:什么是合适的反应方式来终止否则将无限运行的Observable?我也欢迎对我的代码的批评和反应最佳实践。 作为练习,我正在编写日志文件tail实用程序。日志文件中的行流由表示。为了继续读取添加到文件中的文本,我忽略了通常的终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本。

  • 问题内容: 我想生成我的JSON数据的树状视图。因此,最好将 JSON数据 解析为 多级(!)无序HTML列表 。我找到了一些插件,但无法让它们使用我的JSON数据。 好的解决方案是调用函数并将json数据作为参数移交。结果可能是一个多级无序列表。我假设该函数必须遍历所有JSON数据并写入 ul 和 li 标签。 有没有直接的方法可以做到这一点? tia! PS:示例树(适用于我的JSOn数据):

  • 我们从右到左都知道并爱/恨作文: 什么是自然/从左到右组合的“最标准”操作符(如在某种公共库中):

  • 根据RxJava文档,区间操作符“创建一个可观察对象,该对象发出一个由给定时间间隔隔开的整数序列”。我编写了下面的程序,但未调用subscribe方法。我错过什么了吗? 我正在使用io。反应性X。rxjava2版本2.2.6

  • 我目前的Android应用程序采用Room/SQLite数据库 我的一个房间实体需要一个序列列,每次插入新行时该列都会递增。 我希望自动生成此序列,但是自动生成似乎仅适用于PrimaryKey 当Int列不是主键时,有没有办法在房间中自动递增?

  • 我需要为ant创建构建文件来构建我的,我已经搜索了很多,但没有任何帮助。它仍然显示错误并且不编译。当我尝试运行jar文件时,jar文件无法被删除。这里是 这里是错误