当前位置: 首页 > 知识库问答 >
问题:

只有在订阅时才会发出的可观察的

亢建白
2023-03-14
observable = Observable.defer(new Callable<ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> call() throws Exception {
            final AtomicInteger counter = new AtomicInteger();

            return Observable.create(new ObservableOnSubscribe<Long>() {

                @Override
                public void subscribe(ObservableEmitter<Long> e) throws Exception {
                    emitter = e;
                }
            }).doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    counter.incrementAndGet();
                    startEmitting(emitter);
                }
            }).doOnDispose(new Action() {
                @Override
                public void run() throws Exception {
                    if (counter.decrementAndGet() == 0) {
                        stopEmitting(emitter);
                    }
                }
            });
        }
    });

有谁能推荐一个更好的解决方案吗?

共有1个答案

龚迪
2023-03-14

我对rx-java2了解不多,但我有一些关于可观察模式的建议。

您可以创建一个名为ObservableEmitter的对象。在这个类中,您可以创建一个方法,比如这个subscribe(Subscriber Subscriber)和一个方法emit()。

然后由您的订户实现订户接口。我将调用方法receive(Message Message)。

public interface Subscriber {
  void receive(Message msg);
}

public class ObservableEmitter {
 private List<Subscriber> subscribers = new ArrayList<Subscriber>();
 public subscribe(Subscriber sub) {
  subscribers.add(sub);
 }
 public void emit(Message msg) {
   for(Subscriber sub : subscribers) {
     sub.receive(msg);
   }
 }
}
 类似资料:
  • 当我以匿名类型创建新的观察者时,它可以正常工作: 当我将观察者创建为动态类型时,它不会发出数据 第一个代码段的Logcat: com.tripleService.basesetupfordi/I/ZOKa:on订阅:com.tripleService.basesetupfordi/I/ZOKa:on下一篇:100:com.tripleService.basesetupfordi/I/ZOKa:on

  • 我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。

  • < li >哪些可观察的方法“订阅”结果?例如,flatMap订阅第一个可观察对象的结果,并在不调用subscribe方法的情况下传递结果。 < li >是否有办法确定可观察的方法(flatMap、mergeMap、forkJoin、concat、subscribe...)返回编译时间,是订阅还是可观察?我经常困惑这些方法的结果是什么。 < Li > rxjs方法是否有这种类型的分类? < li

  • 本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:            

  • 我试图理解可观察对象是如何执行的,但似乎无法让这个简单的代码正常工作。 不应该是你好。订阅()执行?

  • 在ngOnDestory中,我取消了两个订阅,但仍然得到前面的错误。 现在我几乎可以肯定问题出在这行:即使我在注销之前取消了proposalSubscription和chatSubscription的订阅,但仍然会出现错误。有没有解决这个问题的方法?而且,我对RXJ和操作符没有太多的经验。有没有操作符可以用来避免这种嵌套订阅? 提前道谢。