当前位置: 首页 > 工具软件 > race-cache > 使用案例 >

工作笔记-GuavaCache源码分析

乐刚毅
2023-12-01

Guava Cache


Guava Cache是google的java扩展包中的一个模块,主要提供缓存服务,整体实现比较简单,单模块的核心源文件不超过20个,花几个小时可以看个大概。关于Guava Cache的使用介绍可以参考github上的wiki:

https://github.com/google/guava/wiki/CachesExplained

Cache的实现类似ConcurrentHashMap,在其基础上丰富了数据加载和刷新、基于多种策略的数据清除以及一些额外的统计功能。从源码可以看出,ConcurrentHashMap在JDK1.8以后再次进行了优化,取消了segment,并且添加了单向链表至红黑树的转化,效率得到提升,而Guava Cache仍然是老的实现,在效率上肯定不如ConcurrentHashMap,接下来源码解析会讲到。

Guava Cache暴露给用户的以下几个类/接口:<I>LoadingCache,<I>Cache,<I>RemovalListener,<I>CacheLoader,<I>Weigher, CacheBuilder.

 


Cache

public interface Cache<K, V> {
 @Nullable
  V getIfPresent(@CompatibleWith("K") Object key);

  V get(K key, Callable<? extends V> loader) throws ExecutionException;

  ImmutableMap<K, V> getAllPresent(Iterable<?> keys);
  
  void put(K key, V value);

  void putAll(Map<? extends K, ? extends V> m);

  void invalidate(@CompatibleWith("K") Object key);

  void invalidateAll(Iterable<?> keys);

  void invalidateAll();

  long size();

  CacheStats stats();

  ConcurrentMap<K, V> asMap();

  void cleanUp();

}

Cache接口包含了用户对缓存的基本操作,由LocalManualCache实现。


LoadingCache

public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V> {

  V get(K key) throws ExecutionException;

  V getUnchecked(K key);

  ImmutableMap<K, V> getAll(Iterable<? extends K> keys) throws ExecutionException;

  @Deprecated
  @Override
  V apply(K key);

  void refresh(K key);

  @Override
  ConcurrentMap<K, V> asMap();
}

方便阅读,去除掉了注释,在Cache的基础上扩展,添加了自动加载和刷新逻辑,由LocalLoadingCache实现。


CacheBuilder

用于创建LoadingCache或者Cache,通过调用build()或者build(CacheLoader<? super K, V> loader)来创建一个缓存实例,前者创建一个Cache, 后者创建一个LoadingCache.  下面所有CacheBuilder可配置的项目, 其中部分项目不能同时配置:

Cache cache = CacheBuilder
        .newBuilder()
        .concurrencyLevel(1)
        .expireAfterAccess(Duration.ofSeconds(10))
        .expireAfterAccess(10, TimeUnit.SECONDS) /* 1. 访问后生存时间 */
        .expireAfterWrite(10, TimeUnit.SECONDS) /* 2. 写后生存时间 */
        .initialCapacity(16) /* 3. 初始容量 */
        .maximumSize(100) /* 4. 最大容量 */
        .maximumWeight(1000) /* 5. 最大weight */
        .recordStats() /* 6. 记录统计数据 */
        .refreshAfterWrite(10, TimeUnit.SECONDS) /* 7. 写后刷新 */
        .removalListener(new RemovalListener<Object, Object>() {
        	@Override
        	public void onRemoval(RemovalNotification notification) {
        		// notify
        	}
        }) /* 8. 缓存回收监听 */
        .softValues() /* 9. Value软引用 */
        .ticker(Ticker.systemTicker()) /* 10. ticker时钟源 */
        .weakKeys() /* 11. Key软引用 */
        .weakValues() /* 12. Value软引用 */
        .weigher(new Weigher<Object, Object>() {
	        @Override
	        public int weigh(Object key, Object value) {
	        	return 0;
	        }

        }) /* 13. weigher */
        .build();		

 


CacheLoader、Weigher和RemovalListener三个接口比较简单,使用时只要实现对应的一至两个接口即可,参考使用手册。下面详细介绍下缓存实现源码。

