我有一个上游,它以块的形式发出数据。这个流应该使用slttleFirst进行限制。此外,在所有油门计时器完成后,应该发出最后一个值。不幸的是,RxJava 2中没有slttleFierstBu的运算符,因此我实现了一个观察变压器:
upstream -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2);
return Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged();
}
除了处理之外,它工作得很好。在处理完生成的可观察对象后,我也想处理上游。我该怎么做?
我尝试使用autoConnect(2,一次性)访问一次性-
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
据我所知,我在这里回答我自己的问题,如果我错了,请告诉我。
根据akarnokd的评论,解决方案如下所示:
Observable.<T>create(
emitter -> {
Observable<T> autoConnectingUpstream =
upstream //
.publish()
.autoConnect(2, emitter::setDisposable);
Observable.merge(
autoConnectingUpstream.throttleFirst(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler),
autoConnectingUpstream.debounce(message.updatelimit().get(), TimeUnit.MILLISECONDS, scheduler))
//if debounce and throttle emit the same item
.distinctUntilChanged()
.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});
自动连接的第二个参数是一个动作,它表示由两个连接的观察者建立的连接。
这可以与emitter::SetDisposal一起使用,以便在观察者处理生成的可观察对象时处理自动连接。
我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从
我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的
我正在尝试创建一个RxJava BlockingObservable,它将每隔X毫秒发出一个变量的值,直到(条件==true)或超时发生。 下面的代码似乎与我想要的很接近,但它总是发出一次,然后退出。奇怪的是,我在中有一个永远不会正确的条件——我希望这个可观察到的持续发出并最终超时,但事实并非如此。 我错过了什么/做错了什么?
在android 6.0.1 Samsung s6 Edge+上的测试 当device screen脱机并从debug中拔出时,可观察到的只是停止发射项目。如果设备打开,则开始发射对象。另一个问题是,在停止接收项目之前,我会按照相同项目的顺序随机地得到2/3个重复调用 ____________________________edit_________________________________
我有以下资料: > AJava类:ClassA实现观察者 Java接口:Inter(扩展可观察,不可能) Java类:ClassB实现可观察的内部扩展 Java类:ClassC实现可观察的内部扩展 现在ClassA的代码有点像这样。 现在,如果一个特定的事件发生在ClassB或ClassC中,我希望ClassA知道它。我想过使用观察者/可观察的,但问题是接口不能扩展可观察的。 如果有人理解这个问题
我面临的问题是我需要一个身份验证令牌来创建我的Retrofit服务。我目前使用可观察来获取所述令牌,导致一个相当丑陋的可观察构造: 我忍不住觉得这不是应该做的。我说得对吗?