当前位置: 首页 > 知识库问答 >
问题:

重构Google的NetworkBoundResource类以使用RxJava而不是LiveData

强化
2023-03-14

谷歌的android架构组件html" target="_blank">教程中有一部分解释了如何抽象通过网络获取数据的逻辑。在其中,他们创建了一个名为NetworkBoundResource的抽象类,使用LiveData创建一个反应流,作为所有反应网络请求的基础。

public abstract class NetworkBoundResource<ResultType, RequestType> {
private final AppExecutors appExecutors;

private final MediatorLiveData<Resource<ResultType>> result = new MediatorLiveData<>();

@MainThread
NetworkBoundResource(AppExecutors appExecutors) {
    this.appExecutors = appExecutors;
    result.setValue(Resource.loading(null));
    LiveData<ResultType> dbSource = loadFromDb();
    result.addSource(dbSource, data -> {
        result.removeSource(dbSource);
        if (shouldFetch()) {
            fetchFromNetwork(dbSource);
        } else {
            result.addSource(dbSource, newData -> result.setValue(Resource.success(newData)));
        }
    });
}

private void fetchFromNetwork(final LiveData<ResultType> dbSource) {
    LiveData<ApiResponse<RequestType>> apiResponse = createCall();
    // we re-attach dbSource as a new source, it will dispatch its latest value quickly
    result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
    result.addSource(apiResponse, response -> {
        result.removeSource(apiResponse);
        result.removeSource(dbSource);
        //noinspection ConstantConditions
        if (response.isSuccessful()) {
            appExecutors.diskIO().execute(() -> {
                saveCallResult(processResponse(response));
                appExecutors.mainThread().execute(() ->
                        // we specially request a new live data,
                        // otherwise we will get immediately last cached value,
                        // which may not be updated with latest results received from network.
                        result.addSource(loadFromDb(),
                                newData -> result.setValue(Resource.success(newData)))
                );
            });
        } else {
            onFetchFailed();
            result.addSource(dbSource,
                    newData -> result.setValue(Resource.error(response.errorMessage, newData)));
        }
    });
}

protected void onFetchFailed() {
}

public LiveData<Resource<ResultType>> asLiveData() {
    return result;
}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {
    return response.body;
}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract LiveData<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract LiveData<ApiResponse<RequestType>> createCall();
}

根据我的理解,这门课的逻辑是:

a)创建一个名为“result”的MediatorLiveData作为主要返回对象,并将其初始值设置为resource.loading(null)

b)从Android Room db中获取数据作为dbSource LiveData,并将其作为源LiveData添加到“result”中

c)在dbSource LiveData的第一次发射时,从“result”中删除dbSource LiveData并调用“shouldFetchFromNetwork()”,这将

  1. 如果为TRUE,则调用“fetchDataFromNetwork(dbSource)”,该“fetchDataFromNetwork(dbSource)”通过“createCall()”创建网络调用,该“createCall()”返回封装为ApiResponse对象的响应的LiveData
  2. 将dbSource LiveData添加回“result”,然后将发出的值设置到resource.loading(数据)
  3. 将apiResponce LiveData添加到“result”,并在第一次发射时删除dbSource和apiResponce LiveDatas
  4. 如果apiResponse成功,则调用“saveCallResult(processResponse(response))”,并将dbSource LiveData添加回“result”,并将发出的值设置为resource.success(newData)
  5. 如果apiResponse失败,调用“OnFetchFailed()”,将dbSource LiveData添加回“result”,并将发出的值设置为resource.error(response.errorMessage,newData))
  6. 如果为FALSE,只需将dbSource LiveData添加到“result”,并将发出的值设置为resource.success(newData)

鉴于此逻辑是正确的解释,我已经尝试重构该类以使用RxJava Observables而不是LiveData。这是我成功重构的尝试(我删除了初始的resource.loading(空),因为我认为这是多余的)。

public abstract class NetworkBoundResource<ResultType, RequestType> {

private Observable<Resource<ResultType>> result;

@MainThread
NetworkBoundResource() {
    Observable<Resource<ResultType>> source;
    if (shouldFetch()) {
        source = createCall()
                .subscribeOn(Schedulers.io())
                .doOnNext(apiResponse -> saveCallResult(processResponse(apiResponse)))
                .flatMap(apiResponse -> loadFromDb().toObservable().map(Resource::success))
                .doOnError(t -> onFetchFailed())
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> Resource.error(t.getMessage(), data))

                })
                .observeOn(AndroidSchedulers.mainThread());
    } else {
        source = loadFromDb()
                .toObservable()
                .map(Resource::success);
    }

    result = Observable.concat(
            loadFromDb()
                    .toObservable()
                    .map(Resource::loading)
                    .take(1),
            source
    );
}

public Observable<Resource<ResultType>> asObservable() {return result;}

protected void onFetchFailed() {}

@WorkerThread
protected RequestType processResponse(ApiResponse<RequestType> response) {return response.body;}

@WorkerThread
protected abstract void saveCallResult(@NonNull RequestType item);

@MainThread
protected abstract boolean shouldFetch();

@NonNull
@MainThread
protected abstract Flowable<ResultType> loadFromDb();

@NonNull
@MainThread
protected abstract Observable<ApiResponse<RequestType>> createCall();
}

由于我是RxJava的新手,我的问题是,我是否正确地重构为RxJava并保持与该类的LiveData版本相同的逻辑?

共有1个答案

冀俊良
2023-03-14
public abstract class ApiRepositorySource<RawResponse extends BaseResponse, ResultType> {

    // result is a Flowable because Room Database only returns Flowables
    // Retrofit response will also be folded into the stream as a Flowable
    private Flowable<ApiResource<ResultType>> result; 
    private AppDatabase appDatabase;

    @MainThread
    ApiRepositorySource(AppDatabase appDatabase) {
        this.appDatabase = appDatabase;
        Flowable<ApiResource<ResultType>> source;
        if (shouldFetch()) {
            source = createCall()
                .doOnNext(this::saveCallResult)
                .flatMap(apiResponse -> loadFromDb().toObservable().map(ApiResource::success))
                .doOnError(this::onFetchFailed)
                .onErrorResumeNext(t -> {
                    return loadFromDb()
                            .toObservable()
                            .map(data -> {
                                ApiResource apiResource;

                                if (t instanceof HttpException && ((HttpException) t).code() >= 400 && ((HttpException) t).code() < 500) {
                                    apiResource = ApiResource.invalid(t.getMessage(), data);
                                } else {
                                    apiResource = ApiResource.error(t.getMessage(), data);
                                }

                                return apiResource;
                            });
                })
                .toFlowable(BackpressureStrategy.LATEST);
        } else {
            source = loadFromDb()
                    .subscribeOn(Schedulers.io())
                    .map(ApiResource::success);
        }

        result = Flowable.concat(initLoadDb()
                            .map(ApiResource::loading)
                            .take(1),
                            source)
                .subscribeOn(Schedulers.io());
    }

    public Observable<ApiResource<ResultType>> asObservable() {
        return result.toObservable();
    }

    @SuppressWarnings("WeakerAccess")
    protected void onFetchFailed(Throwable t) {
        Timber.e(t);
    }

    @WorkerThread
    protected void saveCallResult(@NonNull RawResult resultType) {
        resultType.saveResponseToDb(appDatabase);
    }

    @MainThread
    protected abstract boolean shouldFetch();

    @NonNull
    @MainThread
    protected abstract Flowable<ResultType> loadFromDb();

    @NonNull
    @MainThread
    protected abstract Observable<RawResult> createCall();

    @NonNull
    @MainThread
    protected Flowable<ResultType> initLoadDb() {
        return loadFromDb();
    }
}

所以这里是我在多次迭代之后决定使用的东西。这是目前正在生产和工作良好的我的应用程序。以下是一些带走的笔记:

