当前位置: 首页 > 知识库问答 >
问题:

使用Observable和take运算符读取文件的RxJava问题

百里成仁
2023-03-14
...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
    return new ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();

                    ... catching exception and close reader
                }
            }
        }
    }
});

我还想用一个take(count)运算符使一个观察上述情况的观察者成为可观察的,如下所示:

fileLineObservable.take(2)
                  .subscribe(new Consumer<String>() {
                       public void onNext(String line) {
                           ... do something with the file line string
                       }
                  });

我在执行上述代码时遇到NullPointerException,我知道原因。NPE是由于onNext的第二次调用导致在TakeObserver实例上执行onComplete,并且在onComplete方法内部调用未设置(null)的upStream.Dispose。TakeObserver的上游变量在订阅一个Observable时应该设置为onSubscribe(Disposable Disposable)。

我该如何解决这个问题呢?我是否应该实现我自己的一次性类来设置TakeObserver的上游?

共有1个答案

索和璧
2023-03-14

这个解决方案呢?

Observable<String> observableFile2(Path path) {
        return Observable.using(
                () -> Files.newBufferedReader(path),
                reader -> {
                    return Observable.fromIterable(() -> {
                        return new Iterator<>() {
                            private String nextLine = null;

                            @Override
                            public boolean hasNext() {
                                try {
                                    nextLine = reader.readLine();
                                    return nextLine != null;
                                } catch (Exception ex) {
                                    return false;
                                }
                            }

                            @Override
                            public String next() {
                                if (nextLine != null) {
                                    return nextLine;
                                }
                                throw new IllegalStateException("nextLine can not be null.");
                            }
                        };
                    });
                },
                BufferedReader::close
        );
    }
  • Observable#using确保在disposable/OnError上正确关闭BufferedReader
  • Observable#fromiterable包装readLine调用并为我们处理onComplete。

测试

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")
@Test
void name() {
    observableFile2(hello).take(2)
            .test()
            .assertValues("line0", "line1")
            .assertComplete();
}

@Test
void name2() {
    observableFile2(hello).take(10)
            .test()
            .assertValues("line0", "line1", "line2", "line3")
            .assertComplete();
}

@Test
void name3() {
    observableFile2(hello2)
            .test()
            .assertComplete();
}
 类似资料:
  • 主要内容:RxJava 条件运算符 介绍,RxJava 条件运算符 示例RxJava 条件运算符 介绍 以下是用于 Observable 的条件运算符。 运算符 描述 All 评估发出的所有项目以满足给定标准。 Amb 仅在给定多个 Observable 的情况下从第一个 Observable 发出所有项目。 Contains 检查 Observable 是否发出特定项目。 DefaultIfEmpty 如果 Observable 不发出任何内容,则发出默认项。 Se

  • 我看到的是一个rxjava操作符,它等待另一个observable发出一个条目来观察一个条目。我可以用flatMap和map运算符来完成,但我只是想知道是否有一个运算符可以完成这项工作。我在找takeUntil操作员的对立面。我还想让它在等待其他可观察的项目时缓冲项目。

  • 我正在学习RxJava运算符,我发现下面的这些代码没有打印任何东西: 作为ReactiveX, 创建一个可观察对象,该对象发出一系列以特定时间间隔隔开的整数 我是不是搞错了或者忘了什么?

  • 问题内容: 我在RxJava上有一连串的运算符。我希望能够根据布尔值应用两个运算符之一,而不会“破坏链”。 我是Rx(Java)的新手,我觉得这样做可能比我目前引入临时变量的方法更具惯用性和可读性。 这是一个具体示例,如果批处理大小字段为非空,则从可观察项中缓冲项目,否则使用发出单个无边界大小的批处理: 这样的事情可能吗?(伪lambdas,因为Java): 问题答案: 您可以用来保持顺序但可以自

  • 主要内容:RxJava 连接运算符 介绍,RxJava 连接运算符 示例RxJava 连接运算符 介绍 以下是 Observable 的连接运算符。 运算符 描述 Connect 指示可连接的 Observable 向其订阅者发送项目。 Publish 将 Observable 转换为可连接的 Observable。 RefCount 将可连接的 Observable 转换为普通的 Observable。 Replay 确保每个订阅者都可以看到相同的发出项目序列,即使

  • 主要内容:RxJava 数学运算符 介绍,RxJava 数学运算符 示例RxJava 数学运算符 介绍 以下是 Observable 的数学运算符。 运算符 描述 Average 评估所有项目的平均值并发出结果。 Concat 不交错地从多个 Observable 发出所有项目。 Count 计算所有项目并发出结果。 Max 评估所有项目的最大值项目并发出结果。 Min 评估所有项目的最小值项目并发出结果。 Reduce 对每个项目应用一个函数并返回结果。 Sum 评