当前位置: 首页 > 知识库问答 >
问题:

Rxjava-订阅前在PublishSubject上发出

公良骁
2023-03-14

考虑一个场景,我们有一个发出字符串的流,我们想将字符串保存在文件中。

我正在使用Publish科目,这很好:

Subject<String> stream = PublishSubject.create();
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string1")
mReverseGeocoderStream.onNext("some-string2")

但是,这不起作用(只有thom-string2被交付)

Subject<String> stream = PublishSubject.create();
mReverseGeocoderStream.onNext("some-string1")
stream.subscribe(str -> saveFile(str));
mReverseGeocoderStream.onNext("some-string2")

有没有办法让第二个场景也能正常工作?

i、 例如,我们是否可以更改PublishSubject以确保它缓冲事件,直到订阅者使用它们?

请注意,BehaviorSubject不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。

我找到了Unicastsubject,这几乎是我想要的,除了,当我取消订阅并稍后重新订阅不同的订阅者时,它会在IllegalStateException中失败。

用例:

假设我们有一个android应用程序。它发出网络请求,在网络请求的后面,它需要显示一个对话框。在发出请求时,用户会对应用程序进行背景设置。此时,我们将取消订阅正在侦听信号的观察员,以显示对话框。

网络请求返回并向流发出显示对话框的信号。现在没有人在听。用户将应用前景化。新订阅者将连接到网络请求管理器(ViewModel)。此时,我希望将“未使用”信号传递给订阅者。

注意:我不能使用行为主题。如果我这样做,每次用户背景和前景的应用程序的对话框将显示出来。我希望在显示对话框后使用并完成事件。

共有1个答案

乐城
2023-03-14

做了更多的研究,发现了这个:

https://gist.github.com/xsveda/8c556516079fde97d04b4b7e14a18463

发件人:RxJava中的类队列主题

请注意,它使用中继,但如果不想引入另一个依赖项,则可以轻松地将中继替换为Subject。

我对其他解决方案持开放态度,也许会批评为什么这个解决方案不好。

/**
 * Relay that buffers values when no Observer subscribed and replays them to Observer as requested. Such values are not replayed
 * to any other Observer.
 * <p>
 * This relay holds an unbounded internal buffer.
 * <p>
 * This relay allows only a single Observer at a time to be subscribed to it.
 * <p>
 * If more than one Observer attempts to subscribe to this Relay at the same time, they
 * will receive an IllegalStateException.
 *
 * @param <T> the value type received and emitted by this Relay subclass
 */
public final class CacheRelay<T> extends Relay<T> {

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();

    private CacheRelay() {
    }

    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }

    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }

    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}
 类似资料:
  • 主要内容:RxJava PublishSubject类 介绍,RxJava PublishSubject类 声明,RxJava PublishSubject类 示例RxJava PublishSubject类 介绍 PublishSubject 向当前订阅的观察者和终端事件发送项目到当前或晚期观察者。 RxJava PublishSubject类 声明 RxJava PublishSubject类 示例 输出结果为:

  • 问题内容: 在该程序包下,您有诸如和之类的类,我想可以将其描述为一些可用的示例。 这些主题如何退订?没有方法,并且调用完全结束了Observable,对吗? 问题答案: 同时是an 和an ,可以像普通可观察对象一样取消订阅。使主题特别之处在于它是可观察者和观察者之间的桥梁。它可以通过释放观测到的项目,也可以发射新的项目。就像对期货的承诺一样,主体是可观察的对象。 这是主题科的简短说明: Asyn

  • 我们发现,在调用主题上的onNext事件后10-20 ms内订阅序列化PublishSubject时;未调用新订阅者的onNext。 在下面的代码片段中;要观察的值[1]被给出为“2000”,并且在使用值为1998[2]的主题上调用onNext()后调用订阅对象();我们看到,如果间隔为10ms,新订阅者将错过主题触发的值2000;然而,如果间隔为50ms或更大,那么新订阅者似乎会收到预期值;这是

  • 我正在运行RxJava并创建一个主题以使用方法生成数据。我正在使用Spring。 这是我的设置: 在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated) 无论我为subscribeOn()指定了什么 Schedulers.io()

  • 问题内容: 跟随Redis Pub / Sub 这工作正常,我可以使用以下任何语言发布消息 使用,我可以验证此请求是否已正确发布 当我将订阅者 块 添加到 其他类(侦听器类)中的 该频道时,问题就开始了,如下所示 中的,还表明侦听器已正确订阅 问题是,当我将订户侦听器类添加到相同的Rails应用程序时…它停止工作,导致侦听Redis服务器并停止执行任何其他代码…它只是坐在那里侦听。 因此,有一种方

  • 在这篇文章之后,我正在使用带有RxJava/RxKotlin Flowable的Room。我让它运行,但在带有3个片段的ViewPager中使用它存在问题。 我将向您介绍我的代码: 我有一个带有选项卡布局和三个片段(A、B和收藏夹)的视图分页器。前两个片段包含可以添加到收藏夹的数据列表。 在最喜欢的片段中,我使用Flowable来监听A和B所做的更改并相应地更新列表。但是,当一个项目在A和B中成为