当前位置: 首页 > 知识库问答 >
问题:

RxJava 2.0-在publish()中为未捕获的子服务器错误处理Resources。refCount()

南门焱
2023-03-14

我对RxJava相当陌生,和其他许多人一样,我正试图了解异常处理。我在网上读了不少帖子(例如,这里讨论了如何处理observer的onNext抛出的异常),认为我已经了解了这些概念的基本概念。

在上面提到的讨论中,其中一张海报说,当在订阅服务器中引发异常时,RxJava会执行以下操作:

实现通用处理以记录故障并停止发送它事件(任何类型)并清理由于该订阅者的任何资源并继续处理任何剩余的订阅。

这或多或少也是我所看到的,我唯一有问题的是“清理任何资源”位。为了说明这一点,我们假设以下示例:

我想创建一个可观察的,它在每个接收到的消息上监听异步事件源(例如JMS队列)和onNext()s。所以在(伪)代码中,我会做类似的事情:

Observable<String> observable = Observable.create( s -> {
  createConnectionToBroker();
  getConsumer().setMessageListener(message -> s.onNext(transform(message)));
  s.setDisposable(new Disposable() {
    public void dispose() {
      tearDownBrokerConnection();
    }
  });
});

由于我想为许多订阅者/观察者重用消息侦听器,所以我不直接在创建的可观察中订阅,而是使用发布(). refCount()团队。类似的东西:

Observable<String> observableToSubscribeTo = observable.publish().refCount();

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);

这一切都按预期工作。代码仅在建立第一个订阅时连接到JMS,并且在最后一个观察者是dispose()d时关闭与代理的连接。

然而,当订阅者在启用onNext()时抛出异常时,事情似乎变得一团糟。正如预期的那样,抛出的观察者将被屏蔽,并且无论何时发布新事件,都不会再通知它。我的问题是,当所有剩余订阅者都是dispose()时,维护到消息代理连接的Observable将不再被通知。在我看来,抛出异常的订户似乎处于某种僵尸状态。当涉及到事件分布时,它会被忽略,但它会以某种方式阻止根可观察对象在最后一个订阅者是dispose()时得到通知。

我知道RxJava希望观察者确保不抛出而是正确处理最终的异常。不幸的是,在我想提供一个库向调用者返回可观察的情况下,我无法控制我的订阅者。这意味着,我永远无法保护我的库免受愚蠢的观察者的攻击。

所以,我在问自己:我是否遗漏了什么?订阅者抛出时是否真的没有机会正确清理?这是一个bug还是只是我不理解库?

非常感谢您的任何见解!

共有1个答案

连厉刚
2023-03-14

如果您可以展示一些单元测试来演示问题(不需要JMS),那就太好了。

此外,RxJava 2中的onNext不应抛出;如果它这样做了,那就是一种未定义的行为。如果你不信任你的消费者,你可以有一个终端可观察的转换器,它可以安全订阅,而不是简单的订阅,它可以防止下游的不当行为:

.compose(o -> v -> o.safeSubscribe(v))

.compose(new ObservableTransformer<T>() {
    @Override public Observable<T> apply(final Observable<T> source) {
        return new Observable<T>() {
            @Override public void subscribeActual(Observer<? super T> observer) {
                 source.safeSubscribe(observer);
            }
        };
    }
})
 类似资料:
  • 问题内容: 我喜欢Flask的错误捕获。很简单: 就像魅力。但这对于500错误代码不起作用。当出现问题时,我想捕获Python错误,代码中引发了异常。那可能吗? 我应该注意,如果我显式调用视图,则500错误处理程序确实可以工作。因此,这明确适用于Python代码失败的情况。 问题答案: 默认情况下,你所描述的是Flask的工作方式。我的假设是你正在调试模式下运行,因此在调试屏幕中会向你显示异常。确

  • 问题内容: 我知道可可中有一个UncaughtExceptionHandler,但是我正在为Swift寻找相同的东西。即,每当应用程序中有任何错误/异常由于任何错误而未在本地捕获时,它应该一直冒泡到顶级应用程序对象,在那里我应该能够妥善处理它并适当地响应用户。 Android有它。Flex有它。Java有它。想知道为什么Swift缺少此关键功能。 问题答案: Swift没有机制来捕获所有任意的运行

  • 如果 Scala 未来失败,并且没有延续“观察到”该故障(或者唯一的延续使用 map/flatMap 并且在发生故障时不运行),那么错误就不会被发现。我希望至少记录这些错误,以便我可以找到错误。 我使用术语“观察到的错误”,因为在.Net Tasks中,当GC收集Task对象时,有机会捕获“未观察到的任务异常”。同样,使用同步方法,可以记录终止线程的未捕获异常。 在Scala futures中,“

  • 我的代码有问题。如何处理从服务到GSP的错误?我使用render from service或controller进行了尝试,但类似于[值为[{2}]的类[{1}]的属性[{0}]不是有效的电子邮件地址],并得到错误500:带有完整异常跟踪的内部服务器错误。我的消息来源: UserController.groovy

  • 我现在必须学习通过fire base编写移动应用程序web服务。我点击了这个链接:https://firebase-php.readthedocs.io/en/stable/ 在我的核心网站中,我创建web服务文件夹,然后创建我的fire。php文件。这个文件代码在这里, 我得打电话给我的支持档案:https://github.com/kreait/firebase-php/ 但我还是得到了一个:

  • 我正在尝试创建一个过滤器来处理异常(请参见在JSF中处理未捕获的异常) 我在日志中看到错误: 我做错了什么?