LoadingCache是guava开发包下的一款十分实用的本地缓存工具类。
什么时候用?
对于一些十分常用的热点数据,可以考虑加本地缓存。比如一些热点新闻的数据,否则会导致存储热点问题。比如redis会有热key。
接下来看一下LoadingCache接口的主要方法:
public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V> {
V get(K key) throws ExecutionException;
V getUnchecked(K key);
ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException;
void refresh(K key);
}
get:先看缓存中的数据是否过期,如果过期了,阻塞调用load方法加载数据;如果没有过期,再看是否需要刷新,如果需要,调用reload方法尽可能地以异步方式刷新数据,如果没刷完,返回旧值;为什么是尽可能?需要以异步方式重写reload方法,guava提供的reload默认实现是同步的;
getUnchecked:同get,只是将受检异常转为非受检了;
getAll:会调用loadAll加载数据,默认情况下等价于多次get;
refresh:会调用reload方法,尽可能地以异步方式刷新数据,同get方法里的过期场景;
看一个简答的例子:
LoadingCache<String, Integer> loadingCache = CacheBuilder
.newBuilder()
.refreshAfterWrite(2, TimeUnit.SECONDS)
.expireAfterWrite(5, TimeUnit.SECONDS)
.build(new CacheLoader<String, Integer>() {
@Override
public Integer load(String s) throws Exception {
System.out.println("start to load");
Thread.sleep(5000);
System.out.println("end load");
return ThreadLocalRandom.current().nextInt(10);
}
@Override
public ListenableFuture<Integer> reload(String key, Integer oldValue) throws Exception {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
return executorService.submit(() -> load(key));
// return super.reload(key, oldValue);
}
});
refresh超时参数:如果超过阈值会调用reload方法刷新缓存;
expire超时参数:如果超过阈值会调用load方法阻塞时load缓存;
这俩参数的设定有一个比价好的策略:expire长于refresh。这样做的目的在于,尽量避免出现因为expire导致的阻塞,因为如果load需要较长时间,那么业务上就会感知到,这会影响用户体验。如果在expire前先refresh了,之后就不需要expire了,而refresh本身是异步的,对业务几乎无影响。那能不能只有refresh?也不行。想想一下,如果一个服务长时间没有被访问,突然来了一波请求,此时如果不设置expire,那么异步刷新完成前业务将会读到一个很旧的值,这也不太好,所以还是必须设置expire值。
另外上面的例子中除了实现了必要的load方法外,还覆盖了guava默认的reload方法,提供了一个异步版本的reload简单实现。我们先来看一下默认的reload是怎么做的:
@GwtIncompatible // Futures
public ListenableFuture<V> reload(K key, V oldValue) throws Exception {
checkNotNull(key);
checkNotNull(oldValue);
return Futures.immediateFuture(load(key));
}
可以看到,这里直接调用了load方法去获取返回值,然后将返回值设置到一个ImmediateFuture类型的future中。本质上没有异步执行。
接下来重点看下get和refresh逻辑:
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
try {
if (count != 0) { // read-volatile
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
V value = getLiveValue(e, now);
if (value != null) {
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
}
}
LoadingCache的实现是一个map,里面也有segment的概念。这个get方法就是通过hash值索引到segment后的segment的get方法。首先查找entry,如果从entry里取值,检测下这个值有没有过期,逻辑就在getLive方法里。如果过期,那么会对应的entry以及值删掉,同时返回null,返回null就会走到后面的lockedGetOrLoad方法。否则就会调用scheduleRefresh方法去刷新值。这个和之前接口文档里的描述一致。
看下lockedGetOrLoad:
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
boolean createNewEntry = true;
lock();
try {
long now = map.ticker.read();
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
...
}
}
}
if (createNewEntry) {
try {
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
}
} else {
return waitForLoadingValue(e, key, valueReference);
}
}
省略了一堆代码。。。先上锁,第一个获取锁的线程进来会创建一个entry,如果创建成功,会调用loadSync函数去阻塞式load值,其余的线程只能wait,也就是waitForLoadingValue方法。
loadSync函数:
V loadSync(
K key,
int hash,
LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader)
throws ExecutionException {
ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
}
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
stopwatch.start();
V previousValue = oldValue.get();
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return result;
}
}
loadSync会调用loadFuture方法,loadFuture会先判断当前是否有就值,如果有说明是refresh过来的,否则是load过来的。对于后者,需要直接拿到返回值,所以就直接调用loader的load方法取加载值,所以是一个同步阻塞过程。对于前者,因为可能是异步的,所以调用的是loader的reload防范,拿到的是一个future。然后对于这个future使用transform对接过做了转换,也就是一个Function实现类里的逻辑,将newValue设置到entry里。这样便完成了刷新动作。这里是不是异步就取决于reload函数的实现了。
最后我们再看下getAll方法,也就是同时获取多个key的值。
ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException {
int hits = 0;
int misses = 0;
Map<K, V> result = Maps.newLinkedHashMap();
Set<K> keysToLoad = Sets.newLinkedHashSet();
for (K key : keys) {
V value = get(key);
if (!result.containsKey(key)) {
result.put(key, value);
if (value == null) {
misses++;
keysToLoad.add(key);
} else {
hits++;
}
}
}
try {
if (!keysToLoad.isEmpty()) {
try {
Map<K, V> newEntries = loadAll(keysToLoad, defaultLoader);
for (K key : keysToLoad) {
V value = newEntries.get(key);
if (value == null) {
throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
}
result.put(key, value);
}
} catch (UnsupportedLoadingOperationException e) {
// loadAll not implemented, fallback to load
for (K key : keysToLoad) {
misses--; // get will count this miss
result.put(key, get(key, defaultLoader));
}
}
}
return ImmutableMap.copyOf(result);
} finally {
globalStatsCounter.recordHits(hits);
globalStatsCounter.recordMisses(misses);
}
}
这里先逐个调用get,看是否有值,这个get和之前分析的get不是同一个。对于没有值的key,统一收集起来,调用loadAll方法,loadAll会调用loader的loadAll方法,默认的loader的实现是抛一个UnsupportedLoadingOperationException类型异常,然后这里会catch住,之后便会调用我们前面分析的get方法逐个load。所以默认的loadAll等价于多次load。