当前位置: 首页 > 知识库问答 >
问题:

是否存在只传播错误而不终止自身的可观察性?

蒋招
2023-03-14

我在负责同步的类中使用PublishSubject。当同步完成时,将通知所有订阅服务器。发生错误时也会发生同样的情况。我注意到,下次我在发生错误后订阅时,它会立即返回给订阅者。

因此类可能如下所示:

public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
    private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
            10, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(true),
            new ThreadPoolExecutor.DiscardPolicy());


    public Observable<Result> syncHead(final int chunkSize) {
              mExecutor.execute(new Runnable() {
      @Override
      public void run() {
          try {
              //Do some work which either returns a result or throws an error
              //...

              mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
          } catch (Throwable error) {
              mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
          }
      }
  });

是否有一个可观察的东西可以作为代理?可能是其他Rx方法?

更新:我遵循了@Akarnokd方法,发出包装到RxJava通知中的事件。然后通过flatmap()展开它们。因此synchronizer类的客户端不需要执行此操作。

    //...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();

      public Observable<Result> syncHead(final int chunkSize) {

          return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
            @Override
            public Observable<Result> call(Notification<Result> result) {
                if (result.isOnError()) {
                    return Observable.error(result.getThrowable());
                }

                return Observable.just(result.getValue());
            }
        }).doOnSubscribe(
            new Action0() {
                @Override
                public void call() {
                    startHeadSync(chunkSize);
                }
            });
      }

      private void startHeadSync(final int chunkSize) {
          mExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //Do some work which either returns a result or throws an error
                    //...

                    mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
                } catch (Throwable error) {
                    mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
                }
            }
          });
       }
  //...

共有1个答案

云令
2023-03-14

我不确定您希望通过这种设置实现什么,但通常,为了避免PublishSubject出现终端条件,您应该将值和错误封装到一个公共结构中,并始终发出这些值和错误,永远不要发出任何onerroronCompleted。一种选择是使用RXJava自己的事件包装器通知,您的订阅服务器应该解压缩该值。

 类似资料:
  • 我有两个演员,父母和孩子。家长使用内容监视孩子。看(孩子)。如果孩子调用上下文。停止(自我)父级收到终止消息。但是,如果子级抛出异常,akka将重新启动该异常,但不会向父级发送终止消息。 家长参与者如何监视孩子并监视任何终止/重新启动? 我提出的一个选项是覆盖父级中的主管策略,以便在出现任何异常时停止: 据我所知,这将适用于这位演员的所有孩子。理想情况下,我希望对个别孩子(演员类型)有一个不同的监

  • 我已经阅读了ReactiveX留档几次,仍然无法完全理解当观察者订阅可观察文件时会发生什么。 我们来看一个简单的例子: StackBlitz代码。 我的问题: 传递给可观察对象的

  • 我的API对两个独立的服务进行大约100次下游调用。在我将回复返回给客户之前,所有回复都需要汇总。我使用hystrix-feign进行HTTP调用。 我提出了一个我认为是优雅的解决方案,直到在rxJava文档中我发现了以下内容 BlockingObservable是提供阻塞运算符的各种可观察对象。它可以用于测试和演示目的,但通常不适用于生产应用程序(如果您认为需要使用BlockingObserva

  • 让我们考虑下面的示例代码: 在函数gude()中,将创建一个新的observable,它将发出哈希值,该哈希值的前n个前导值设置为零。一个观察者订阅了那个可观察的,并立即取消订阅。让我们假设函数createHashWithNLeadingZeroes()需要相当长的时间来生成响应。 我想这里发生了以下事情: (1) 创建了一个新的可观察对象,描述可观察对象行为的函数被内部存储在属性_subscri

  • 问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首

  • 我想使用以下代码实现文件下载: RestAPI: 服务: 组件: 当我启动angular时,我得到错误:src/app/panel/service/download中的错误。服务ts(17,91):错误TS2339:类型“Observable”上不存在属性“map”。 将导入代码的正确wya是什么?当我点击下载按钮时,什么也没发生。