Question:

RxJava onErrorResumeNext()

夹谷星河
2023-03-14
public Observable<ModelA> observeGetAPI(){
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();

                model.setApi1Response(observableApi1Response);

                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread());
            })
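            .onErrorReturn(throwable -> {
                // Blah blah blah...
                // `model` is local to the flatMap lambda and not in scope here,
                // so a fresh instance is returned as a last-resort value.
                return new ModelA();
            })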
            .subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
        // Blah blah blah...
        return model;
    }).onErrorReturn(throwable -> {
        // Blah blah blah...
        return model;
    })
    .subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI().subscribe(modelA -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable -> {
    // WE NEVER GET HERE... onErrorResumeNext()
}, () -> {
    // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED
});

Edit: here is a rough timeline of what is happening:

---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets the fallback response, since onComplete() is called too early.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

Here is a link to a simple diagram I made that shows what I want to happen: diagram
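
As a rough illustration of the ordering the timeline asks for, here is a plain-Java sketch that swaps RxJava for `java.util.concurrent.CompletableFuture`. It is not the Retrofit/RxJava code from the question: `requestB`, `requestA`, `fallbackA`, and `chain` are hypothetical stand-ins, and the 100 ms delay is an arbitrary assumption that makes the fallback slow. It only shows the result being delivered after the fallback responds, not before.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the question's chain, built on CompletableFuture
// instead of RxJava. None of these names come from Retrofit or RxJava.
class FallbackOrderSketch {

    // Stand-in for request B: always succeeds.
    static CompletableFuture<String> requestB(List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            log.add("B response (success)");
            return "B";
        });
    }

    // Stand-in for request A: the call returns, but the payload counts as a failure.
    static CompletableFuture<String> requestA(List<String> log) {
        return CompletableFuture.<String>supplyAsync(() -> {
            log.add("A response (failure)");
            throw new IllegalStateException("A returned invalid data");
        });
    }

    // Stand-in for the fallback request; the sleep makes it noticeably slow.
    static CompletableFuture<String> fallbackA(List<String> log) {
        log.add("fallback A requested");
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("fallback A response (success)");
            return "A Fallback";
        });
    }

    // B, then A; if A fails, switch to the fallback and wait for its value.
    static CompletableFuture<String> chain(List<String> log) {
        return requestB(log).thenCompose(b ->
                requestA(log)
                        .handle((value, error) -> {
                            CompletableFuture<String> next = (error == null)
                                    ? CompletableFuture.completedFuture(value)
                                    : fallbackA(log);
                            return next;
                        })
                        .thenCompose(next -> next));
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        String result = chain(log).join(); // blocks until the fallback has answered
        log.add("delivered: " + result);
        log.forEach(System.out::println);
    }
}
```

Running `main` prints the four request/response steps in causal order, followed by `delivered: A Fallback`. Each step is only started once the previous one has finished, so the final value cannot arrive before the fallback response.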

1 answer

南宫鸿晖
2023-03-14

The Rx calls used below should simulate what you are doing with Retrofit.

Observable<String> fallbackObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting A Fallback");
                        subscriber.onNext("A Fallback");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Fallback Error");
                        return "Fallback Error";
                    }
                })
                .subscribeOn(Schedulers.immediate());

Observable<String> stringObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting B");
                        subscriber.onNext("B");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        logger.v("flatMapping B");
                        return Observable
                                .create(new Observable.OnSubscribe<String>() {
                                    @Override
                                    public void call(Subscriber<? super String> subscriber) {
                                        logger.v("emitting A");
                                        subscriber.onNext("A");
                                        subscriber.onCompleted();
                                    }
                                })
                                .delay(1, TimeUnit.SECONDS)
                                .map(new Func1<String, String>() {
                                    @Override
                                    public String call(String s) {
                                        logger.v("A completes but contains invalid data - throwing error");
                                        throw new NotImplementedException("YUCK!");
                                    }
                                })
                                .onErrorResumeNext(fallbackObservable)
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Return Error");
                        return "Return Error";
                    }
                })
                .subscribeOn(Schedulers.newThread());

subscription = stringObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                logger.v("onNext " + s);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.v("onError");
            }
        },
        new Action0() {
            @Override
            public void call() {
                logger.v("onCompleted");
            }
        });

The output of the log statements is:

RxNewThreadScheduler-1 emitting B
RxComputationThreadPool-1 flatMapping B
RxNewThreadScheduler-2 emitting A
RxComputationThreadPool-2 A completes but contains invalid data - throwing error
RxComputationThreadPool-2 emitting A Fallback
RxComputationThreadPool-1 onNext A Fallback
RxComputationThreadPool-1 onCompleted
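
The last two log lines (`onNext A Fallback`, then `onCompleted`, with no `onError`) match the documented hand-over behaviour of `onErrorResumeNext`: on error, control passes to the other Observable instead of `onError` being invoked. The following is a toy, synchronous model of that hand-over in plain Java. It is not RxJava and not its implementation: `Sink`, `Source`, and `ResumeNextModel` are hypothetical names, and schedulers, unsubscription, and backpressure are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Toy synchronous model, not RxJava: Sink, Source and ResumeNextModel are
// hypothetical names invented for this sketch.
class ResumeNextModel {

    // Receives zero or more items followed by exactly one terminal event.
    interface Sink {
        void onNext(String item);
        void onError(Throwable error);
        void onCompleted();
    }

    // Something that pushes events into a Sink when subscribed to.
    interface Source {
        void subscribe(Sink sink);
    }

    // Forward items and completion from primary; on error, subscribe the same
    // downstream to the fallback instead of forwarding the error.
    static Source onErrorResumeNext(Source primary, Source fallback) {
        return downstream -> primary.subscribe(new Sink() {
            @Override
            public void onNext(String item) {
                downstream.onNext(item);
            }

            @Override
            public void onError(Throwable error) {
                fallback.subscribe(downstream);
            }

            @Override
            public void onCompleted() {
                downstream.onCompleted();
            }
        });
    }

    // Mirrors the inner chain of the answer: A fails, the fallback takes over.
    static List<String> run() {
        Source failingA = sink -> sink.onError(new IllegalStateException("YUCK!"));
        Source fallback = sink -> {
            sink.onNext("A Fallback");
            sink.onCompleted();
        };

        List<String> events = new ArrayList<>();
        onErrorResumeNext(failingA, fallback).subscribe(new Sink() {
            @Override
            public void onNext(String item) {
                events.add("onNext " + item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("onError");
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext A Fallback, onCompleted]
    }
}
```

In this model the downstream never sees the error from A. It receives only the fallback's item and the fallback's completion, the same two events the subscriber logs above.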

This looks like what you are after, but maybe I'm missing something.
