当前位置: 首页 > 知识库问答 >
问题:

RxJava2.1.0:在不同线程上订阅时未调用PublishSubject onNext

高迪
2023-03-14

我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。

在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是预期行为吗?

这种行为可以在RxJava 2.1.0上看到;似乎是某种种族状况

public class PublishSubjectTest {

    private final Subject<String> singlePropertyUpdateSubject =
            PublishSubject.<String>create().toSerialized();


    public static void main(String[] args) {
        PublishSubjectTest obj= new PublishSubjectTest();
        obj.sendEvents();
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


//[1]
    private final String valueToObserve = "2000";
    private void subscribeToSubject() {
        System.out.println("Subscribing .....");
        io.reactivex.Observable.range(1,10).subscribeOn(Schedulers.newThread()).subscribe(
                value -> getAndObserve(valueToObserve).subscribe(observedValue -> System.out.println("  Value Received   "+observedValue +" By "+Thread.currentThread() ))
        );



    }

    private io.reactivex.Observable<String> getAndObserve(String value) {
        final io.reactivex.Observable<String> observable = singlePropertyUpdateSubject
                //.doOnNext(v-> System.out.println("Received value "+v))
                .filter(v -> v.equals(value))
                .doOnSubscribe(c-> System.out.println("Consumer subscribed "+c));
        return observable;
    }


// 50ms >= expected result ;  Anything less than 10ms will fail.
    private void sendEvents() {
        io.reactivex.Observable.interval(10, TimeUnit.MILLISECONDS).subscribe(value -> {
            String key = value.toString();
            //System.out.println("Adding key "+key);
            singlePropertyUpdateSubject.onNext(key);
//[2]           
 if (value == 1998){
                subscribeToSubject();;
            }
            if (value%100==0) {
                System.out.println(value);
            }

        });
    }

共有1个答案

杜英范
2023-03-14

好的,问题是PublishSubject;使用ReplaySubject似乎可以解决并发订阅的问题;测试代码调用系统也存在问题。使用ReplaySubject时退出(1);github提供了详细的讨论。com/ReactiveX/RxJava/issues/6414–

请将此问题视为已解决。

 类似资料:
  • 我一直在试验项目反应器和反应流。我在使用使流在不同的线程上运行时遇到了一个问题。将我的代码放在主线程中,我需要主线程块,直到流完成,所以我做了这样的事情: 然后我注意到有一个方法执行阻塞。但是我不能同时使用订阅和块最后,因为它们不返回。 有什么优雅的方法可以做到这一点吗?

  • 这只是为了澄清发布/订阅线程。 我的疑问是在正常的发布者/订阅者模式中,订阅者和发布者是在同一个线程上运行还是在不同的线程中运行? 还是取决于实现? 到目前为止,我所想的是不同的订阅会有自己的线程,而publisher在其上运行的是自己的线程?

  • 问题内容: 我有一个使用Redis发布/订阅在Java中使用Jedis客户端在客户端之间传输消息的应用程序。我希望能够在用户键入命令时在运行时订阅频道,但是由于订阅是一个阻塞操作,因为它在调用订阅的线程上进行侦听,所以我不确定以后如何订阅其他频道在原始线程上。 例: 这将起作用,除了调度命令的线程将用于轮询Redis,而我将无法使用该线程订阅更多的频道。 问题答案: 我观察到了同样的问题,即订阅后

  • 问题内容: 我一直在尝试可以在网上找到的所有内容,但没有任何效果。希望大家能看到新的问题。这是我第一次使用ActionCable,在本地一切正常,但是当推送到heroku时。我的日志没有像我的开发服务器那样显示任何可操作的订阅: 在发送消息时,我确实看到了,但没有将它们追加,这是在猜测是否意味着未访问/调用该方法? 我确实在heroku的日志中注意到它说,开发人员正在localhost:3000监

  • 例: 注意:是非Android 运行环境, 使用的是RxJava2.x

  • 我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?