>

  • 创建BaseResponse接口

        public interface BaseResponse {
             void saveResponseToDb(AppDatabase appDatabase);
        }
    

    并在所有api响应对象类中实现它。这样做意味着您不必在每个ApiResource中实现save_to_database逻辑,只要您愿意,就可以将其默认为响应的实现。

    为了简单起见,我选择在onErrorResumeNext块中处理改型错误响应,但我建议您创建一个Transformer类来容纳所有这些逻辑。在本例中,我为ApiResources添加了一个额外的status枚举值,称为invalid,用于400级响应。

    您可能会对LiveData使用Reactive Streams体系结构组件

    实现“android.arch.lifecycle:reactiveStreams:$lifecycle_version”并向该类添加一个名为

        public LiveData<ApiResource<ResultType>> asLiveData {
             return LiveDataReactiveStreams.fromPublisher(result);
        }
    

    从理论上讲,这将非常有效,因为我们的ViewModels不必将可观察到的辐射转换为LiveData辐射,也不必为视图中的可观察到的辐射实现生命周期逻辑。不幸的是,这个流会在每次配置更改时重新构建,因为它会在调用的任何onDestroy中处理LiveData(无论isFinishing是true还是false)。因此,我们必须管理这个流的生命周期,这就违背了最初使用它的目的,或者每次设备旋转时都有重复的调用。

    下面是UserRepository创建ApiNetworkResource实例的示例

    @Singleton
    public class UserRepository {
    
        private final RetrofitApi retrofitApi;
        private final AppDatabase appDatabase;
    
        @Inject
        UserRepository(RetrofitApi retrofitApi, AppDatabase appDatabase) {
            this.retrofitApi = retrofitApi;
            this.appDatabase = appDatabase;
        }
    
        public Observable<ApiResource<User>> getUser(long userId) {
            return new ApiRepositorySource<UserResponse, User>(appDatabase) {
    
                @Override
                protected boolean shouldFetch() {
                    return true;
                }
    
                @NonNull
                @Override
                protected Flowable<User> loadFromDb() {
                    return appDatabase.userDao().getUserFlowable(userId);
                }
    
                @NonNull
                @Override
                protected Observable<UserResponse> createCall() {
                    return retrofitApi.getUserById(userId);
                }
            }.asObservable();
        }
    
    }
    

  •  类似资料:
    • 注意:如果您已经在使用RxJava或Agera这样的库,那么您可以继续使用它们,而不是LiveData。但是当您使用它们或其他方法时,请确保您正确地处理了生命周期,以便在相关的LifecycleOwner停止时数据流暂停,并且在LifecycleOwner被销毁时数据流被销毁。您还可以添加android.arch.lifecycle:reActiveStreams工件,以便将LiveData与另一

    • 我从对象和对象数组中更改了一个对象两次,这样在第一次迭代中,我过滤掉了几个对象,在第二次迭代中,我使用map更改了每个过滤后的对象。我能用减速机或更好的吗?

    • 对许多人来说,这可能是一个简单的问题,但让我困惑。我从凯西·塞拉那里挑选了一个例子,展示了抽象类的实用性,但我无法理解抽象类的整体重要性。 例如,我们有一个带有抽象方法的抽象类, 我的问题是——为什么我们首先需要抽象类来为每种汽车类型定制方法?为什么不在这些汽车子类型中的任何一个中使用这两种方法,比如宝马和其他两个——大众和奥迪——可以简单地覆盖这些方法?

    • RxJava新手,我对接口回调(通过接口变量从代码的内层/模块调用)和RxJava有疑问。要使其更清楚,请快速举例: 标准回调接口实现,接口,A类和B类 当调用classB方法“SomethingOccessed”时,结果是:“回调调用了方法SomethingOccessed”。接口的方法onCallbackCalled(String str)可以从classB中调用任意多次。 类A↓......

    • 问题内容: 在讨论元类时,文档指出: 当然,您也可以覆盖其他类方法(或添加新方法)。例如,在元类中定义自定义方法可在调用类时允许自定义行为,例如,并非总是创建新实例。 我的问题是:假设我想在调用类时具有自定义行为,例如缓存而不是创建新对象。我可以通过重写类的方法来做到这一点。什么时候我想用它定义一个元类?这种方法带来了什么是无法实现的? 问题答案: 直接回答你的问题是:当你想要做 更多的 不仅仅是

    • 说我有一些阶级Foo 我有一个使用Foo并违反LoD的程序 重构以使用LoD如下所示: IntelliJ IDEA有很多重构方法(例如提取到方法、提取到变量、内联)。 IntelliJ IDEA中有没有一种方法可以重构像条形图这样的代码。getFoo()。getX()类似于条形图。getFooX()?