当前位置: 首页 > 知识库问答 >
问题:

RxJava2(Android)中的Subscribewith和subscribe?

姬俊能
2023-03-14

何时调用subscribeWith方法而不是普通的Subscribe?用例是什么?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() {
                    @Override public void onNext(News value) {
                        handleResponse(value);
                    }

                    @Override public void onError(Throwable e) {
                        handleError(e);
                    }

                    @Override public void onComplete() {
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    }
                }));

谢谢你

根据文档,这两个实例都返回一次性的SingleObserver实例:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}

其中ConsumerSingleObserver类实现SingleObserver和Disable。

共有1个答案

龙弘济
2023-03-14

可观察#订阅说明:

在第一个代码段中:

.subscribe(this::HandleResponse,this::HandleError));

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete) {

另一个选项允许您简单地传入观察者(注意:void方法)(编辑2-此方法在observableSource中定义,它是observable扩展的接口。)

public final void subscribe(Observer<? super T> observer)

在问题的第二个代码段中,您使用了subscribeWith方法,该方法只返回传入的observer(为了方便/缓存等):

public final <E extends Observer<? super T>> E subscribeWith(E observer)

观察员#关于完整的解释:

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();

编辑:对您评论的答复:

但是我还是没有使用subscribeWith,你说它通过观察者进行缓存等,它传递到哪里去了?完成了吗?根据我的理解,subscribeWith实际上并不是在消费可观察的(或单一的)权利?

为了进一步澄清SubscribeWith的解释,我的意思是它将使用您传递到SubscribeWith中的Observer对象(与Subscribe方法完全相同),但是它还将返回相同的观察者给您。在编写本报告时,subscribeWith的实现是:

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}
Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() {
     // ...
};

composite.add(source.subscribeWith(rs));

请参阅此处,使用subscribeWith将返回与实例化相同的ResourceObserver对象。这为执行订阅提供了方便&在一行中将ResourceObserver添加到CompositeDisposable(注意,ResourceObservable实现了Disposable)

编辑2回复第二个评论。

Source.SubscribeWith(rs);Source.Subscribe(rs);两者都返回SingleObserver实例,

observableSource#subscribe(Observer<?super t>Observer)不返回Observer。它是一个void方法(请参阅上面Observable#subscribe解释下的注释)而可观察的#subscribeWith返回观察者。如果我们要使用observableSource#subscribe重写示例用法代码,那么我们必须在如下两行中执行:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);

可观察的#subscribeWith方法使我们只需一行就可以方便地执行上述操作composite.add(Source.subscribeWith(rs));

它可能会与所有看起来有点相似的重载subscribe方法混淆,但也有不同之处(有些是细微的)。查看代码和文档有助于提供它们之间的区别。

subscribeWith方法在您具有可能要重用的Observer的特定实现时非常有用。例如,在上面的示例代码中,它在订阅中提供了ResourceObserver的特定实现,从而继承了它的功能,同时仍然允许您处理onNext onError和OnComplete。

//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    //implement your onNext, onError, onComplete.
    ....
}
compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));
 类似资料:
  • Learning RxJava 2 for Android by example Take the MindOrks Android Online Course and Learn RxJava How to use RxJava 2 in Android Application How to migrate from RxJava 1.0 to RxJava 2.0 How to use RxJ

  • 我正在android中学习RxJava2。谁能解释一下,我们如何使用RxJava2将数据插入SQLiteDatabase。这里是我试图使用的代码示例,但它将数据插入数据库六次; //OnClick(单击) //可观察 //观察员 //添加方法

  • 我做错了什么?我如何将结果与可观察的结果结合起来?最好不是用Lamda符号。 我查看的其他资源:-android rxjava2/retrofit2链接调用与分页令牌

  • 假设我有一个间隔,我给了它一个计算调度器。这样地: 那么,平面图{...}中发生的一切是否也会被调度在计算线程上? 在Observable.interval的源代码中,它说: 作为RxJava的初学者,我很难理解这个评论。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于要发出的项目的最后一部分是否也意味着发出的项目将在同一个线程上使用?还是需要观察?这样地: 如果我想在计算线程上处理emit

  • 请注意,我使用以下代码得到了相同的结果: 所以问题是,fooObservable直到订阅了PublishSubject之后才订阅PublishSubject, 是否有一种方法可以在第一次订阅FooObservable之后立即运行代码? 如果请求与已经订阅的请求匹配,那么observable应该在订阅时立即提供最新的匹配值。 当没有订阅者时,我需要取消我包装的服务的订阅。

  • 注意:我使用()而不是尖括号 我有一个MVP android应用程序,它使用Retofit2和RxJava2从GitHub Api获取数据。代码运行良好,我能够恢复一个可观察的(响应(列表(头)),其中响应来自Reformation2,头来自OkHttp3。 但是当涉及到单元测试时,我遇到了一个问题:我无法模拟响应(List(Headers))。Retrofit2响应类有一个私有构造函数,所以我无