Guava Cache的缓存实现主要集中在LocalCache.class中,这个文件有将近五千行代码,里面有非常多的内部类。LocalCache实现了ConcurrentMap和Map接口,同1.8以前的HashMap一样,一个LocalCache管理多个Segment,每个Segment是一个Hashtable的结构,读写操作时,通过key的哈希找到对应的segment,再通过哈希找到对应的segment上的index,相当于做了两次哈希。当不同的key映射到同一个哈希节点时,则在该节点形成一个单向链表,新插入的数据作为表头节点。以一个Put操作的代码为例:

写操作

 @Nullable
    V put(K key, int hash, V value, boolean onlyIfAbsent) {
  
      lock();/*对整个put逻辑加锁,而非对单条记录加锁*/
      try {
        long now = map.ticker.read();
        /*在写之前,先做两件事,1.清理掉弱引用已经被GC掉的数据,2.清理掉已经过期的数据*/
        preWriteCleanup(now);

        int newCount = this.count + 1;
        if (newCount > this.threshold) { 
          /*达到最大则扩容*/
          expand();
          newCount = this.count + 1;
        }

        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        /*通过&操作找到segment上的index*/
        int index = hash & (table.length() - 1);
        /*找到指定哈希节点的头结点*/
        ReferenceEntry<K, V> first = table.get(index);

        /*遍历单向联表,看要插入的Key是否存在*/
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            /*找到了*/

            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();

            if (entryValue == null) {
              ++modCount;
              if (valueReference.isActive()) {
                /*Value被回收了,发送COLLECTED通知*/
                enqueueNotification(
                    key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
                setValue(e, key, value, now);
                newCount = this.count; // count remains unchanged
              } else {
                /*直接赋值*/
                setValue(e, key, value, now);
                newCount = this.count + 1;
              }
              this.count = newCount; // write-volatile
              /*触发缓存回收*/
              evictEntries(e);
              return null;
            } else if (onlyIfAbsent) {
              /*统计操作(访问时间、写入时间等)*/
              recordLockedRead(e, now);
              return entryValue;
            } else {
              
              ++modCount;
              enqueueNotification(
              /*发送REPLACED通知*/
                  key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
              setValue(e, key, value, now);
              /*触发缓存回收*/
              evictEntries(e);
              return entryValue;
            }
          }
        }

        /*没找到,创建一个节点*/
        ++modCount;
        /*此处根据引用类型的不同配置,调用的是不同实现的newEntry方法*/
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        setValue(newEntry, key, value, now);
        table.set(index, newEntry);
        newCount = this.count + 1;
        this.count = newCount; // write-volatile
        /*触发缓存回收*/
        evictEntries(newEntry);
        return null;
      } finally {
        /*操作完成,解锁*/
        unlock();
        /*后续操作,发送通知等*/
        postWriteCleanup();
      }
    }

结合代码和注释,Put时,步骤:

1. 加锁(线程安全)

2. 清理弱引用、清理过期数据

3. (可能的操作)扩容

4. 遍历搜索已存在的Key

5. 找到, 判断Value是否已被回收

6. 赋值,触发Replaced通知。

7. 没找到, 创建新的Entry, 根据K/V的引用类型(weak/strong)创建新的Entry.

8. 触发缓存回收(maxsize, maxweight等的判断)

9. 后续操作,发通知等。

接下来继续看细节实现,

