Guava Cache is a module of Google's Guava library for Java that provides in-memory caching. The implementation is fairly compact: the module has fewer than 20 core source files, so a few hours is enough to get a good overview. For an introduction to using Guava Cache, see the wiki on GitHub:
https://github.com/google/guava/wiki/CachesExplained
The cache implementation resembles ConcurrentHashMap, extended with data loading and refreshing, eviction based on several policies, and some extra statistics. Note that ConcurrentHashMap was reworked again in JDK 1.8: segments were removed, and buckets convert from a singly linked list to a red-black tree when they grow, which improved efficiency. Guava Cache still uses the older segment-based design, so in raw lookup performance it cannot match the current ConcurrentHashMap, as the source walkthrough below will show.
Guava Cache exposes the following classes/interfaces to users: LoadingCache, Cache, RemovalListener, CacheLoader, Weigher, and CacheBuilder.
Cache
public interface Cache<K, V> {
  @Nullable
  V getIfPresent(@CompatibleWith("K") Object key);
  V get(K key, Callable<? extends V> loader) throws ExecutionException;
  ImmutableMap<K, V> getAllPresent(Iterable<?> keys);
  void put(K key, V value);
  void putAll(Map<? extends K, ? extends V> m);
  void invalidate(@CompatibleWith("K") Object key);
  void invalidateAll(Iterable<?> keys);
  void invalidateAll();
  long size();
  CacheStats stats();
  ConcurrentMap<K, V> asMap();
  void cleanUp();
}
The Cache interface contains the basic operations a user can perform on a cache; it is implemented by LocalManualCache.
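A minimal sketch of how the manual Cache is used (expensiveLookup is a hypothetical placeholder for whatever computes the value):
Cache<String, String> cache = CacheBuilder.newBuilder()
    .maximumSize(100)
    .build();

// get(key, loader) is "compute if absent": the Callable runs only on a miss,
// and any exception it throws surfaces wrapped in an ExecutionException
try {
  String v = cache.get("id-42", () -> expensiveLookup("id-42"));
} catch (ExecutionException e) {
  // the Callable threw; handle or rethrow
}

// getIfPresent never loads; it simply returns null on a miss
String cached = cache.getIfPresent("id-42");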
LoadingCache
public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V> {
  V get(K key) throws ExecutionException;
  V getUnchecked(K key);
  ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException;
  @Deprecated
  @Override
  V apply(K key);
  void refresh(K key);
  @Override
  ConcurrentMap<K, V> asMap();
}
Comments are stripped for readability. LoadingCache extends Cache with automatic loading and refreshing; it is implemented by LocalLoadingCache.
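As a quick illustration (a minimal sketch, not tied to any particular application), building a LoadingCache only requires a CacheLoader whose load computes the value for a missing key:
LoadingCache<String, Integer> lengths = CacheBuilder.newBuilder()
    .maximumSize(100)
    .build(new CacheLoader<String, Integer>() {
      @Override
      public Integer load(String key) {
        return key.length(); // runs on a miss; the result is cached
      }
    });

Integer n = lengths.getUnchecked("guava"); // loads on first access
lengths.refresh("guava");                  // forces a reload for this key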
CacheBuilder
CacheBuilder creates a LoadingCache or a Cache: calling build() produces a Cache, while build(CacheLoader<? super K, V> loader) produces a LoadingCache. The snippet below catalogues the configurable options of CacheBuilder; some of them cannot be combined (for example maximumSize with maximumWeight, or weakValues with softValues), so treat it as a listing rather than a working configuration:
Cache cache = CacheBuilder
    .newBuilder()
    .concurrencyLevel(1)
    .expireAfterAccess(Duration.ofSeconds(10)) /* 1. expire after access (Duration overload) */
    .expireAfterAccess(10, TimeUnit.SECONDS)   /* 1. expire after access */
    .expireAfterWrite(10, TimeUnit.SECONDS)    /* 2. expire after write */
    .initialCapacity(16)                       /* 3. initial capacity */
    .maximumSize(100)                          /* 4. maximum number of entries */
    .maximumWeight(1000)                       /* 5. maximum total weight */
    .recordStats()                             /* 6. record statistics */
    .refreshAfterWrite(10, TimeUnit.SECONDS)   /* 7. refresh after write */
    .removalListener(new RemovalListener<Object, Object>() {
      @Override
      public void onRemoval(RemovalNotification notification) {
        // notify
      }
    })                                         /* 8. removal listener */
    .softValues()                              /* 9. soft references for values */
    .ticker(Ticker.systemTicker())             /* 10. ticker (time source) */
    .weakKeys()                                /* 11. weak references for keys */
    .weakValues()                              /* 12. weak references for values */
    .weigher(new Weigher<Object, Object>() {
      @Override
      public int weigh(Object key, Object value) {
        return 0;
      }
    })                                         /* 13. weigher */
    .build();
The CacheLoader, Weigher and RemovalListener interfaces are simple: using them means implementing one or two methods, as described in the user guide. The rest of this article walks through the cache implementation source.
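For example, a weight-bounded cache only needs a Weigher plus maximumWeight (a minimal sketch; the byte[] values and the 1 MiB budget are arbitrary choices for illustration):
Weigher<String, byte[]> byLength = (key, value) -> value.length;

Cache<String, byte[]> blobs = CacheBuilder.newBuilder()
    .maximumWeight(1024 * 1024) // total weight budget for the cache, not an entry count
    .weigher(byLength)          // weigh() is called once, when the entry is written
    .build();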
The caching logic of Guava Cache lives almost entirely in LocalCache.java, a file of close to five thousand lines with a large number of inner classes. LocalCache implements the ConcurrentMap (and hence Map) interface. Like the pre-JDK-1.8 ConcurrentHashMap, a LocalCache manages multiple Segments, each of which is a hash table: on a read or write, the key's hash first selects a segment and then an index within that segment's table, so the hash is effectively applied twice. Keys that map to the same bucket form a singly linked list, with newly inserted entries placed at the head. Take the put operation as an example:
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
  lock(); /* lock the whole put path, not an individual entry */
  try {
    long now = map.ticker.read();
    /* before writing, do two things: 1. drain entries whose weak references
       have been GCed, 2. remove entries that have expired */
    preWriteCleanup(now);
    int newCount = this.count + 1;
    if (newCount > this.threshold) {
      /* threshold reached: expand the table */
      expand();
      newCount = this.count + 1;
    }
    AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    /* locate the index within the segment via a bitwise AND */
    int index = hash & (table.length() - 1);
    /* head node of that bucket */
    ReferenceEntry<K, V> first = table.get(index);
    /* walk the singly linked list to see whether the key already exists */
    for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
      K entryKey = e.getKey();
      if (e.getHash() == hash
          && entryKey != null
          && map.keyEquivalence.equivalent(key, entryKey)) {
        /* found it */
        ValueReference<K, V> valueReference = e.getValueReference();
        V entryValue = valueReference.get();
        if (entryValue == null) {
          ++modCount;
          if (valueReference.isActive()) {
            /* the value has been collected: enqueue a COLLECTED notification */
            enqueueNotification(
                key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
            setValue(e, key, value, now);
            newCount = this.count; // count remains unchanged
          } else {
            /* just set the value */
            setValue(e, key, value, now);
            newCount = this.count + 1;
          }
          this.count = newCount; // write-volatile
          /* trigger eviction */
          evictEntries(e);
          return null;
        } else if (onlyIfAbsent) {
          /* bookkeeping (access time, write time, ...) */
          recordLockedRead(e, now);
          return entryValue;
        } else {
          ++modCount;
          /* enqueue a REPLACED notification */
          enqueueNotification(
              key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
          setValue(e, key, value, now);
          /* trigger eviction */
          evictEntries(e);
          return entryValue;
        }
      }
    }
    /* not found: create a new entry */
    ++modCount;
    /* depending on the configured reference strength, a different newEntry
       implementation is invoked here */
    ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
    setValue(newEntry, key, value, now);
    table.set(index, newEntry);
    newCount = this.count + 1;
    this.count = newCount; // write-volatile
    /* trigger eviction */
    evictEntries(newEntry);
    return null;
  } finally {
    /* done: unlock */
    unlock();
    /* post-write work: deliver removal notifications, etc. */
    postWriteCleanup();
  }
}
Putting the code and comments together, a put proceeds as follows (a removal-listener sketch follows the list):
1. Lock the segment (thread safety).
2. Drain collected weak references and remove expired entries.
3. Expand the table if necessary.
4. Walk the bucket's list looking for an existing key.
5. If the key is found, check whether its value has already been collected.
6. Set the new value and enqueue a REPLACED notification.
7. If the key is not found, create a new entry whose type depends on the configured key/value reference strength (weak/strong).
8. Trigger eviction (maximumSize/maximumWeight checks).
9. Post-write work: deliver pending notifications, etc.
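Steps 6 and 9 can be observed from user code: overwrite a key and, because pending notifications are delivered in postWriteCleanup on the writing thread, the REPLACED notification is printed before the second put even returns. A minimal sketch:
RemovalListener<String, String> listener =
    notification -> System.out.println(
        notification.getKey() + " removed, cause = " + notification.getCause());

Cache<String, String> cache = CacheBuilder.newBuilder()
    .removalListener(listener)
    .build();

cache.put("k", "v1");
cache.put("k", "v2"); // prints: k removed, cause = REPLACED
If the listener does heavy work, the user guide recommends wrapping it with RemovalListeners.asynchronous(listener, executor) so it runs off the calling thread.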
Now for the details of each step.
Draining collected references and expiring entries (preWriteCleanup):
@GuardedBy("this")
void preWriteCleanup(long now) {
runLockedCleanup(now);
}
void runLockedCleanup(long now) {
if (tryLock()) {
try {
/*清理弱引用*/
drainReferenceQueues();
/*清理过期数据*/
expireEntries(now);
readCount.set(0);
} finally {
unlock();
}
}
}
@GuardedBy("this")
void drainReferenceQueues() {
/*是否配置了WeakKey*/
if (map.usesKeyReferences()) {
/*清理WeakKey*/
drainKeyReferenceQueue();
}
/*是否配置了WeakValue*/
if (map.usesValueReferences()) {
/*清理WeakValue*/
drainValueReferenceQueue();
}
}
@GuardedBy("this")
void drainKeyReferenceQueue() {
/*遍历清理WeakKey*/
Reference<? extends K> ref;
int i = 0;
while ((ref = keyReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
map.reclaimKey(entry);
if (++i == DRAIN_MAX) {
break;
}
}
}
@GuardedBy("this")
void drainValueReferenceQueue() {
/*遍历清理WeakValue*/
Reference<? extends V> ref;
int i = 0;
while ((ref = valueReferenceQueue.poll()) != null) {
@SuppressWarnings("unchecked")
ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
map.reclaimValue(valueReference);
if (++i == DRAIN_MAX) {
break;
}
}
}
@GuardedBy("this")
void expireEntries(long now) {
drainRecencyQueue();
/*遍历清理writeQueue和AccessQueue*/
ReferenceEntry<K, V> e;
while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
throw new AssertionError();
}
}
}
The code above involves four queues.
keyReferenceQueue and valueReferenceQueue: if you have used Java's WeakReference, you know that a weak reference can be registered with a ReferenceQueue; once its referent has no strong references left and is collected, the GC enqueues the reference there. When weakKeys or weakValues is configured, these two queues are drained before each write so that entries whose keys or values have been collected are removed (a small stand-alone sketch of this mechanism follows below).
writeQueue and accessQueue: when expireAfterWrite or expireAfterAccess is configured, written/accessed entries are kept in order in the corresponding queue, and before the next write any entries that have already expired are removed from the head.
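The ReferenceQueue mechanism itself can be seen in a few lines of plain Java (a stand-alone sketch using java.lang.ref; whether the reference has been enqueued yet depends on the garbage collector, so the poll may legitimately return null):
ReferenceQueue<Object> queue = new ReferenceQueue<>();
Object referent = new Object();
WeakReference<Object> ref = new WeakReference<>(referent, queue);

referent = null; // drop the last strong reference
System.gc();     // only a hint to the collector

// once the referent has been collected, the GC enqueues ref here;
// this poll loop is exactly what drainKeyReferenceQueue()/drainValueReferenceQueue() do
Reference<?> cleared = queue.poll();
System.out.println(cleared != null); // true once the reference has been enqueued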
Expanding the table (expand()):
@GuardedBy("this")
void expand() {
AtomicReferenceArray<ReferenceEntry<K, V>> oldTable = table;
int oldCapacity = oldTable.length();
if (oldCapacity >= MAXIMUM_CAPACITY) {
return;
}
int newCount = count;
/*容量比原先扩大一倍*/
AtomicReferenceArray<ReferenceEntry<K, V>> newTable = newEntryArray(oldCapacity << 1);
threshold = newTable.length() * 3 / 4;
int newMask = newTable.length() - 1;
/*遍历拷贝Entry,根据引用类型的不同配置,有不同的CopyEntry实现*/
for (int oldIndex = 0; oldIndex < oldCapacity; ++oldIndex) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
ReferenceEntry<K, V> head = oldTable.get(oldIndex);
if (head != null) {
ReferenceEntry<K, V> next = head.getNext();
int headIndex = head.getHash() & newMask;
// Single node on list
if (next == null) {
newTable.set(headIndex, head);
} else {
// Reuse the consecutive sequence of nodes with the same target
// index from the end of the list. tail points to the first
// entry in the reusable list.
ReferenceEntry<K, V> tail = head;
int tailIndex = headIndex;
for (ReferenceEntry<K, V> e = next; e != null; e = e.getNext()) {
int newIndex = e.getHash() & newMask;
if (newIndex != tailIndex) {
// The index changed. We'll need to copy the previous entry.
tailIndex = newIndex;
tail = e;
}
}
newTable.set(tailIndex, tail);
// Clone nodes leading up to the tail.
for (ReferenceEntry<K, V> e = head; e != tail; e = e.getNext()) {
int newIndex = e.getHash() & newMask;
ReferenceEntry<K, V> newNext = newTable.get(newIndex);
ReferenceEntry<K, V> newFirst = copyEntry(e, newNext);
if (newFirst != null) {
newTable.set(newIndex, newFirst);
} else {
removeCollectedEntry(e);
newCount--;
}
}
}
}
}
table = newTable;
this.count = newCount;
}
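The "reuse the trailing run of nodes" trick works because the table length is always a power of two: after doubling, hash & (newLength - 1) either equals the old index or the old index plus the old capacity, so each bucket splits into at most two new buckets. A quick illustration:
int oldCapacity = 16;
int newCapacity = oldCapacity << 1;       // 32

int hash = 0x5A;                          // 90
int oldIndex = hash & (oldCapacity - 1);  // 90 & 15 = 10
int newIndex = hash & (newCapacity - 1);  // 90 & 31 = 26 = 10 + 16

// each entry either keeps its index (the new high bit of the hash is 0) or moves
// to oldIndex + oldCapacity (the bit is 1); a trailing run of nodes that agree on
// that bit can therefore be linked into the new table unchanged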
Triggering eviction (evictEntries()):
@GuardedBy("this")
void evictEntries(ReferenceEntry<K, V> newest) {
if (!map.evictsBySize()) {
return;
}
drainRecencyQueue();
/*maxWeight判断*/
if (newest.getValueReference().getWeight() > maxSegmentWeight) {
if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
/*触发回收,找可回收对象*/
while (totalWeight > maxSegmentWeight) {
ReferenceEntry<K, V> e = getNextEvictable();
if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
throw new AssertionError();
}
}
}
@GuardedBy("this")
ReferenceEntry<K, V> getNextEvictable() {
/*从LRU找,FIFO队列,最先进的被淘汰*/
for (ReferenceEntry<K, V> e : accessQueue) {
int weight = e.getValueReference().getWeight();
if (weight > 0) {
return e;
}
}
throw new AssertionError();
}
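Size-based eviction is easy to observe. Keep in mind that maximumSize is split across segments, so with the default concurrency level eviction can kick in slightly before the global limit; pinning concurrencyLevel(1) makes the demo deterministic (a minimal sketch reusing the listener pattern from above):
RemovalListener<Integer, String> onEvict =
    n -> System.out.println(n.getKey() + " evicted, cause = " + n.getCause());

Cache<Integer, String> cache = CacheBuilder.newBuilder()
    .concurrencyLevel(1) // a single segment, so maximumSize is exact here
    .maximumSize(2)
    .removalListener(onEvict)
    .build();

cache.put(1, "a");
cache.put(2, "b");
cache.getIfPresent(1); // touch key 1 so key 2 becomes the least recently used entry
cache.put(3, "c");     // expected output: 2 evicted, cause = SIZE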
Post-write work (postWriteCleanup()):
void postWriteCleanup() {
  runUnlockedCleanup();
}

void runUnlockedCleanup() {
  // locked cleanup may generate notifications we can send unlocked
  if (!isHeldByCurrentThread()) {
    map.processPendingNotifications();
  }
}

void processPendingNotifications() {
  RemovalNotification<K, V> notification;
  while ((notification = removalNotificationQueue.poll()) != null) {
    try {
      removalListener.onRemoval(notification);
    } catch (Throwable e) {
      logger.log(Level.WARNING, "Exception thrown by removal listener", e);
    }
  }
}
This part is straightforward, so no further explanation is needed.
To round out the put path, here are the newEntry and setValue steps mentioned earlier.
The setValue source is below; when weakValues() is configured, the value is stored behind a weak reference.
@GuardedBy("this")
void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
ValueReference<K, V> previous = entry.getValueReference();
int weight = map.weigher.weigh(key, value);
checkState(weight >= 0, "Weights must be non-negative");
/*referenceValue不同的引用配置有不同的实现方式*/
ValueReference<K, V> valueReference =
map.valueStrength.referenceValue(this, entry, value, weight);
entry.setValueReference(valueReference);
/*记录写操作*/
recordWrite(entry, weight, now);
previous.notifyNewValue(value);
}
/*map.valueStrength.referenceValue的WEAK实现*/
<K, V> ValueReference<K, V> referenceValue(
Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
return (weight == 1)
? new WeakValueReference<K, V>(segment.valueReferenceQueue, value, entry)
: new WeightedWeakValueReference<K, V>(
segment.valueReferenceQueue, value, entry, weight);
}
/*map.valueStrength.referenceValue的STRONG实现*/
<K, V> ValueReference<K, V> referenceValue(
Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
return (weight == 1)
? new StrongValueReference<K, V>(value)
: new WeightedStrongValueReference<K, V>(value, weight);
}
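The effect of weakValues() can be seen from user code, although it depends on the garbage collector actually running, so the sketch below is only illustrative:
Cache<String, Object> cache = CacheBuilder.newBuilder()
    .weakValues()
    .build();

Object value = new Object();
cache.put("k", value);

value = null;    // drop the only strong reference to the value
System.gc();     // a hint; collection of the weak referent is not guaranteed here

cache.cleanUp(); // drains the reference queues, like preWriteCleanup would
System.out.println(cache.getIfPresent("k")); // null once the value has been collected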
The newEntry source is below; with weakKeys configured, the entry created is a WeakEntry, whose key is held through a weak reference.
STRONG {
  @Override
  <K, V> ReferenceEntry<K, V> newEntry(
      Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
    return new StrongEntry<>(key, hash, next);
  }
},

WEAK {
  @Override
  <K, V> ReferenceEntry<K, V> newEntry(
      Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
    return new WeakEntry<>(segment.keyReferenceQueue, key, hash, next);
  }
},
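One side effect worth remembering: with weakKeys() (and likewise weakValues()) the cache compares keys (values) by identity (==) rather than equals, as documented on CacheBuilder. A minimal sketch:
Cache<String, String> cache = CacheBuilder.newBuilder()
    .weakKeys()
    .build();

String k1 = new String("id");
cache.put(k1, "v");

System.out.println(cache.getIfPresent(k1));                // v
System.out.println(cache.getIfPresent(new String("id")));  // null: equal, but not the same object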
Now the read path, using get as the example:
V get(Object key, int hash) {
  try {
    if (count != 0) { // read-volatile
      long now = map.ticker.read();
      /* locate the live (non-expired) entry */
      ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
      if (e == null) {
        return null;
      }
      V value = e.getValueReference().get();
      if (value != null) {
        /* record the read */
        recordRead(e, now);
        /* possibly trigger a refresh */
        return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
      }
      tryDrainReferenceQueues();
    }
    return null;
  } finally {
    /* periodic cleanup on the read path */
    postReadCleanup();
  }
}
As the code shows, a get has three main steps:
1. Find the bucket head in the segment, then walk the list to the entry.
2. Possibly trigger a refresh.
3. Possibly trigger cleanup.
Taking the source in three parts, the first is getLiveEntry:
ReferenceEntry<K, V> getLiveEntry(Object key, int hash, long now) {
  ReferenceEntry<K, V> e = getEntry(key, hash);
  if (e == null) {
    return null;
  } else if (map.isExpired(e, now)) {
    tryExpireEntries(now);
    return null;
  }
  return e;
}

ReferenceEntry<K, V> getEntry(Object key, int hash) {
  for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {
    if (e.getHash() != hash) {
      continue;
    }
    K entryKey = e.getKey();
    if (entryKey == null) {
      tryDrainReferenceQueues();
      continue;
    }
    if (map.keyEquivalence.equivalent(key, entryKey)) {
      return e;
    }
  }
  return null;
}

ReferenceEntry<K, V> getFirst(int hash) {
  // read this volatile field only once
  AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
  return table.get(hash & (table.length() - 1));
}
That part is easy to follow. Part two, scheduleRefresh:
V scheduleRefresh(
    ReferenceEntry<K, V> entry,
    K key,
    int hash,
    V oldValue,
    long now,
    CacheLoader<? super K, V> loader) {
  /* 1. is refreshAfterWrite configured? 2. has the refresh interval elapsed?
     3. is a load already in progress? */
  if (map.refreshes()
      && (now - entry.getWriteTime() > map.refreshNanos)
      && !entry.getValueReference().isLoading()) {
    /* yes, yes, no: start a refresh */
    V newValue = refresh(key, hash, loader, true);
    if (newValue != null) {
      return newValue;
    }
  }
  return oldValue;
}

V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
  /* mark the entry as loading by swapping in a LoadingValueReference;
     returns null if another thread is already loading this key */
  final LoadingValueReference<K, V> loadingValueReference =
      insertLoadingValueReference(key, hash, checkTime);
  if (loadingValueReference == null) {
    return null;
  }
  /* reload the value (asynchronously only if the loader's reload is asynchronous) */
  ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
  if (result.isDone()) {
    try {
      return Uninterruptibles.getUninterruptibly(result);
    } catch (Throwable t) {
      // don't let refresh exceptions propagate; error was already logged
    }
  }
  return null;
}

ListenableFuture<V> loadAsync(
    final K key,
    final int hash,
    final LoadingValueReference<K, V> loadingValueReference,
    CacheLoader<? super K, V> loader) {
  final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
  loadingFuture.addListener(
      new Runnable() {
        @Override
        public void run() {
          try {
            /* store the reloaded value and record load statistics */
            getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
          } catch (Throwable t) {
            logger.log(Level.WARNING, "Exception thrown during refresh", t);
            loadingValueReference.setException(t);
          }
        }
      },
      directExecutor());
  return loadingFuture;
}
In loadFuture, if the old value is null the loader's load is called; if an old value exists, reload is called instead.
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
  try {
    stopwatch.start();
    V previousValue = oldValue.get();
    if (previousValue == null) {
      V newValue = loader.load(key);
      return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
    }
    ListenableFuture<V> newValue = loader.reload(key, previousValue);
    if (newValue == null) {
      return Futures.immediateFuture(null);
    }
    // To avoid a race, make sure the refreshed value is set into loadingValueReference
    // *before* returning newValue from the cache query.
    return transform(
        newValue,
        new com.google.common.base.Function<V, V>() {
          @Override
          public V apply(V newValue) {
            LoadingValueReference.this.set(newValue);
            return newValue;
          }
        },
        directExecutor());
  } catch (Throwable t) {
    ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
    if (t instanceof InterruptedException) {
      Thread.currentThread().interrupt();
    }
    return result;
  }
}
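One practical consequence of this code: the default reload simply calls load and returns an already-completed future, so the read that triggers a refresh pays for the reload on its own thread. If that is undesirable, the documented remedy is CacheLoader.asyncReloading, which moves reload onto an executor so the triggering read returns the stale value immediately. A sketch (slowLookup and the pool size are placeholders):
ExecutorService pool = Executors.newFixedThreadPool(2);

CacheLoader<String, String> loader = new CacheLoader<String, String>() {
  @Override
  public String load(String key) {
    return slowLookup(key); // hypothetical expensive call
  }
};

LoadingCache<String, String> cache = CacheBuilder.newBuilder()
    .refreshAfterWrite(10, TimeUnit.SECONDS)
    // reload() now runs on the pool; loadingFuture.isDone() is false in refresh(),
    // so scheduleRefresh() returns the old value instead of blocking
    .build(CacheLoader.asyncReloading(loader, pool));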
Part three, postReadCleanup():
void postReadCleanup() {
  /* DRAIN_THRESHOLD is 0x3F, so cleanup runs once every 64 reads */
  if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {
    cleanUp();
  }
}
To sum up, the implementation of Guava Cache is not complicated. Expired and collected entries are cleaned up lazily: on the next write, or on every 64th read. In practice the read-triggered cleanup rarely fires, because each write resets the read counter. Reads also check the refresh deadline on every access and trigger a refresh (via loadAsync) when it has passed.
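The lazy-cleanup behaviour described above is easy to verify with a custom Ticker (the FakeTicker class below is a small helper written for this sketch, not part of the main Guava artifact):
class FakeTicker extends Ticker {
  private final AtomicLong nanos = new AtomicLong();
  @Override public long read() { return nanos.get(); }
  void advance(long duration, TimeUnit unit) { nanos.addAndGet(unit.toNanos(duration)); }
}

FakeTicker ticker = new FakeTicker();
Cache<String, String> cache = CacheBuilder.newBuilder()
    .expireAfterWrite(10, TimeUnit.SECONDS)
    .ticker(ticker)
    .build();

cache.put("k", "v");
ticker.advance(11, TimeUnit.SECONDS);

// The entry is now expired. It is only physically removed by the next write
// (preWriteCleanup) or an explicit cleanUp(), but reads already treat it as
// absent because getLiveEntry() checks isExpired().
System.out.println(cache.getIfPresent("k")); // null
System.out.println(cache.size());            // may still report 1 until cleanUp() runs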