当前位置: 首页 > 工具软件 > RxCache > 使用案例 >

RxCache源码心得(3)

西门品
2023-12-01

续接上一篇,来讲一下ProcessorProvidersBehaviour的process方法,代码如下:

@Override
  public <T> Observable<T> process(final io.rx_cache2.ConfigProvider configProvider) {
    return Observable.defer(new Callable<ObservableSource<? extends T>>() {
      @Override public ObservableSource<? extends T> call() throws Exception {
        if (hasProcessesEnded) {
          return getData(configProvider);
        }

        return oProcesses.flatMap(new Function<Integer, ObservableSource<? extends T>>() {
          @Override public ObservableSource<? extends T> apply(Integer ignore) throws Exception {
            return getData(configProvider);
          }
        });
      }
    });
  }

这里做的处理是确保数据迁移完成后才进行getData(),看一下getData(),

 <T> Observable<T> getData(final io.rx_cache2.ConfigProvider configProvider) {
    Record<Object> record = twoLayersCache.retrieve(configProvider.getProviderKey(), configProvider.getDynamicKey(),
        configProvider.getDynamicKeyGroup(), useExpiredDataIfLoaderNotAvailable,
        configProvider.getLifeTimeMillis(), configProvider.isEncrypted());

    Observable<Reply> replyObservable;

    if (record != null && !configProvider.evictProvider().evict()) {
      replyObservable = Observable.just(new Reply(record.getData(), record.getSource(), configProvider.isEncrypted()));
    } else {
      replyObservable = getDataFromLoader(configProvider, record);
    }

    return (Observable<T>) replyObservable.map(new Function<Reply, Object>() {
      @Override public Object apply(Reply reply) throws Exception {
        return ProcessorProvidersBehaviour.this.getReturnType(configProvider, reply);
      }
    });
  }

twoLayerCache.retrieve()中调用了封装的retrieveRecord来检索缓存(之前的篇章已经讲过RetrieveRecord)。接下来是configProvider.evictProvider().evict()指的是缓存是否可立即清除,如果可立即清除,就选择调用getDataFromLoader获取数据(),如果不是可立即清除,就选择调用缓存数据。看一下getDataFromLoader(),

  private Observable<Reply> getDataFromLoader(final io.rx_cache2.ConfigProvider configProvider,
      final Record record) {
    return configProvider.getLoaderObservable().map(new Function<Object, Reply>() {
      @Override public Reply apply(Object data) throws Exception {
        boolean useExpiredData = configProvider.useExpiredDataIfNotLoaderAvailable() != null ?
            configProvider.useExpiredDataIfNotLoaderAvailable()
            : useExpiredDataIfLoaderNotAvailable;

        if (data == null && useExpiredData && record != null) {
          return new Reply(record.getData(), record.getSource(), configProvider.isEncrypted());
        }

        clearKeyIfNeeded(configProvider);

        if (data == null) {
          throw new io.rx_cache2.RxCacheException(io.rx_cache2.internal.Locale.NOT_DATA_RETURN_WHEN_CALLING_OBSERVABLE_LOADER
              + " "
              + configProvider.getProviderKey());
        }

        twoLayersCache.save(configProvider.getProviderKey(), configProvider.getDynamicKey(),
            configProvider.getDynamicKeyGroup(), data, configProvider.getLifeTimeMillis(),
            configProvider.isExpirable(), configProvider.isEncrypted());
        return new Reply(data, Source.CLOUD, configProvider.isEncrypted());
      }
    }).onErrorReturn(new Function<Object, Object>() {
      @Override public Object apply(Object o) throws Exception {
        clearKeyIfNeeded(configProvider);

        boolean useExpiredData = configProvider.useExpiredDataIfNotLoaderAvailable() != null ?
            configProvider.useExpiredDataIfNotLoaderAvailable()
            : useExpiredDataIfLoaderNotAvailable;

        if (useExpiredData && record != null) {
          return new Reply(record.getData(), record.getSource(), configProvider.isEncrypted());
        }

        throw new io.rx_cache2.RxCacheException(io.rx_cache2.internal.Locale.NOT_DATA_RETURN_WHEN_CALLING_OBSERVABLE_LOADER
            + " "
            + configProvider.getProviderKey(), (Throwable) o);
      }
    });
  }

 首先是configProvider.getLoaderObservable(),这个是缓存接口中传入的参数,使用的时候是将实际获取数据的被观测者传入的。useExpiredData是是否使用过期数据。如果实际获取不到数据(data==null),可以使用过期缓存,且有缓存就可以使用该缓存。如果实际能获取数据,先清理缓存(clearKeyIfNeeded(),其中调用了twoLayersCache的清理方法)(进入getDataFromLoader方法的前提条件就是缓存可立即清除),然后将新获取到的数据保存为缓存(twoLayersCache.save()封装了SaveRecord,用于保存缓存)。onErrorReturn,如果实际获取数据出错,则清理缓存,然后如果可使用过期缓存,缓存不为空(这里的缓存已经被检索出来了,所以提前的清理不影响),则使用该缓存。

然后讲一下Reply,构造函数传入三个参数,缓存的数据,缓存的来源,是否加密。再说一下ProcessorProvidersBehaviour.this.getReturnType(configProvider, reply),代码如下:

 private Object getReturnType(io.rx_cache2.ConfigProvider configProvider, Reply reply) {
    Object data = getDeepCopy.deepCopy(reply.getData());

    if (configProvider.requiredDetailedResponse()) {
      return new Reply<>(data, reply.getSource(), configProvider.isEncrypted());
    } else {
      return data;
    }
  }

getDeepCopy.deepCopy()用来对数据进行反序列化,返回的对象是具体的数据对象,requiredDetailedResponse()确定返回的类型是Reply还是只有数据。这里是process的最后一个环节。

回到这个process最初的地方:

  @Override public Object invoke(final Object proxy, final Method method, final Object[] args)
      throws Throwable {
    return Observable.defer(new Callable<ObservableSource<?>>() {
      @Override public ObservableSource<?> call() throws Exception {
        Observable observable =
            processorProviders.process(proxyTranslator.processMethod(method, args));
        Class<?> methodType = method.getReturnType();

        if (methodType == Observable.class) return Observable.just(observable);

        if (methodType == Single.class) return Observable.just(Single.fromObservable(observable));

        if (methodType == Maybe.class) {
          return Observable.just(Maybe.fromSingle(Single.fromObservable(observable)));
        }

        if (method.getReturnType() == io.reactivex.Flowable.class) {
          return Observable.just(observable.toFlowable(BackpressureStrategy.MISSING));
        }

        String errorMessage = method.getName() + io.rx_cache2.internal.Locale.INVALID_RETURN_TYPE;
        throw new RuntimeException(errorMessage);
      }
    }).blockingFirst();
  }

可以看到对于缓存接口的返回类型不同(method.getReturnType()),还做了不同的调整。


到这里RxCache主要内容基本说完了,看完后再去使用RxCache应该会更容易上手吧。

。。。还有RxCache用到的序列化工具,内存缓存的存储形式,加密解密还没有,这个以后应该会研究一下

的吧。。

 类似资料: