续接上一篇,来讲一下ProcessorProvidersBehaviour的process方法,代码如下:
@Override
public <T> Observable<T> process(final io.rx_cache2.ConfigProvider configProvider) {
return Observable.defer(new Callable<ObservableSource<? extends T>>() {
@Override public ObservableSource<? extends T> call() throws Exception {
if (hasProcessesEnded) {
return getData(configProvider);
}
return oProcesses.flatMap(new Function<Integer, ObservableSource<? extends T>>() {
@Override public ObservableSource<? extends T> apply(Integer ignore) throws Exception {
return getData(configProvider);
}
});
}
});
}
这里做的处理是确保数据迁移完成后才进行getData(),看一下getData(),
<T> Observable<T> getData(final io.rx_cache2.ConfigProvider configProvider) {
Record<Object> record = twoLayersCache.retrieve(configProvider.getProviderKey(), configProvider.getDynamicKey(),
configProvider.getDynamicKeyGroup(), useExpiredDataIfLoaderNotAvailable,
configProvider.getLifeTimeMillis(), configProvider.isEncrypted());
Observable<Reply> replyObservable;
if (record != null && !configProvider.evictProvider().evict()) {
replyObservable = Observable.just(new Reply(record.getData(), record.getSource(), configProvider.isEncrypted()));
} else {
replyObservable = getDataFromLoader(configProvider, record);
}
return (Observable<T>) replyObservable.map(new Function<Reply, Object>() {
@Override public Object apply(Reply reply) throws Exception {
return ProcessorProvidersBehaviour.this.getReturnType(configProvider, reply);
}
});
}
twoLayerCache.retrieve()中调用了封装的retrieveRecord来检索缓存(之前的篇章已经讲过RetrieveRecord)。接下来是configProvider.evictProvider().evict()指的是缓存是否可立即清除,如果可立即清除,就选择调用getDataFromLoader获取数据(),如果不是可立即清除,就选择调用缓存数据。看一下getDataFromLoader(),
private Observable<Reply> getDataFromLoader(final io.rx_cache2.ConfigProvider configProvider,
final Record record) {
return configProvider.getLoaderObservable().map(new Function<Object, Reply>() {
@Override public Reply apply(Object data) throws Exception {
boolean useExpiredData = configProvider.useExpiredDataIfNotLoaderAvailable() != null ?
configProvider.useExpiredDataIfNotLoaderAvailable()
: useExpiredDataIfLoaderNotAvailable;
if (data == null && useExpiredData && record != null) {
return new Reply(record.getData(), record.getSource(), configProvider.isEncrypted());
}
clearKeyIfNeeded(configProvider);
if (data == null) {
throw new io.rx_cache2.RxCacheException(io.rx_cache2.internal.Locale.NOT_DATA_RETURN_WHEN_CALLING_OBSERVABLE_LOADER
+ " "
+ configProvider.getProviderKey());
}
twoLayersCache.save(configProvider.getProviderKey(), configProvider.getDynamicKey(),
configProvider.getDynamicKeyGroup(), data, configProvider.getLifeTimeMillis(),
configProvider.isExpirable(), configProvider.isEncrypted());
return new Reply(data, Source.CLOUD, configProvider.isEncrypted());
}
}).onErrorReturn(new Function<Object, Object>() {
@Override public Object apply(Object o) throws Exception {
clearKeyIfNeeded(configProvider);
boolean useExpiredData = configProvider.useExpiredDataIfNotLoaderAvailable() != null ?
configProvider.useExpiredDataIfNotLoaderAvailable()
: useExpiredDataIfLoaderNotAvailable;
if (useExpiredData && record != null) {
return new Reply(record.getData(), record.getSource(), configProvider.isEncrypted());
}
throw new io.rx_cache2.RxCacheException(io.rx_cache2.internal.Locale.NOT_DATA_RETURN_WHEN_CALLING_OBSERVABLE_LOADER
+ " "
+ configProvider.getProviderKey(), (Throwable) o);
}
});
}
首先是configProvider.getLoaderObservable(),这个是缓存接口中传入的参数,使用的时候是将实际获取数据的被观测者传入的。useExpiredData是是否使用过期数据。如果实际获取不到数据(data==null),可以使用过期缓存,且有缓存就可以使用该缓存。如果实际能获取数据,先清理缓存(clearKeyIfNeeded(),其中调用了twoLayersCache的清理方法)(进入getDataFromLoader方法的前提条件就是缓存可立即清除),然后将新获取到的数据保存为缓存(twoLayersCache.save()封装了SaveRecord,用于保存缓存)。onErrorReturn,如果实际获取数据出错,则清理缓存,然后如果可使用过期缓存,缓存不为空(这里的缓存已经被检索出来了,所以提前的清理不影响),则使用该缓存。
然后讲一下Reply,构造函数传入三个参数,缓存的数据,缓存的来源,是否加密。再说一下ProcessorProvidersBehaviour.this.getReturnType(configProvider, reply),代码如下:
private Object getReturnType(io.rx_cache2.ConfigProvider configProvider, Reply reply) {
Object data = getDeepCopy.deepCopy(reply.getData());
if (configProvider.requiredDetailedResponse()) {
return new Reply<>(data, reply.getSource(), configProvider.isEncrypted());
} else {
return data;
}
}
getDeepCopy.deepCopy()用来对数据进行反序列化,返回的对象是具体的数据对象,requiredDetailedResponse()确定返回的类型是Reply还是只有数据。这里是process的最后一个环节。
回到这个process最初的地方:
@Override public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
return Observable.defer(new Callable<ObservableSource<?>>() {
@Override public ObservableSource<?> call() throws Exception {
Observable observable =
processorProviders.process(proxyTranslator.processMethod(method, args));
Class<?> methodType = method.getReturnType();
if (methodType == Observable.class) return Observable.just(observable);
if (methodType == Single.class) return Observable.just(Single.fromObservable(observable));
if (methodType == Maybe.class) {
return Observable.just(Maybe.fromSingle(Single.fromObservable(observable)));
}
if (method.getReturnType() == io.reactivex.Flowable.class) {
return Observable.just(observable.toFlowable(BackpressureStrategy.MISSING));
}
String errorMessage = method.getName() + io.rx_cache2.internal.Locale.INVALID_RETURN_TYPE;
throw new RuntimeException(errorMessage);
}
}).blockingFirst();
}
可以看到对于缓存接口的返回类型不同(method.getReturnType()),还做了不同的调整。
到这里RxCache主要内容基本说完了,看完后再去使用RxCache应该会更容易上手吧。
。。。还有RxCache用到的序列化工具,内存缓存的存储形式,加密解密还没有,这个以后应该会研究一下
的吧。。