当前位置: 首页 > 知识库问答 >
问题:

RxJava通过自动连接处理可连接的可观察对象

皮景龙
2023-03-14

我有一个上游,它以块的形式发出数据。这个流应该使用slttleFirst进行限制。此外,在所有油门计时器完成后,应该发出最后一个值。不幸的是,RxJava 2中没有slttleFierstBu的运算符,因此我实现了一个观察变压器:

upstream -> {
      Observable<T> autoConnectingUpstream =
          upstream //
              .publish()
              .autoConnect(2);

      return Observable.merge(
              autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
              autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
          //if debounce and throttle emit the same item
          .distinctUntilChanged();
    }

除了处理之外,它工作得很好。在处理完生成的可观察对象后,我也想处理上游。我该怎么做?

我尝试使用autoConnect(2,一次性)访问一次性-

Observable.<T>create(
            emitter -> {
              Observable<T> autoConnectingUpstream =
                  upstream //
                      .publish()
                      .autoConnect(2, emitter::setDisposable);

              Observable.merge(
                      autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
                      autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
                  //if debounce and throttle emit the same item
                  .distinctUntilChanged()
                  .subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
            });

共有1个答案

龚迪
2023-03-14

据我所知,我在这里回答我自己的问题,如果我错了,请告诉我。

根据akarnokd的评论,解决方案如下所示:

Observable.<T>create(
        emitter -> {
          Observable<T> autoConnectingUpstream =
              upstream //
                  .publish()
                  .autoConnect(2, emitter::setDisposable);

          Observable.merge(
                  autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
                  autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
              //if debounce and throttle emit the same item
              .distinctUntilChanged()
              .subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
        });

自动连接的第二个参数是一个动作,它表示由两个连接的观察者建立的连接。

这可以与emitter::SetDisposal一起使用,以便在观察者处理生成的可观察对象时处理自动连接。

 类似资料:
  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的

  • 我正在尝试创建一个RxJava BlockingObservable,它将每隔X毫秒发出一个变量的值,直到(条件==true)或超时发生。 下面的代码似乎与我想要的很接近,但它总是发出一次,然后退出。奇怪的是,我在中有一个永远不会正确的条件——我希望这个可观察到的持续发出并最终超时,但事实并非如此。 我错过了什么/做错了什么?

  • 在android 6.0.1 Samsung s6 Edge+上的测试 当device screen脱机并从debug中拔出时,可观察到的只是停止发射项目。如果设备打开,则开始发射对象。另一个问题是,在停止接收项目之前,我会按照相同项目的顺序随机地得到2/3个重复调用 ____________________________edit_________________________________

  • 我有以下资料: > AJava类:ClassA实现观察者 Java接口:Inter(扩展可观察,不可能) Java类:ClassB实现可观察的内部扩展 Java类:ClassC实现可观察的内部扩展 现在ClassA的代码有点像这样。 现在,如果一个特定的事件发生在ClassB或ClassC中,我希望ClassA知道它。我想过使用观察者/可观察的,但问题是接口不能扩展可观察的。 如果有人理解这个问题

  • 我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?