当前位置: 首页 > 面试题库 >

RxJava —终止无限流

太叔逸春
2023-03-14
问题内容

我正在探索反应式编程和RxJava。这很有趣,但是我陷入了无法找到答案的问题。我的基本问题是:什么是合适的反应方式来终止否则将无限运行的Observable?我也欢迎对我的代码的批评和反应最佳实践。

作为练习,我正在编写日志文件tail实用程序。日志文件中的行流由表示Observable<String>。为了BufferedReader继续读取添加到文件中的文本,我忽略了通常的reader.readLine() == null终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本。

但是,尽管我可以使用来终止Observer
takeUntil,但我需要找到一种干净的方法来终止原本无限运行的文件监视程序。我可以编写自己的terminateWatcher方法/字段,但是这破坏了Observable
/ Observer封装-我想尽可能严格地遵守反应性范例。

这是Observable<String>代码:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

这是观察者代码,用于在出现新行时将它们打印出来:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

我的两个问题是:

  1. 终止无限运行的流的反应一致方法是什么?
  2. 我的代码中还有哪些其他错误使您哭泣?:)

问题答案:

由于在请求订阅时您正在启动文件监视,因此在订阅结束时(即Subscription关闭时)终止文件是有意义的。这解决了代码注释中的一个松散的结局:返回a
Subscription,告诉FileWatcher停止观看文件。然后替换您的循环条件,以便它检查预订是否已取消,而不是检查当前线程是否已中断。

问题是,如果您的onSubscribe()方法永不返回,那不是很有帮助。也许您FileWatcher应该要求指定一个调度程序,并在该调度程序上进行读取。



 类似资料:
  • 我对使用rxjava进行反应性编程是新手,在经历了更简单的示例之后,我现在试图弄清楚如何使用连续流。下面这个例子的问题是,在我接受了3个元素后,程序不会终止。我的假设是,我不知何故需要取消订阅我的可观察的,但我不完全掌握如何终止while循环并使程序退出。 我遇到了下面的RxJava帖子--终止无限流,但我仍然不知道我遗漏了什么。

  • 我在尝试一个测试,我写了这个程序。。。 六羟甲基三聚氰胺六甲醚。。。在下面的代码中,我知道我用了“I”而不是“j”,所以为了解决这个问题,我用了这个 我在节目中遇到的问题是- 在上面的代码中,当我删除这个if块时- 这个节目无限期地进行。。。。。。。。。但当我再次放置这个if块时,程序在执行一段时间后自动终止。我也在这个链接上读到了这一点,但它展示了java的例子 它们显示了负值的原因,但在我的程

  • 问题内容: 我有一段简单的代码,该代码 应该 是一个无休止的循环,因为它将一直在增长,并且始终会比更大。 但实际上,它可以打印并且不会无限循环。我不知道为什么。但是,当我以以下方式调整代码时: 这变成一个无尽的循环,我不知道为什么。java是否会识别出一个无穷循环并在第一种情况下跳过它,而在第二种情况下必须执行一个方法调用,以使其表现出预期的效果?困惑:) 问题答案: 这两个例子并不是无止境的。

  • 我正在尝试编写一个线程,该线程将在我的主程序的后台运行,并监视某些内容。在某个时候,主程序应该向线程发出安全退出的信号。下面是一个以固定间隔将本地时间写入命令行的最小示例。 当“on”变量没有通过引用传递时,此代码会编译并产生预期的结果,只是线程永远不会终止。一旦变量通过引用传递,我就会收到编译器错误 您能建议一种修复此代码的方法吗? 额外的问题:哪里出了问题,为什么它可以与std::ref一起工

  • 我使用RxJava与Spring Boot。现在对于每个请求,我启动一个说10个子线程(使用Schedulers.nextThread())。 我将在production env中使用它,在那里我可以有多达500个并发请求 所以我的问题是a)这是一种良好的做法吗。b) 我可以为要生成的线程数量添加任何上限吗?c) 我可以在Java中运行多少并发线程(比如32GB RAM)? 所以在我的案例中,如果

  • 我有一个单线程进程,它不会在终止条件下死亡。处理信号掩码未显示SIGTERM被阻塞。我以root身份执行“kill”。我可以使用SIGKILL终止进程,但这是更大系统的一部分,我希望SIGTERM能够工作。 注意Sig*属性。SigCgt、SigIgn和SigBlk表示SIGTERM既没有被捕获、忽略或阻塞(第15位未设置-将最低有效位计算为#1)。由于SIGTERM的默认配置是终止进程,我希望它