>
不成功共享数据,需要两个类(主题类和重复可观察类)
public class SingleTonClass {
private PublishSubject<List<Data>> subject = PublishSubject.create();
public PublishSubject getSubject() {
return this.subject;
}
public void setData(List<Data> data) {
subject.onNext(data);
}
}
我希望避免监听器/接口来共享周围的信息,让rxjava2来完成它的工作。
经过研究,我发现有refcount()和share(),但我不确定这是否是解决这个问题的正确方法。在我的例子中,这是一个REST服务,它轮询服务器,如果至少有一个订阅服务器连接到其他地方,它应该如何停止轮询,因为在这种情况下获取数据是没有意义的。
我试图一次解决它,但它不起作用,除非:
使用RXJava2/rxandroid2和改型进行轮询
我会这样做:
Observable<Data> dataSource = Observable.interval(INTERVAL, TIME_UNIT)
.observeOn(Schedulers.io()) // make REST requests on IO threads
.map(n -> {
return requestData();
})
.replay(1);
replay()
运算符包括share()
,后者又包括publish()
和refcount()
函数。这使得您可以观察到的热,即所有订阅者共享单一订阅。它自动向第一个订阅者订阅(启动新的interval
序列),并在最后一个订阅者消失时取消订阅(停止interval
)。
replay(1)
还缓存上次发出的值,即新订阅服务器不必等待新数据到达。
我正在使用mosquitto(http://mosquitto.org/)作为MQTT代理,并寻求关于负载平衡订阅服务器的建议(针对相同的主题)。这是如何实现的?我所读到的关于该协议的所有内容都表明,相同主题的所有订阅者都将获得一条发布消息。 这似乎效率很低,因此我正在寻找一种方法,将发布的消息以循环方式提供给连接的订阅服务器之一,以确保负载平衡状态。
我成功创建了publisher,但使用以下方法创建订阅服务器失败: 得到以下错误:从线程[system-akka.zeromq.socket-dispatcher-7]关闭JVM的未捕获错误,因为在Akka.zeromq.concurrentSocketActor$$AnonFun$10处为ActorSystem[System]java.lang.NosuchMethoderror:org.zer
我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助
我有一个类处理一个图像,这可能是一个缓慢的过程。当工作完成时,该类包含图像的一些特性,如主色。 我有许多其他的代码想知道主颜色,当他们要求它时,它可能是或可能没有准备好。 我还没有找到使用RXJava2实现这一点的简单方法。有人能帮帮我吗? null ReplaySubject似乎有一些我正在寻找的属性,但我不确定如何正确地实现它。
我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”
我正在尝试实现一个RXJava2