我正在探索反应式编程和RxJava。这很有趣,但是我陷入了无法找到答案的问题。我的基本问题是:什么是合适的反应方式来终止否则将无限运行的Observable?我也欢迎对我的代码的批评和反应最佳实践。
作为练习,我正在编写日志文件tail实用程序。日志文件中的行流由表示Observable<String>
。为了BufferedReader
继续读取添加到文件中的文本,我忽略了通常的reader.readLine() == null
终止检查,而是将其解释为意味着我的线程应该休眠并等待更多的记录器文本。
但是,尽管我可以使用来终止Observer
takeUntil
,但我需要找到一种干净的方法来终止原本无限运行的文件监视程序。我可以编写自己的terminateWatcher
方法/字段,但是这破坏了Observable
/ Observer封装-我想尽可能严格地遵守反应性范例。
这是Observable<String>
代码:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
这是观察者代码,用于在出现新行时将它们打印出来:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
我的两个问题是:
由于在请求订阅时您正在启动文件监视,因此在订阅结束时(即Subscription
关闭时)终止文件是有意义的。这解决了代码注释中的一个松散的结局:返回a
Subscription
,告诉FileWatcher
停止观看文件。然后替换您的循环条件,以便它检查预订是否已取消,而不是检查当前线程是否已中断。
问题是,如果您的onSubscribe()
方法永不返回,那不是很有帮助。也许您FileWatcher
应该要求指定一个调度程序,并在该调度程序上进行读取。
我对使用rxjava进行反应性编程是新手,在经历了更简单的示例之后,我现在试图弄清楚如何使用连续流。下面这个例子的问题是,在我接受了3个元素后,程序不会终止。我的假设是,我不知何故需要取消订阅我的可观察的,但我不完全掌握如何终止while循环并使程序退出。 我遇到了下面的RxJava帖子--终止无限流,但我仍然不知道我遗漏了什么。
我在尝试一个测试,我写了这个程序。。。 六羟甲基三聚氰胺六甲醚。。。在下面的代码中,我知道我用了“I”而不是“j”,所以为了解决这个问题,我用了这个 我在节目中遇到的问题是- 在上面的代码中,当我删除这个if块时- 这个节目无限期地进行。。。。。。。。。但当我再次放置这个if块时,程序在执行一段时间后自动终止。我也在这个链接上读到了这一点,但它展示了java的例子 它们显示了负值的原因,但在我的程
问题内容: 我有一段简单的代码,该代码 应该 是一个无休止的循环,因为它将一直在增长,并且始终会比更大。 但实际上,它可以打印并且不会无限循环。我不知道为什么。但是,当我以以下方式调整代码时: 这变成一个无尽的循环,我不知道为什么。java是否会识别出一个无穷循环并在第一种情况下跳过它,而在第二种情况下必须执行一个方法调用,以使其表现出预期的效果?困惑:) 问题答案: 这两个例子并不是无止境的。
我正在尝试编写一个线程,该线程将在我的主程序的后台运行,并监视某些内容。在某个时候,主程序应该向线程发出安全退出的信号。下面是一个以固定间隔将本地时间写入命令行的最小示例。 当“on”变量没有通过引用传递时,此代码会编译并产生预期的结果,只是线程永远不会终止。一旦变量通过引用传递,我就会收到编译器错误 您能建议一种修复此代码的方法吗? 额外的问题:哪里出了问题,为什么它可以与std::ref一起工
我使用RxJava与Spring Boot。现在对于每个请求,我启动一个说10个子线程(使用Schedulers.nextThread())。 我将在production env中使用它,在那里我可以有多达500个并发请求 所以我的问题是a)这是一种良好的做法吗。b) 我可以为要生成的线程数量添加任何上限吗?c) 我可以在Java中运行多少并发线程(比如32GB RAM)? 所以在我的案例中,如果
我有一个单线程进程,它不会在终止条件下死亡。处理信号掩码未显示SIGTERM被阻塞。我以root身份执行“kill”。我可以使用SIGKILL终止进程,但这是更大系统的一部分,我希望SIGTERM能够工作。 注意Sig*属性。SigCgt、SigIgn和SigBlk表示SIGTERM既没有被捕获、忽略或阻塞(第15位未设置-将最低有效位计算为#1)。由于SIGTERM的默认配置是终止进程,我希望它