当前位置: 首页 > 工具软件 > race-cache > 使用案例 >

【Java】guava(一)LoadingCache使用及原理

邓夕
2023-12-01

LoadingCache是guava开发包下的一款十分实用的本地缓存工具类。

什么时候用?

对于一些十分常用的热点数据,可以考虑加本地缓存。比如一些热点新闻的数据,否则会导致存储热点问题。比如redis会有热key。

接下来看一下LoadingCache接口的主要方法:

public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V> {
  V get(K key) throws ExecutionException;
  V getUnchecked(K key);
  ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException;
  void refresh(K key);
}

get:先看缓存中的数据是否过期,如果过期了,阻塞调用load方法加载数据;如果没有过期,再看是否需要刷新,如果需要,调用reload方法尽可能地以异步方式刷新数据,如果没刷完,返回旧值;为什么是尽可能?需要以异步方式重写reload方法,guava提供的reload默认实现是同步的;

getUnchecked:同get,只是将受检异常转为非受检了;

getAll:会调用loadAll加载数据,默认情况下等价于多次get;

refresh:会调用reload方法,尽可能地以异步方式刷新数据,同get方法里的过期场景;

看一个简答的例子:

LoadingCache<String, Integer> loadingCache = CacheBuilder
        .newBuilder()
        .refreshAfterWrite(2, TimeUnit.SECONDS)
        .expireAfterWrite(5, TimeUnit.SECONDS)
        .build(new CacheLoader<String, Integer>() {
            @Override
            public Integer load(String s) throws Exception {
                System.out.println("start to load");
                Thread.sleep(5000);
                System.out.println("end load");
                return ThreadLocalRandom.current().nextInt(10);
            }

            @Override
            public ListenableFuture<Integer> reload(String key, Integer oldValue) throws Exception {
                ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
                return executorService.submit(() -> load(key));
//                        return super.reload(key, oldValue);
            }
        });

refresh超时参数:如果超过阈值会调用reload方法刷新缓存;

expire超时参数:如果超过阈值会调用load方法阻塞时load缓存;

这俩参数的设定有一个比价好的策略:expire长于refresh。这样做的目的在于,尽量避免出现因为expire导致的阻塞,因为如果load需要较长时间,那么业务上就会感知到,这会影响用户体验。如果在expire前先refresh了,之后就不需要expire了,而refresh本身是异步的,对业务几乎无影响。那能不能只有refresh?也不行。想想一下,如果一个服务长时间没有被访问,突然来了一波请求,此时如果不设置expire,那么异步刷新完成前业务将会读到一个很旧的值,这也不太好,所以还是必须设置expire值。

另外上面的例子中除了实现了必要的load方法外,还覆盖了guava默认的reload方法,提供了一个异步版本的reload简单实现。我们先来看一下默认的reload是怎么做的:

@GwtIncompatible // Futures
public ListenableFuture<V> reload(K key, V oldValue) throws Exception {
checkNotNull(key);
checkNotNull(oldValue);
return Futures.immediateFuture(load(key));
}

可以看到,这里直接调用了load方法去获取返回值,然后将返回值设置到一个ImmediateFuture类型的future中。本质上没有异步执行。

接下来重点看下get和refresh逻辑:

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  try {
    if (count != 0) { // read-volatile
      ReferenceEntry<K, V> e = getEntry(key, hash);
      if (e != null) {
        long now = map.ticker.read();
        V value = getLiveValue(e, now);
        if (value != null) {
          recordRead(e, now);
          statsCounter.recordHits(1);
          return scheduleRefresh(e, key, hash, value, now, loader);
        }
        ValueReference<K, V> valueReference = e.getValueReference();
        if (valueReference.isLoading()) {
          return waitForLoadingValue(e, key, valueReference);
        }
      }
    }
    return lockedGetOrLoad(key, hash, loader);
  } catch (ExecutionException ee) {
  }
}

LoadingCache的实现是一个map,里面也有segment的概念。这个get方法就是通过hash值索引到segment后的segment的get方法。首先查找entry,如果从entry里取值,检测下这个值有没有过期,逻辑就在getLive方法里。如果过期,那么会对应的entry以及值删掉,同时返回null,返回null就会走到后面的lockedGetOrLoad方法。否则就会调用scheduleRefresh方法去刷新值。这个和之前接口文档里的描述一致。