清理弱引用、清理过期数据(preWriteCleanUp):

    @GuardedBy("this")
    void preWriteCleanup(long now) {
      runLockedCleanup(now);
    }


    void runLockedCleanup(long now) {
      if (tryLock()) {
        try {
          /*清理弱引用*/
          drainReferenceQueues();
          /*清理过期数据*/
          expireEntries(now); 
          readCount.set(0);
        } finally {
          unlock();
        }
      }
    }

    @GuardedBy("this")
    void drainReferenceQueues() {
      /*是否配置了WeakKey*/
      if (map.usesKeyReferences()) {
        /*清理WeakKey*/
        drainKeyReferenceQueue();
      }
      /*是否配置了WeakValue*/
      if (map.usesValueReferences()) {
        /*清理WeakValue*/
        drainValueReferenceQueue();
      }
    }

    @GuardedBy("this")
    void drainKeyReferenceQueue() {
      /*遍历清理WeakKey*/
      Reference<? extends K> ref;
      int i = 0;
      while ((ref = keyReferenceQueue.poll()) != null) {
        @SuppressWarnings("unchecked")
        ReferenceEntry<K, V> entry = (ReferenceEntry<K, V>) ref;
        map.reclaimKey(entry);
        if (++i == DRAIN_MAX) {
          break;
        }
      }
    }

    @GuardedBy("this")
    void drainValueReferenceQueue() {
      /*遍历清理WeakValue*/
      Reference<? extends V> ref;
      int i = 0;
      while ((ref = valueReferenceQueue.poll()) != null) {
        @SuppressWarnings("unchecked")
        ValueReference<K, V> valueReference = (ValueReference<K, V>) ref;
        map.reclaimValue(valueReference);
        if (++i == DRAIN_MAX) {
          break;
        }
      }
    }


    @GuardedBy("this")
    void expireEntries(long now) {
      drainRecencyQueue();

      /*遍历清理writeQueue和AccessQueue*/
      ReferenceEntry<K, V> e;
      while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
      while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
    }

以上代码涉及到了四个队列。

keyReferenceQueue和valueReferenceQueue:了解过Java的WeakReference的话,都知道,使用WeakReference通过注册一个ReferenceQueue,将已没有强引用会被GC清除掉的弱引用都保存在queue中。如果配置了weakKeys和weakValues,那么在写之前遍历这两个Queue,将弱引用清除掉。

writeQueue和accessQueue:配置了expireAfterWrite和expireAfterAccess的Cache,会将写过/访问过的数据保存在对应queue中,在下一次写前,遍历清除已过期的数据。

扩容expand():

    @GuardedBy("this")
    void expand() {
      AtomicReferenceArray<ReferenceEntry<K, V>> oldTable = table;
      int oldCapacity = oldTable.length();
      if (oldCapacity >= MAXIMUM_CAPACITY) {
        return;
      }


      int newCount = count;
      /*容量比原先扩大一倍*/
      AtomicReferenceArray<ReferenceEntry<K, V>> newTable = newEntryArray(oldCapacity << 1);
      threshold = newTable.length() * 3 / 4;
      int newMask = newTable.length() - 1;
      /*遍历拷贝Entry,根据引用类型的不同配置,有不同的CopyEntry实现*/
      for (int oldIndex = 0; oldIndex < oldCapacity; ++oldIndex) {
        // We need to guarantee that any existing reads of old Map can
        // proceed. So we cannot yet null out each bin.
        ReferenceEntry<K, V> head = oldTable.get(oldIndex);

        if (head != null) {
          ReferenceEntry<K, V> next = head.getNext();
          int headIndex = head.getHash() & newMask;

          // Single node on list
          if (next == null) {
            newTable.set(headIndex, head);
          } else {
            // Reuse the consecutive sequence of nodes with the same target
            // index from the end of the list. tail points to the first
            // entry in the reusable list.
            ReferenceEntry<K, V> tail = head;
            int tailIndex = headIndex;
            for (ReferenceEntry<K, V> e = next; e != null; e = e.getNext()) {
              int newIndex = e.getHash() & newMask;
              if (newIndex != tailIndex) {
                // The index changed. We'll need to copy the previous entry.
                tailIndex = newIndex;
                tail = e;
              }
            }
            newTable.set(tailIndex, tail);

            // Clone nodes leading up to the tail.
            for (ReferenceEntry<K, V> e = head; e != tail; e = e.getNext()) {
              int newIndex = e.getHash() & newMask;
              ReferenceEntry<K, V> newNext = newTable.get(newIndex);
              ReferenceEntry<K, V> newFirst = copyEntry(e, newNext);
              if (newFirst != null) {
                newTable.set(newIndex, newFirst);
              } else {
                removeCollectedEntry(e);
                newCount--;
              }
            }
          }
        }
      }
      table = newTable;
      this.count = newCount;
    }

