我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。
在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是预期行为吗?
这种行为可以在RxJava 2.1.0上看到;似乎是某种种族状况
public class PublishSubjectTest {
private final Subject<String> singlePropertyUpdateSubject =
PublishSubject.<String>create().toSerialized();
public static void main(String[] args) {
PublishSubjectTest obj= new PublishSubjectTest();
obj.sendEvents();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//[1]
private final String valueToObserve = "2000";
private void subscribeToSubject() {
System.out.println("Subscribing .....");
io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println(" Value Received "+observedValue +" By "+Thread.currentThread() ))
);
}
private io.reactivex.Observable<String> getAndObserve(String value) {
final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
//.doOnNext(v-> System.out.println("Received value "+v))
.filter(v -> v.equals(value))
.doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
return observable;
}
// 50ms >= expected result ; Anything less than 10ms will fail.
private void sendEvents() {
io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> {
String key = value.toString();
//System.out.println("Adding key "+key);
singlePropertyUpdateSubject.onNext(key);
//[2]
if (value == 1998){
subscribeToSubject();;
}
if (value%100==0) {
System.out.println(value);
}
});
}
好的,问题是PublishSubject;使用ReplaySubject似乎可以解决并发订阅的问题;测试代码调用系统也存在问题。使用ReplaySubject时退出(1);github提供了详细的讨论。com/ReactiveX/RxJava/issues/6414–
请将此问题视为已解决。
我一直在试验项目反应器和反应流。我在使用使流在不同的线程上运行时遇到了一个问题。将我的代码放在主线程中,我需要主线程块,直到流完成,所以我做了这样的事情: 然后我注意到有一个方法执行阻塞。但是我不能同时使用订阅和块最后,因为它们不返回。 有什么优雅的方法可以做到这一点吗?
这只是为了澄清发布/订阅线程。 我的疑问是在正常的发布者/订阅者模式中,订阅者和发布者是在同一个线程上运行还是在不同的线程中运行? 还是取决于实现? 到目前为止,我所想的是不同的订阅会有自己的线程,而publisher在其上运行的是自己的线程?
问题内容: 我有一个使用Redis发布/订阅在Java中使用Jedis客户端在客户端之间传输消息的应用程序。我希望能够在用户键入命令时在运行时订阅频道,但是由于订阅是一个阻塞操作,因为它在调用订阅的线程上进行侦听,所以我不确定以后如何订阅其他频道在原始线程上。 例: 这将起作用,除了调度命令的线程将用于轮询Redis,而我将无法使用该线程订阅更多的频道。 问题答案: 我观察到了同样的问题,即订阅后
问题内容: 我一直在尝试可以在网上找到的所有内容,但没有任何效果。希望大家能看到新的问题。这是我第一次使用ActionCable,在本地一切正常,但是当推送到heroku时。我的日志没有像我的开发服务器那样显示任何可操作的订阅: 在发送消息时,我确实看到了,但没有将它们追加,这是在猜测是否意味着未访问/调用该方法? 我确实在heroku的日志中注意到它说,开发人员正在localhost:3000监
例: 注意:是非Android 运行环境, 使用的是RxJava2.x
我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?