我仍在尝试将RXJava2用于多个订阅者使用的轮询服务。它工作得很好,除了它总是调用两次。
我尝试使用publish(1)、take(1)、share()、refCount()等,但结果总是一样。叫两次或多次。
我还是不明白为什么它叫了两次。新订阅服务器应该接收最新发出的值,并且只在更改后的值(如果hashmap更改了)返回为modified List。
我的民意调查可观察到(单例)
public Observable<List<Light>> lightPolling = CallFactory
.with(context)
.getLights(MainApplication.lastaccespoint)
.repeatWhen(o -> o.concatMap((Function<Object, ObservableSource<?>>) v -> Observable.timer(1000, TimeUnit.MILLISECONDS)))
.filter(new MapPredicate<>())
.distinctUntilChanged()
.map(l -> {
List<Light> lights = new ArrayList<>();
for (Map.Entry<String, Light> entry : l.entrySet()) {
Light light = entry.getValue();
light.setId(entry.getKey());
lights.add(light);
}
return lights;
})
// .replay(1)
// .distinct()
//.publish()
//.share()
.compose(ReplayingShare.instance()); // its like .share(), just including the last result etc.
我的谓词用于筛选hashmap是否真的发生了更改。Stringutil.EqualMap只是一个比较两个HashMap的简单方法,效果很好。
class MapPredicate<T> implements Predicate<Map<String, T>> {
private Map<String, T> lastMap;
@Override
public boolean test(Map<String, T> map) throws Exception {
if (!StringHelper.equalMap(map, lastMap)) {
Timber.d("Result in MapPredicate doesnt equals last result");
lastMap = map;
return false;
}
return true;
}
}
最后,我的一次性订阅上了可观察的。
lightsDisposable = LightManager
.getInstance(context)
.lightPolling
.subscribeWith(new BulbsObserver());
还有一些测试用的
LightManager.getInstance(context).lightPolling.debounce(10, TimeUnit.SECONDS).subscribe(lights -> Timber.d("Received light second Subscriber "+ lights.size()));
LightManager.getInstance(context).lightPolling.debounce(15, TimeUnit.SECONDS).subscribe(lights -> Timber.d("Received light Third Subscriber "+ lights.size()));
private class BulbsObserver extends DisposableObserver<List<Light>> {
@Override
public void onNext(List<Light> newLights) {
Timber.d("Received lights=" + newLights.size());
lights.clear();
lights.addAll(newLights);
adapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable throwable) {
Timber.e(throwable);
}
@Override
public void onComplete() {
Timber.d("onComplete called");
}
}
重播共享来自杰克·沃顿https://github.com/jakewharton/rxreplayingshare
refcount()
在第一个订阅者到来时订阅(调用订阅函数),并在最后一个订阅者离开后取消订阅。如果您的订阅者按顺序来(即它们之间的引用计数下降到零),您将经历对原始源的多次订阅/取消订阅。
如果希望只订阅一次并且从不取消订阅,请使用autoconnect()
而不是refcount()
。例如:
hotObservable = coldObservable.replay(1).autoConnect();
假设我有一个间隔,我给了它一个计算调度器。这样地: 那么,平面图{...}中发生的一切是否也会被调度在计算线程上? 在Observable.interval的源代码中,它说: 作为RxJava的初学者,我很难理解这个评论。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于要发出的项目的最后一部分是否也意味着发出的项目将在同一个线程上使用?还是需要观察?这样地: 如果我想在计算线程上处理emit
问题内容: 我有两个完成。我想做以下情形:如果第一个Completable到达onComplete,则继续第二个Completable。最终结果将是第二完成的onComplete。 当我有单个 getUserIdAlreadySavedInDevice() 和Completable login() 时,这就是我的方法: 问题答案: 您正在寻找运营商。 返回一个Completable,它首先运行此C
我使用REST API查询Person对象列表。最大限制是100人响应。我需要把所有的人都叫来,总数不详。第一个响应中有一个名为“next”的字段,包含下一页的url。我需要使用rxjava/rxandroid和referfit链接这些调用,直到最后一个响应有一个空的“next”字段。由于“Next”字段包含分页url,所有后续调用都将具有与第一个不同的url。做这件事最方便的方法是什么?
我觉得有一个简单的解决办法,但由于某些原因,我不能把我的头围绕它。 我有一个情况,一个按钮必须先点击,才能点击另一个。用户为这2个JButtons选择的选项将决定程序的下一步。这意味着我必须调用actionListener两次,对吗?我如何在一个actionPerformed方法中做到这一点? 如果actionPerformed方法中的(e.getSource()==Square[1][4]&&e
Learning RxJava 2 for Android by example Take the MindOrks Android Online Course and Learn RxJava How to use RxJava 2 in Android Application How to migrate from RxJava 1.0 to RxJava 2.0 How to use RxJ
问题内容: 我在Go中关注一个简单的Web服务器示例。 我插入了一条语句,使生成的代码如下所示: 问题是,每当我在Web浏览器中加载端口8000时,此函数就会被调用两次。这是一个问题,因为我打算在每次页面访问时增加一个计数器。通过这种行为,计数器将增加两次。OTOH,如果我这样做,它只会被调用一次。 我觉得我在这里失踪真的很愚蠢。 问题答案: 只需记录请求。您将意识到您的浏览器还请求/favico