observable = Observable.defer(new Callable<ObservableSource<Long>>() {
@Override
public ObservableSource<Long> call() throws Exception {
final AtomicInteger counter = new AtomicInteger();
return Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
emitter = e;
}
}).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
counter.incrementAndGet();
startEmitting(emitter);
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
if (counter.decrementAndGet() == 0) {
stopEmitting(emitter);
}
}
});
}
});
有谁能推荐一个更好的解决方案吗?
我对rx-java2了解不多,但我有一些关于可观察模式的建议。
您可以创建一个名为ObservableEmitter的对象。在这个类中,您可以创建一个方法,比如这个subscribe(Subscriber Subscriber)和一个方法emit()。
然后由您的订户实现订户接口。我将调用方法receive(Message Message)。
public interface Subscriber {
void receive(Message msg);
}
public class ObservableEmitter {
private List<Subscriber> subscribers = new ArrayList<Subscriber>();
public subscribe(Subscriber sub) {
subscribers.add(sub);
}
public void emit(Message msg) {
for(Subscriber sub : subscribers) {
sub.receive(msg);
}
}
}
当我以匿名类型创建新的观察者时,它可以正常工作: 当我将观察者创建为动态类型时,它不会发出数据 第一个代码段的Logcat: com.tripleService.basesetupfordi/I/ZOKa:on订阅:com.tripleService.basesetupfordi/I/ZOKa:on下一篇:100:com.tripleService.basesetupfordi/I/ZOKa:on
我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。
< li >哪些可观察的方法“订阅”结果?例如,flatMap订阅第一个可观察对象的结果,并在不调用subscribe方法的情况下传递结果。 < li >是否有办法确定可观察的方法(flatMap、mergeMap、forkJoin、concat、subscribe...)返回编译时间,是订阅还是可观察?我经常困惑这些方法的结果是什么。 < Li > rxjs方法是否有这种类型的分类? < li
本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:
我试图理解可观察对象是如何执行的,但似乎无法让这个简单的代码正常工作。 不应该是你好。订阅()执行?
在ngOnDestory中,我取消了两个订阅,但仍然得到前面的错误。 现在我几乎可以肯定问题出在这行:即使我在注销之前取消了proposalSubscription和chatSubscription的订阅,但仍然会出现错误。有没有解决这个问题的方法?而且,我对RXJ和操作符没有太多的经验。有没有操作符可以用来避免这种嵌套订阅? 提前道谢。