当前位置: 首页 > 知识库问答 >
问题:

RxJava-等待其他活动/片段中的其他可观察对象的retryWhen完成

凤棋
2023-03-14

用例:我正在开发一个Android应用程序,它有一个带有4个选项卡的视图页码,所有这些选项卡都是片段。对于每个选项卡/片段,我必须连接到具有 Oauth 的 REST Api,并且令牌每 5 分钟过期一次。

当前解决方案:使用RxJava和retryWhen操作符,我可以在收到401 HTTP错误时重新进行身份验证。对于订阅和消费的每个可观察流,使用:

retryWhen(refreshTokenAuthenticator)

因此,当令牌到期时,流将使用它,然后执行真正的api调用。

问题:这仅适用于一个订阅中使用的一个可观察量,但我需要允许用户在不阻止他/她的情况下在选项卡之间切换,同时考虑到401错误可能随时出现在任何API调用的任何片段中。

问题:有没有一种方法可以让可观察对象等待其他可观察对象使用不在同一流/订阅者中的onNext()完成?事实上在不同的Fragments中?所以api调用场景将如下所示:

Api Call Fragment A --> request
Api Call Fragment A <-- response 200 Code

Api Call Fragment B --> request
Api Call Fragment B <-- response 401 Code (retryWhen in action)
Api Call Fragment B --> request (refreshToken)
Api Call Fragment B <-- response 200 (with new access token saved in the app)

几乎在同一时间...

Api Call Fragment C --> request
Api Call Fragment C <-- response 401 Code (retryWhen in action)
Observable in Fragment C Waits till Observable in Fragment B finish (onNext())
Api Call Fragment C --> request
Api Call Fragment C <-- response 200

这是我已经拥有的,每个API调用看起来几乎相同:

public void getDashboardDetail() {

    Subscription subscription = repository.getDashboard()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retryWhen(tokenAuthenticator)
            .subscribe(new RestHttpObserver<UserDataDto>() {
                @Override
                public void onUnknownError(Throwable e) {
                    getMvpView().onError(e);
                }

                @Override
                public void onHostUnreachable() {
                    getMvpView().onHostUnreachable();
                }

                @Override
                public void onHttpErrorCode(int errorCode, ErrorDto errorDto) {
                    getMvpView().onHttpErrorCode(errorCode, errorDto);
                }

                @Override
                public void onCompleted() {
                    //Do nothing...
                }

                @Override
                public void onNext(UserDataDto response) {
                    getMvpView().onReceiveUserData(response);
                }
            });

    this.compositeSubscription.add(subscription);

}

和我的RefreshTokenAuthenticator:

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> {

private static final int RETRY_COUNT = 1;

private static final int HTTP_ERROR_CODE = 401;

@Inject
private UserRepository repository;

@Inject
private SessionManager sessionManager;

@Inject
private MyApplication application;


@Inject
private RefreshTokenAuthenticator() {
}

@Override
public synchronized Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .flatMap(new Func1<Throwable, Observable<?>>() {
                int retryCount = 0;

                @Override
                public Observable<?> call(final Throwable throwable) {

                    retryCount++;
                    if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) {
                        int errorCode = ((HttpException) throwable).code();
                        if (errorCode == HTTP_ERROR_CODE) {
                            return repository
                                    .refreshToken(sessionManager.getAuthToken().getRefreshToken())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribeOn(Schedulers.io())

                                    .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto))
                                    .doOnError(throwable1 -> {
                                        Log.e("RefreshTokenAuth", "DoOnError", throwable1);
                                        application.logout();
                                    });

                        }
                    }
                    // No more retries. Pass the original Retrofit error through.
                    return Observable.error(throwable);
                }
            });
}

}

共有2个答案

毋城
2023-03-14