触发缓存回收EvictEntries():

    @GuardedBy("this")
    void evictEntries(ReferenceEntry<K, V> newest) {
      if (!map.evictsBySize()) {
        return;
      }

      drainRecencyQueue();
      
      /*maxWeight判断*/
      if (newest.getValueReference().getWeight() > maxSegmentWeight) {
        if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }

      /*触发回收,找可回收对象*/
      while (totalWeight > maxSegmentWeight) {
        ReferenceEntry<K, V> e = getNextEvictable();
        if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }
    }

    
    @GuardedBy("this")
    ReferenceEntry<K, V> getNextEvictable() {
      /*从LRU找,FIFO队列,最先进的被淘汰*/
      for (ReferenceEntry<K, V> e : accessQueue) {
        int weight = e.getValueReference().getWeight();
        if (weight > 0) {
          return e;
        }
      }
      throw new AssertionError();
    }

后续操作postWriteCleanup():

void postWriteCleanup() {
      runUnlockedCleanup();
    }


    void runUnlockedCleanup() {
      // locked cleanup may generate notifications we can send unlocked
      if (!isHeldByCurrentThread()) {
        map.processPendingNotifications();
      }
    }

  void processPendingNotifications() {
    RemovalNotification<K, V> notification;
    while ((notification = removalNotificationQueue.poll()) != null) {
      try {
        removalListener.onRemoval(notification);
      } catch (Throwable e) {
        logger.log(Level.WARNING, "Exception thrown by removal listener", e);
      }
    }
  }

比较简单就不解释了。

再补充下前面的newEntry和setValue两步操作

setValue源码如下,当配置weakValues()时,setValue为对value的弱引用。

    @GuardedBy("this")
    void setValue(ReferenceEntry<K, V> entry, K key, V value, long now) {
      ValueReference<K, V> previous = entry.getValueReference();
      int weight = map.weigher.weigh(key, value);
      checkState(weight >= 0, "Weights must be non-negative");

      /*referenceValue不同的引用配置有不同的实现方式*/
      ValueReference<K, V> valueReference =
          map.valueStrength.referenceValue(this, entry, value, weight);
      
      entry.setValueReference(valueReference);
      /*记录写操作*/
      recordWrite(entry, weight, now);
      previous.notifyNewValue(value);
    }

    /*map.valueStrength.referenceValue的WEAK实现*/
      <K, V> ValueReference<K, V> referenceValue(
          Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
        return (weight == 1)
            ? new WeakValueReference<K, V>(segment.valueReferenceQueue, value, entry)
            : new WeightedWeakValueReference<K, V>(
                segment.valueReferenceQueue, value, entry, weight);
      }

    /*map.valueStrength.referenceValue的STRONG实现*/
      <K, V> ValueReference<K, V> referenceValue(
          Segment<K, V> segment, ReferenceEntry<K, V> entry, V value, int weight) {
        return (weight == 1)
            ? new StrongValueReference<K, V>(value)
            : new WeightedStrongValueReference<K, V>(value, weight);
      }


newEntry源码如下,当配置为weakKeys时,创建的Entry为WeakEntry,其中的key是弱引用对象。

 STRONG {
      @Override
      <K, V> ReferenceEntry<K, V> newEntry(
          Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
        return new StrongEntry<>(key, hash, next);
      }
    },    
WEAK {
      @Override
      <K, V> ReferenceEntry<K, V> newEntry(
          Segment<K, V> segment, K key, int hash, @Nullable ReferenceEntry<K, V> next) {
        return new WeakEntry<>(segment.keyReferenceQueue, key, hash, next);
      }
    },

读操作

再看读操作,以get为例:

   V get(Object key, int hash) {
      try {
        if (count != 0) { // read-volatile
          long now = map.ticker.read();
          /*找到Entry*/
          ReferenceEntry<K, V> e = getLiveEntry(key, hash, now);
          if (e == null) {
            return null;
          }

          V value = e.getValueReference().get();
          if (value != null) {
            /*记录读操作*/
            recordRead(e, now);
            /*触发刷新*/
            return scheduleRefresh(e, e.getKey(), hash, value, now, map.defaultLoader);
          }
          tryDrainReferenceQueues();
        }
        return null;
      } finally {
        /*触发回收*/
        postReadCleanup();
      }
    }

