当前位置: 首页 > 知识库问答 >
问题:

RxJava2,2个可观察/可流动的订户,但下一个会在任何一个上被调用

杜英叡
2023-03-14

rxjava2版本2.1.5

试图了解RxJava2的多个可观察订阅。有一个简单的文件监视服务,跟踪创建,修改,删除文件在一个目录。我添加了2个订阅服务器,并期望在两个订阅服务器上打印事件。当我将一个文件复制到监视目录中时,我看到一个订阅者打印出事件。然后,当我删除文件时,我看到第二个订阅服务器打印出事件。我希望两个订阅者都能打印事件。我在这里漏掉了什么?

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class MyRxJava2DirWatcher {

    public Flowable<WatchEvent<?>> createFlowable(WatchService watcher, Path path) {

        return Flowable.create(subscriber -> {

            boolean error = false;
            WatchKey key;
            try {

                key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
            }
            catch (IOException e) {
                subscriber.onError(e);
                error = true;
            }

            while (!error) {
                key = watcher.take();

                for (final WatchEvent<?> event : key.pollEvents()) {
                    subscriber.onNext(event);
                }

                key.reset();
            }

        }, BackpressureStrategy.BUFFER);

    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Path path = Paths.get("c:\\temp\\delete");
        final FileSystem fileSystem = path.getFileSystem();
        WatchService watcher = fileSystem.newWatchService();

        MyRxJava2DirWatcher my = new MyRxJava2DirWatcher();
        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("1>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("1>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        // MyRxJava2DirWatcher my2 = new MyRxJava2DirWatcher();

        my.createFlowable(watcher, path).subscribeOn(Schedulers.computation()).subscribe(event -> {
            System.out.println("2>>Event kind:" + event.kind() + ". File affected: " + event.context() + ". "
                    + Thread.currentThread().getName());

        }, onError -> {
            System.out.println("2>>" + Thread.currentThread().getName());
            onError.printStackTrace();
        });

        TimeUnit.MINUTES.sleep(1000);

    }
}

输出如下所示

2>>Event kind:ENTRY_CREATE. File affected: 1.txt. RxCachedThreadScheduler-2
2>>Event kind:ENTRY_MODIFY. File affected: 1.txt. RxCachedThreadScheduler-2
1>>Event kind:ENTRY_DELETE. File affected: 1.txt. RxCachedThreadScheduler-1

共有1个答案

元鸿波
2023-03-14

发生的情况是,您在两个flowable之间共享相同的WatchService,并且它们在其中竞争事件。如果传入文件系统并在flowable.create中调用newWatchService(),则接收所有事件的次数应与订阅服务器的次数一样:

public Flowable<WatchEvent<?>> createFlowable(FileSystem fs, Path path) {

    return Flowable.create(subscriber -> {

        WatchService watcher = fs.newWatchService();

        subscriber.setCancellable(() -> watcher.close());

        boolean error = false;
        WatchKey key;
        try {

            key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
        }
        catch (IOException e) {
            subscriber.onError(e);
            error = true;
        }

        while (!error) {
            key = watcher.take();

            for (final WatchEvent<?> event : key.pollEvents()) {
                subscriber.onNext(event);
            }

            key.reset();
        }

    }, BackpressureStrategy.BUFFER);

}

还要注意,您应该使用subscribeon(schedules.computation(),false)来避免与订阅服务器轮询死锁。

 类似资料:
  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

  • 在一个服务中,我有两个API调用,每个调用都返回一个可观察的,在我的组件中,我有一些条件,如果为true,我必须调用这两个函数,但我需要等待get()调用,这样我就可以使用get调用返回的参数执行post函数。如果为false,我只想用已经定义的参数调用post函数。 服务: 组成部分: 我不想重复帖子调用的代码,或者如果不可能的话,只是不要在另一个订阅()中使用订阅()。我怎么能这么做?没有异步

  • 函数应该从get请求返回一个 在这种情况下,我只能在为真时执行请求,否则我在函数

  • 我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块

  • 我是这样理解的,从可观察的角度来看: > 有人订阅了我,我应该开始发送项目 [订阅者:1][要发送的项目:1,2,3] 向订阅服务器发送项“1” [订阅服务器:1][要发送的项:2,3] ... 但它不是这样运作的。就像它们是两个独立的可观测物在一个。这让我很困惑,为什么他们不把项目给所有的订户? 奖金: 谢了!

  • 我有一个id列表,并且有一个方法,它接受id并返回可观察的 假设我有用户id(1),我需要使用此方法更新他的配置文件