当前位置: 首页 > 面试题库 >

RxJava:“ java.lang.IllegalStateException:只允许一个订阅者!”

司马萧迟
2023-03-14
问题内容

我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“
java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。

在下面,您找到(最有可能)触发异常的方法:

public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
    Observable<Float> laggedObservable = observable.skip(lag);

    Observable<Float> meanObservable = mean(observable, lag);
    Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);

    Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
    Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

    Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float laggedValue) {
            return value * laggedValue;
        }
    });

    Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
        @Override
        public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
            if(memoObservable == null) return observable;

            return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                @Override
                public Float call(Float memo, Float value) {
                    return memo + value;
                }
            });
        }
    }));

    Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float standardDeviation, Float laggedStandardDeviation) {
            return lag * standardDeviation * laggedStandardDeviation;
        }
    });

    return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float autoCorrelation, Float normalization) {
            return autoCorrelation / normalization;
        }
    });
}

这是我得到的stacktrace:

java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
  at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
  at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
  at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
  at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
  at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
  at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
  at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
  at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
  at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
  at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
  at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
  at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
  at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
  at android.os.MessageQueue.nativePollOnce(Native Method)
  at android.os.MessageQueue.next(MessageQueue.java:138)
  at android.os.Looper.loop(Looper.java:123)
  at android.app.ActivityThread.main(ActivityThread.java:5146)
  at java.lang.reflect.Method.invokeNative(Native Method)
  at java.lang.reflect.Method.invoke(Method.java:515)
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
  at dalvik.system.NativeStart.main(Native Method)

我认为我在这里没有做任何奇怪的事情:一些压缩,缩小,扫描和平面图。

我是否遗漏了一些完全显而易见的东西,是否有一些隐藏的规则要打破,还是RxJava中的错误?谢谢!

PS。如果缺少一些代码来得出结论,请问一下,我会发布!


问题答案:

在RxJava中,运算符groupBywindow返回只能被订阅一次的observable,如果被订阅,则将其累积的内容重播给唯一的订阅者并切换到“热”模式。

这是在返回完全热的可观察值和冒险缺失值之间或在返回无限制的重播可观察值之间进行权衡的方法,该值允许任何订户但无限期地保留累积的内容。

中间地带,即单个订户,冷到热可观察到的行为被认为是最不令人惊讶的行为,它使开发人员可以选择应用更多的运算符并在两个极端之间选择一个要点:

source.window(1, TimeUnit.SECONDS)
    .map(w -> w.publish())
    .doOnNext(w -> w.connect())
    .subscribe(...)

source.window(1, TimeUnit.SECONDS)
    .map(w -> w.cache())
    .subscribe(...)


 类似资料:
  • 问题内容: 我想使我的网站一次只允许一个会话。例如,假设用户已经登录到我在firefox上的网站,如果该用户再次登录到另一台浏览器(例如同一台计算机或另一台计算机上的Opera),则Firefox上的会话将被破坏。但是,如果仍为一届会议,则有关Firefox的会议仍将保留。我可以知道该怎么做吗?我正在使用php和apache。谢谢。 问候。本杰明 问题答案: 我建议您做这样的事情: 假设用户“ A

  • #include <stdio.h> #include <pthread.h> int a = 0; int b = 0; void *thread1_func(void *p_arg) { while (1) { a++; sleep(1); } } void *thread2_fu

  • 为方便开发测试,经常会在线下共用一个所有服务可用的注册中心,这时,如果一个正在开发中的服务提供者注册,可能会影响消费者不能正常运行。 可以让服务提供者开发方,只订阅服务(开发的服务可能依赖其它服务),而不注册正在开发的服务,通过直连测试正在开发的服务。 禁用注册配置 <dubbo:registry address="10.20.153.10:9090" register="false" /> 或

  • 我是这样理解的,从可观察的角度来看: > 有人订阅了我,我应该开始发送项目 [订阅者:1][要发送的项目:1,2,3] 向订阅服务器发送项“1” [订阅服务器:1][要发送的项:2,3] ... 但它不是这样运作的。就像它们是两个独立的可观测物在一个。这让我很困惑,为什么他们不把项目给所有的订户? 奖金: 谢了!

  • 物联网有很多设备,通过订阅设备的topic可以监听物联网设备接收到的消息。 请求方式: "|4|1|2|topic|\r" 参数: topic 设置订阅的topic,获取设备topic可参考教程 返回值: "|4|1|2|1|\r" 订阅成功 "|4|1|2|2|1|\r" topic订阅达到上限(一个OBLOQ最多订阅5个topic),订阅失败 "|4|1|2|2|2|\r" topic订阅失败

  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1