最后,只需添加一个全局(在我的应用程序类中)布尔值(如果应用当前是否重新进行身份验证),即可使其正常工作。它实际上允许两个401 HTTP错误,但第二个错误继续在onNext()中并重新执行初始可观察量。我想做一些更被动的事情,但至少这解决了我的主要问题。

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> {

private static final int RETRY_COUNT = 1;

private static final int HTTP_ERROR_CODE = 401;

@Inject
private UserRepository repository;

@Inject
private SessionManager sessionManager;

@Inject
private MyApplication application;


@Inject
private RefreshTokenAuthenticator() {
}

@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .flatMap(new Func1<Throwable, Observable<?>>() {
                int retryCount = 0;

                @Override
                public Observable<?> call(final Throwable throwable) {

                    retryCount++;
                    if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) {
                        int errorCode = ((HttpException) throwable).code();

                        if (errorCode == HTTP_ERROR_CODE) {

                            Log.i("RefreshTokenAuth", "APPLICATION IS AUTHENTICATING = " + application.isAuthenticating);
                            if (!application.isAuthenticating) {
                                application.isAuthenticating = true;

                                String refreshToken = sessionManager.getAuthToken().getRefreshToken();

                                return repository
                                        .refreshToken(refreshToken)
                                        .observeOn(AndroidSchedulers.mainThread())
                                        .subscribeOn(Schedulers.io())
                                        .doOnCompleted(() -> application.isAuthenticating = false)
                                        .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto))
                                        .doOnError(throwable1 -> {
                                            Log.e("RefreshTokenAuth", "DoOnError", throwable1);
                                            application.logout();
                                        });
                            } else {
                                return Observable.just(1).doOnNext(o -> Log.i("RefreshTokenAuth", "Let's try another shot!"));
                            }
                        }
                    }
                    // No more retries. Pass the original Retrofit error through.
                    return Observable.error(throwable);
                }
            });
}

}

咸弘雅
2023-03-14

1) 使身份验证令牌的源缓存上一个成功结果提供使此缓存结果无效的方法

class Auth {
    private Observable<AuthToken> validToken;

    synchronized void invalidateAuthToken() {
        validToken = null;
    }

    synchronized Observable<AuthToken> getAuthToken() {
        if (validToken == null) {
            validToken = repository
                .refreshToken(...) // start async request
                .doOnError(e -> invalidateAuthToken())
                .replay(1); // cache result
        }
        return validToken; // share among all subscribers
    }
}

2) 要访问web服务,请使用以下模式:

Observable<Data1> dataSource1 = 
    Observable.defer(auth.getAuthToken()) // always start from token
        .flatMap(token ->
            repository.fetchData1(token, ...)) // use token to call web service
        .doOnError(e -> auth.invalidateAuthToken())
        .retry(N); // retry N times
 类似资料:
  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

  • 当我创建时,我需要从另一个活动中删除一个片段,而不是创建它的活动: 然后我可以从片段中访问另一个活动,假设片段中有一个按钮可以打开另一个活动,现在,停留在该活动中我想删除启动该活动的片段,我得到该片段的参考号通过意图。所以我尝试: 但它给了我一个nullPointerException, 03-07 22:42:33.270 30993-30993/河马。rggmiranda。hiposleep

  • 我有3个活动A、B和C。A导致B,B导致C。我希望能够在A和B之间来回移动,但我希望在C开始后完成A和B。我知道如何在通过意图启动C时关闭B,但在启动C时如何关闭A?

  • 我的代码中有两个基本类:Airport和Flight(出发和到达)。机场由抵港飞行物清单和离港飞行物清单组成。在我的主要活动中,我持有一个airport对象列表,并在列表视图中表示它们。轻触列表视图对象,我启动了一个新的活动,用于显示该airport对象的所有到达航班(ActivityArrivalFlights),其中我将airport对象作为字段保存。在本练习中,有一个将到达航班添加到同一对象

  • Navicat 还能让你管理其他 SQLite 对象:索引和触发器。在主窗口的主工具栏点击相应的按钮来打开对象列表。