考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这将输出从1到5的数字,然后打印异常。
我要实现的是使观察者保持订阅状态,并在引发异常后继续运行,即打印从1到10的所有数字。
我曾尝试使用retry()
和其他各种错误处理运算符,但是,正如文档中所述,它们的目的是处理可观察对象自身发出的错误。
最直接的解决方案是将整个过程包装onNext
到try-
catch块中,但这对我来说似乎不是一个好的解决方案。在类似的Rx.NET问题中,提出的解决方案是使扩展html" target="_blank">方法可以通过创建可观察的代理来进行包装。我试图重做:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
这不会改变任何东西,因为RxJava本身将订户包装为SafeSubscriber
。使用unsafeSubscribe
绕过它似乎也不是一个好的解决方案。
我该怎么做才能解决这个问题?
这是学习Rx时出现的常见问题。
建议您将异常处理逻辑放入订阅服务器中,而不是创建一个通用的可观察包装器。
记住,Rx是将事件推送给订户。
从可观察的界面来看,很明显,除了可观察的对象处理事件花费的时间或任何引发的异常中包含的信息外,可观察的对象并没有真正了解用户。
通用包装器处理订户异常并继续向该订户发送事件是一个坏主意。
为什么?那么,可观察者应该只真正知道订户现在处于未知的故障状态。在这种情况下继续发送事件是不明智的,例如,订户可能处于一种状态,即从此刻开始的每个事件都将引发异常并花一些时间来处理。
一旦订户抛出异常,可观察者只有两种可行的行动方案:
对订户异常的特定处理将是一个糟糕的设计选择。这将在订户和可观察者之间造成不适当的行为耦合。因此,如果您想对不良订户有弹性,以上两个选择实际上就是可观察对象本身的明智责任极限。
如果您希望 订户 具有弹性并继续前进,则应绝对将其包装在异常处理逻辑中,该逻辑旨在处理您知道如何恢复的 特定异常
(并可能处理瞬时异常,日志记录,重试逻辑,断路等)。 )。
只有订户本身才有上下文来了解是否适合在遇到故障时接收其他事件。
如果您的情况需要开发可重用的错误处理逻辑,则将自己放在包装观察者的事件处理程序而不是 可
观察的事件的心态上,并且请注意不要在失败时盲目地进行事件传递。放开!尽管没有写过有关Rx的文章,但对于娱乐性软件工程经典来说,在最后一点上还有很多话要说。如果您还没有阅读,我强烈建议您。
考虑以下示例: 这将输出从1到5的数字,然后打印异常。 我想要实现的是使观察器保持订阅状态,并在抛出异常后继续运行,即打印从1到10的所有数字。 我尝试过使用和其他各种错误处理操作符,但正如文档中所述,它们的目的是处理可观察对象本身发出的错误。 最直接的解决方案是将的整个主体包装成一个try-catch块,但对我来说这听起来不是一个好的解决方案。在类似的Rx中。NET问题,提出的解决方案是制作一个
我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。 需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。 问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成
当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。
我正在尝试创建一个RxJava BlockingObservable,它将每隔X毫秒发出一个变量的值,直到(条件==true)或超时发生。 下面的代码似乎与我想要的很接近,但它总是发出一次,然后退出。奇怪的是,我在中有一个永远不会正确的条件——我希望这个可观察到的持续发出并最终超时,但事实并非如此。 我错过了什么/做错了什么?
我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从
我正在学习RxJS,对于“听众”在哪里(在可观察的或观察者中),他们是如何订阅/取消订阅的,以及当观察者“不再对”可观察的“不感兴趣”时会发生什么,比如当你使用或。 对于第一部分——什么是订阅什么,什么是倾听者——我对这些陈述之间看似矛盾的地方感到困惑。从http://reactivex.io/rxjs/manual/overview.html我们读到观察者不是观察者的“听众” 这与addEven