可以看到,Get整个流程主要有三个步骤

1. 从segment中找到头结点,再从链表中找到Entry

2. 触发刷新

3. 触发回收

分三部分看源码,第一部分,看getLiveEntry源码:

    ReferenceEntry<K, V> getLiveEntry(Object key, int hash, long now) {
      ReferenceEntry<K, V> e = getEntry(key, hash);
      if (e == null) {
        return null;
      } else if (map.isExpired(e, now)) {
        tryExpireEntries(now);
        return null;
      }
      return e;
    }

    ReferenceEntry<K, V> getEntry(Object key, int hash) {
      for (ReferenceEntry<K, V> e = getFirst(hash); e != null; e = e.getNext()) {
        if (e.getHash() != hash) {
          continue;
        }

        K entryKey = e.getKey();
        if (entryKey == null) {
          tryDrainReferenceQueues();
          continue;
        }

        if (map.keyEquivalence.equivalent(key, entryKey)) {
          return e;
        }
      }

      return null;
    }    


   ReferenceEntry<K, V> getFirst(int hash) {
      // read this volatile field only once
      AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
      return table.get(hash & (table.length() - 1));
    }

比较好理解,第二部分,接下来看scheduleRefresh:

    V scheduleRefresh(
        ReferenceEntry<K, V> entry,
        K key,
        int hash,
        V oldValue,
        long now,
        CacheLoader<? super K, V> loader) {
      /*1. 是否配置了刷新 2.是否到了刷新时间 3.是否已经在刷新*/
      if (map.refreshes()
          && (now - entry.getWriteTime() > map.refreshNanos)
          && !entry.getValueReference().isLoading()) {
        /*是,是,否,开始刷新*/
        V newValue = refresh(key, hash, loader, true);
        if (newValue != null) {
          return newValue;
        }
      }
      return oldValue;
    }


    V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {

      /*将正在刷新的值插入“正在刷新队列”*/
      final LoadingValueReference<K, V> loadingValueReference =
          insertLoadingValueReference(key, hash, checkTime);
      if (loadingValueReference == null) {
        return null;
      }
       
      /*异步刷新*/
      ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
      if (result.isDone()) {
        try {
          return Uninterruptibles.getUninterruptibly(result);
        } catch (Throwable t) {
          // don't let refresh exceptions propagate; error was already logged
        }
      }
      return null;
    }

ListenableFuture<V> loadAsync(
        final K key,
        final int hash,
        final LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader) {
      final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      loadingFuture.addListener(
          new Runnable() {
            @Override
            public void run() {
              try {
                /*重新加载数据*/
                getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
              } catch (Throwable t) {
                logger.log(Level.WARNING, "Exception thrown during refresh", t);
                loadingValueReference.setException(t);
              }
            }
          },
          directExecutor());
      return loadingFuture;
    }

看loadFuture源码,旧值为null,则调用load, 旧值非null,则调用reload. 

    public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      try {
        stopwatch.start();
        V previousValue = oldValue.get();
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
              @Override
              public V apply(V newValue) {
                LoadingValueReference.this.set(newValue);
                return newValue;
              }
            },
            directExecutor());
      } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return result;
      }
    }

第三部分,看postReadCleanUp()源码:

   void postReadCleanup() {
      /*DRAIN_THREASHOLD值为0X3F*/
      /*每读64次清理一次*/
      if ((readCount.incrementAndGet() & DRAIN_THRESHOLD) == 0) {
        cleanUp();
      }
    }

总结,GuavaCache的实现并不复杂,过期数据等的清理都是在下一次写操作时执行,或者第64次读操作,但是基本很少会发生在读操作,因为写操作时会清零读操作次数,读操作每次都会检查刷新时间并且触发异步刷新。

 类似资料: