当前位置: 首页 > 知识库问答 >
问题:

使用多个订阅服务器的RxJava2轮询

商茂勋
2023-03-14

>

  • 糟糕的图案设计
  • 仅在连接订阅服务器时才启动,如果没有可用的订阅服务器时停止
  • 不成功共享数据,需要两个类(主题类和重复可观察类)

    public class SingleTonClass { 
    private PublishSubject<List<Data>> subject = PublishSubject.create();
    
    
    public PublishSubject getSubject() {
        return this.subject;
    }
    
    public void setData(List<Data> data) { 
       subject.onNext(data);
    }
    }
    

    我希望避免监听器/接口来共享周围的信息,让rxjava2来完成它的工作。

    经过研究,我发现有refcount()和share(),但我不确定这是否是解决这个问题的正确方法。在我的例子中,这是一个REST服务,它轮询服务器,如果至少有一个订阅服务器连接到其他地方,它应该如何停止轮询,因为在这种情况下获取数据是没有意义的。

    我试图一次解决它,但它不起作用,除非:

    使用RXJava2/rxandroid2和改型进行轮询

  • 共有1个答案

    公孙黎昕
    2023-03-14

    我会这样做:

    Observable<Data> dataSource = Observable.interval(INTERVAL, TIME_UNIT)
        .observeOn(Schedulers.io()) // make REST requests on IO threads
        .map(n -> {
                return requestData();
            })
        .replay(1);
    

    replay()运算符包括share(),后者又包括publish()refcount()函数。这使得您可以观察到的热,即所有订阅者共享单一订阅。它自动向第一个订阅者订阅(启动新的interval序列),并在最后一个订阅者消失时取消订阅(停止interval)。

    replay(1)还缓存上次发出的值,即新订阅服务器不必等待新数据到达。

     类似资料:
    • 我正在使用mosquitto(http://mosquitto.org/)作为MQTT代理,并寻求关于负载平衡订阅服务器的建议(针对相同的主题)。这是如何实现的?我所读到的关于该协议的所有内容都表明,相同主题的所有订阅者都将获得一条发布消息。 这似乎效率很低,因此我正在寻找一种方法,将发布的消息以循环方式提供给连接的订阅服务器之一,以确保负载平衡状态。

    • 我成功创建了publisher,但使用以下方法创建订阅服务器失败: 得到以下错误:从线程[system-akka.zeromq.socket-dispatcher-7]关闭JVM的未捕获错误,因为在Akka.zeromq.concurrentSocketActor$$AnonFun$10处为ActorSystem[System]java.lang.NosuchMethoderror:org.zer

    • 我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助

    • 我有一个类处理一个图像,这可能是一个缓慢的过程。当工作完成时,该类包含图像的一些特性,如主色。 我有许多其他的代码想知道主颜色,当他们要求它时,它可能是或可能没有准备好。 我还没有找到使用RXJava2实现这一点的简单方法。有人能帮帮我吗? null ReplaySubject似乎有一些我正在寻找的属性,但我不确定如何正确地实现它。

    • 我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”

    • 我正在尝试实现一个RXJava2