当前位置: 首页 > 知识库问答 >
问题:

RxJava:在每个订阅者处理资源后关闭它

堵彬彬
2023-03-14

我是RxJava的新手,我正在努力弄清楚如何正确关闭资源,尤其是在处理多个订阅者时。

我有一个<代码>可观察

我可能在observable上有多个订户。我想在每个订阅者处理完资源后,close()resource。换句话说,在新资源被交付/发出后关闭旧资源,并且在最后一个订户取消订阅时也最终关闭最后一个资源。

我尝试使用一个自定义操作符让它工作,我称之为< code>AutoCloseOperator,它几乎可以工作了,但是不太正确。也就是说,我仍然处于竞争状态和/或泄漏,例如,资源没有关闭。

在RxJava中执行此操作的正确方法是什么?

假设我得到了这个代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);

Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});

subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered

Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});

subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));

s1.unsubscribe();

subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));

s2.unsubscribe();

subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));

然后,我希望出现以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8

注意:在我的真实代码中,我没有使用<code>PublishSubject,每次更新数据库表时都会发出一个<code>光标。。。

为了概括问题:我可以使用 doOnNextdoOnUnsubition 来关闭旧项目,但这并没有考虑到这些事件将发生多次(对于每个订阅者),我只想在所有订阅者都收到新项目时关闭资源。

使用<code>lift()

我已经将我的问题简化为GitHub上的一个小型命令行应用程序。感谢您的关注!

共有1个答案

方宏才
2023-03-14

是可观察的。使用()是您所需要的。

如果你有T类型的t,它有一个.close()方法,并且你想从t(你的光标)中提取一些东西,说可观察

Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();

Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);

您提到您有可观察性

Observable<T> source = ...
Observable<R> results = 
    source.flatMap(t -> {
        Func0<T> resourceFactory = () -> t;
        Func1<T, Observable<R>> observableFactory = x -> ...
        Action1<T> disposeAction = x -> x.close();
        return Observable.using(resourceFactory, observableFactory, disposeAction);});

 类似资料:
  • 调用.unsubscribe()将取消一个成员的回调监听Observable流。 当创建Observable时,您还可以返回自定义回调onUnsubscribe,当收听流的成员取消订阅时将调用该回调。 这对于必须实现的任何类型的清理都很有用。 如果我们没有清除setTimeout,那么值仍然会发射,但是没有人听。 为了节省资源,我们应该停止发射值。 一个重要的事情要注意的是,当您调用.unsubs

  • 问题内容: 我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“ java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。 在下面,您找到(最有可能)触发异常的方法: 这是

  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

  • 我正在使用来使用来自spring-boot应用程序中某个主题的消息,我需要定期运行该应用程序。spring-kafka版本是2.2.4.发行版。

  • 我正在寻找一种将多个订阅者附加到RxJava可观察流的方法,每个订阅者异步处理发出的事件。 我第一次尝试使用。flatMap(),但这似乎对任何后续订阅服务器都不起作用。所有订阅服务器都在同一线程上处理事件。 最终工作的是通过每次创建一个新的可观察的来消耗新线程中的每个事件: 输出: 以及多个订阅者的最终结果: 输出: 然而,这似乎有点笨拙。有没有更优雅的解决方案,或者RxJava不是一个很好的用

  • 我尝试使用Vertx HttpClient/WebClient来使用GraphQL订阅,但没有按预期工作。 与服务器端相关的代码(使用Vertx Web GraphQL编写)如下所示,添加注释后,触发onNext将注释发送到发布者。 在客户端中,我混合使用HttpClient/WebClient,大多数时候,我想使用WebClient,它更容易处理表单帖子。但它似乎没有连接。 所以websocke