看下lockedGetOrLoad:

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
  boolean createNewEntry = true;
  lock();
  try {
    long now = map.ticker.read();
    int index = hash & (table.length() - 1);
    ReferenceEntry<K, V> first = table.get(index);
    for (e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash
          && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        valueReference = e.getValueReference();
        ... 
      }
    }
  }

  if (createNewEntry) {
    try {
      synchronized (e) {
        return loadSync(key, hash, loadingValueReference, loader);
      }
    }
  } else {
    return waitForLoadingValue(e, key, valueReference);
  }
}

省略了一堆代码。。。先上锁,第一个获取锁的线程进来会创建一个entry,如果创建成功,会调用loadSync函数去阻塞式load值,其余的线程只能wait,也就是waitForLoadingValue方法。

loadSync函数:

V loadSync(
    K key,
    int hash,
    LoadingValueReference<K, V> loadingValueReference,
    CacheLoader<? super K, V> loader)
    throws ExecutionException {
  ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
  return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}

public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
  try {
    stopwatch.start();
    V previousValue = oldValue.get();
    if (previousValue == null) {
      V newValue = loader.load(key);
      return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
    }
    ListenableFuture<V> newValue = loader.reload(key, previousValue);
    if (newValue == null) {
      return Futures.immediateFuture(null);
    }
    // To avoid a race, make sure the refreshed value is set into loadingValueReference
    // *before* returning newValue from the cache query.
    return transform(
        newValue,
        new com.google.common.base.Function<V, V>() {
          @Override
          public V apply(V newValue) {
            LoadingValueReference.this.set(newValue);
            return newValue;
          }
        },
        directExecutor());
  } catch (Throwable t) {
    ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
    if (t instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    return result;
  }
}

loadSync会调用loadFuture方法,loadFuture会先判断当前是否有就值,如果有说明是refresh过来的,否则是load过来的。对于后者,需要直接拿到返回值,所以就直接调用loader的load方法取加载值,所以是一个同步阻塞过程。对于前者,因为可能是异步的,所以调用的是loader的reload防范,拿到的是一个future。然后对于这个future使用transform对接过做了转换,也就是一个Function实现类里的逻辑,将newValue设置到entry里。这样便完成了刷新动作。这里是不是异步就取决于reload函数的实现了。

最后我们再看下getAll方法,也就是同时获取多个key的值。

  ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
    int hits = 0;
    int misses = 0;

    Map<K, V> result = Maps.newLinkedHashMap();
    Set<K> keysToLoad = Sets.newLinkedHashSet();
    for (K key : keys) {
      V value = get(key);
      if (!result.containsKey(key)) {
        result.put(key, value);
        if (value == null) {
          misses++;
          keysToLoad.add(key);
        } else {
          hits++;
        }
      }
    }

    try {
      if (!keysToLoad.isEmpty()) {
        try {
          Map<K, V> newEntries = loadAll(keysToLoad, defaultLoader);
          for (K key : keysToLoad) {
            V value = newEntries.get(key);
            if (value == null) {
              throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
            }
            result.put(key, value);
          }
        } catch (UnsupportedLoadingOperationException e) {
          // loadAll not implemented, fallback to load
          for (K key : keysToLoad) {
            misses--; // get will count this miss
            result.put(key, get(key, defaultLoader));
          }
        }
      }
      return ImmutableMap.copyOf(result);
    } finally {
      globalStatsCounter.recordHits(hits);
      globalStatsCounter.recordMisses(misses);
    }
  }

这里先逐个调用get,看是否有值,这个get和之前分析的get不是同一个。对于没有值的key,统一收集起来,调用loadAll方法,loadAll会调用loader的loadAll方法,默认的loader的实现是抛一个UnsupportedLoadingOperationException类型异常,然后这里会catch住,之后便会调用我们前面分析的get方法逐个load。所以默认的loadAll等价于多次load。

 类似资料: