当前位置: 首页 > 工具软件 > BigCache > 使用案例 >

Bigcache(1)-NewBigCache

拓拔迪
2023-12-01

前言

BigCache作为一个知名的本地存储库,其本身存在着很多巧妙的设计理念与实现,本系列将从源码入手,一步一步做出解析,希望感兴趣的可以共勉。

NewBigCache()

该函数位于bigcache/bigcache.go内
NewBigCache函数作为BigCache的实例化函数,作用是根据给定的配置返回一个cahce实例,可以直接自定义bigcache的Config结构体,也可以使用默认的预置配置函数DefaultConfig()获取默认配置。
下面是该函数的定义:

// NewBigCache initialize new instance of BigCache
func NewBigCache(config Config) (*BigCache, error) {
	return newBigCache(config, &systemClock{})
}

Config Struct

下面是Config Struct定义:

// Config for BigCache
type Config struct {
	// 缓存分片数,值必须是2的幂,例如1024,2048等
	Shards int
	// key的过期时间,bigcache的淘汰策略使用的是时间策略,即根据固定的一个过期时间用FIFO算法进行淘汰过期的key
	LifeWindow time.Duration
	// 定时清理过期key的周期时间,最小单位是1秒,大于等于1秒生效
	CleanWindow time.Duration
	// 设置最大存储对象数量,注意,这里是所有分片的可存储总和
	MaxEntriesInWindow int
	// 单个缓存得最大长度限制(字节数),用于初始化分片大小
	MaxEntrySize int
	// 是否计算缓存资源被请求的次数
	StatsEnabled bool
	// 详细模式 打印有关新内存分配的信息
	Verbose bool
	// 将字符串转化为int64,默认使用fnv64
	Hasher Hasher
	// 字节队列的缓存size阈值,单位是MB,到达阈值后最老的记录会被最新的记录覆盖,0为不限制size
	HardMaxCacheSize int
	// 优先级第二,当最老的记录因为过期被移除或内存剩余空间不足或者因为调用了delete方法的时候,会触发回调方法,并且返回代表原因的位掩码
	OnRemove func(key string, entry []byte)
	// 优先级第一,当最老的记录因为过期被移除或内存剩余空间不足或者因为调用了delete方法的时候,会触发该回调方法,返回表示该特定条目细节的结构。
	OnRemoveWithMetadata func(key string, entry []byte, keyMetadata Metadata)
	// 优先级第三,当最老的记录因为过期被移除或内存剩余空间不足或者因为调用了delete方法的时候,会触发该回调方法,将传递一个表示原因的常量。
	OnRemoveWithReason func(key string, entry []byte, reason RemoveReason)

	onRemoveFilter int

	// Logger 当Verbose为true时启用得log日志
	Logger Logger
}

经过我尽量的说人话,应该通过对这个初始化用的配置struct的了解,大概知晓了bigcache的一些使用时的信息,接着往下看。

通过这个配置,应该知道了如何自定义初始化bigcache,如果是第一次使用,或者测试阶段,也可以使用提供的位于bigcache/config.go内的DefaultConfig默认初始化函数。下面是其函数定义:

// DefaultConfig initializes config with default values.
// When load for BigCache can be predicted in advance then it is better to use custom config.
func DefaultConfig(eviction time.Duration) Config {
	return Config{
		Shards:             1024,
		LifeWindow:         eviction,
		CleanWindow:        1 * time.Second,
		MaxEntriesInWindow: 1000 * 10 * 60,
		MaxEntrySize:       500,
		StatsEnabled:       false,
		Verbose:            true,
		Hasher:             newDefaultHasher(),
		HardMaxCacheSize:   0,
		Logger:             DefaultLogger(),
	}
}

可以看出,这其实就是官方给出的一个基本配置,而且注明了当BigCache的负载可以提前预测时,最好使用自定义配置。

  • Shards(分片数量):1024
  • LifeWindow(key过期时间):eviction,外部输入过期时间
  • CleanWindow(自动清理过期key时间):1,1分钟清理一次过期key
  • MaxEntriesInWindow(允许所有分区加起来的最大缓存数量):1000 * 10 * 60,所有分片总共可以缓存1000 * 10 * 60个key
  • MaxEntrySize(单个缓存得最大字节数):500,每个key的value最长500字节
  • StatsEnabled(是否计算缓存资源被请求的次数):false,不计算
  • Verbose(详细模式):true,打印有关新内存分配的信息
  • Hasher(字符串转化为int64-默认fnv64):newDefaultHasher(),转换方法
  • Logger(log日志):DefaultLogger(),

newBigCache()

不难发现,NewBigCache()内部调用的是newBigCache()函数,这里将NewBigCache()对外开放,而newBigCache()则只对内使用。
先来看下函数定义:

func newBigCache(config Config, clock clock) (*BigCache, error) {
	if !isPowerOfTwo(config.Shards) {
		return nil, fmt.Errorf("Shards number must be power of two")
	}
	if config.MaxEntrySize < 0 {
		return nil, fmt.Errorf("MaxEntrySize must be >= 0")
	}
	if config.MaxEntriesInWindow < 0 {
		return nil, fmt.Errorf("MaxEntriesInWindow must be >= 0")
	}
	if config.HardMaxCacheSize < 0 {
		return nil, fmt.Errorf("HardMaxCacheSize must be >= 0")
	}

	if config.Hasher == nil {
		config.Hasher = newDefaultHasher()
	}

	cache := &BigCache{
		shards:     make([]*cacheShard, config.Shards),
		lifeWindow: uint64(config.LifeWindow.Seconds()),
		clock:      clock,
		hash:       config.Hasher,
		config:     config,
		shardMask:  uint64(config.Shards - 1),
		close:      make(chan struct{}),
	}

	var onRemove func(wrappedEntry []byte, reason RemoveReason)
	if config.OnRemoveWithMetadata != nil {
		onRemove = cache.providedOnRemoveWithMetadata
	} else if config.OnRemove != nil {
		onRemove = cache.providedOnRemove
	} else if config.OnRemoveWithReason != nil {
		onRemove = cache.providedOnRemoveWithReason
	} else {
		onRemove = cache.notProvidedOnRemove
	}

	for i := 0; i < config.Shards; i++ {
		cache.shards[i] = initNewShard(config, onRemove, clock)
	}

	if config.CleanWindow > 0 {
		go func() {
			ticker := time.NewTicker(config.CleanWindow)
			defer ticker.Stop()
			for {
				select {
				case t := <-ticker.C:
					cache.cleanUp(uint64(t.Unix()))
				case <-cache.close:
					return
				}
			}
		}()
	}

	return cache, nil
}

1、基础值校验

这里首先是对config.Shards、config.MaxEntrySize、config.MaxEntriesInWindow、config.HardMaxCacheSize的值做了范围校验。尤其是config.Shards的校验就很巧妙。来看下实现函数:

func isPowerOfTwo(number int) bool {
	return (number != 0) && (number&(number-1)) == 0
}

这里的实现就比较简单,但是很巧妙,给出两个具体例子,看一下就知道了:

example 1: number=8
8-> 1000; 7-> 0111; 8&7=0

example 2: number=16
16-> 0001 0000; 15-> 0000 1111; 16&15=0

如果config.Hasher没有传值,会将默认的newDefaultHasher()内的hash处理函数给赋值,即将字符串转化为int64,默认使用fnv64的处理函数。

type fnv64a struct{}
offset64 = 14695981039346656037
prime64 = 1099511628211

func (f fnv64a) Sum64(key string) uint64 {
	var hash uint64 = offset64
	for i := 0; i < len(key); i++ {
		hash ^= uint64(key[i])
		hash *= prime64
	}

	return hash
}

2、BigCache Struct

先来看下定义:

// BigCache is fast, concurrent, evicting cache created to keep big number of entries without impact on performance.
// It keeps entries on heap but omits GC for them. To achieve that, operations take place on byte arrays,
// therefore entries (de)serialization in front of the cache will be needed in most use cases.
type BigCache struct {
	shards       []*cacheShard 	//分片
	lifeWindow   uint64			//key的过期时间
	clock        clock			//时间获取的方法
	hash         Hasher			//hash算法
	config       Config			//配置文件,就是一开始的Config结构体
	shardMask    uint64			//常量:分片数-1,用于 &运算 替换取模运算,寻找分片位置
	maxShardSize uint32			//分片最大容量(字节数)
	close        chan struct{}	//控制关闭
}

先来看一下官方给的解释:BigCache是一种快速、并发、逐出式缓存,用于在不影响性能的情况下保留大量条目。它将条目保留在堆上,但忽略了它们的GC。为了实现这一点,操作发生在字节数组上,因此在大多数用例中,都需要在缓存前面进行条目(反)序列化。

step1:

shards:     make([]*cacheShard, config.Shards),

这里又引出了另一个关键的结构cacheShard,这里暂时不做过多介绍,因为在下面紧接着不久就会再看见他,到时候再进行叙述更为贴切。
这里实际上就是使用make来初始化一下内存空间。

step2:

lifeWindow: uint64(config.LifeWindow.Seconds()),

这里是根据配置的config.LifeWindow即key的过期时间,来获取设定的数值(单位为秒),也就是经过多少秒才会过期。

step3:

clock:      clock,
hash:       config.Hasher,
config:     config,
shardMask:  uint64(config.Shards - 1),

其中clock、hash、config就是直接从配置进行赋值,而shardMask则是config.Shards - 1,这也不难理解,这里不做过多叙述

step4:

close:      make(chan struct{}),

这里是使用channel进行结束信号传递。

3、onRemove功能函数初始化

先看下这部分代码:

var onRemove func(wrappedEntry []byte, reason RemoveReason)
if config.OnRemoveWithMetadata != nil {
	onRemove = cache.providedOnRemoveWithMetadata
} else if config.OnRemove != nil {
	onRemove = cache.providedOnRemove
} else if config.OnRemoveWithReason != nil {
	onRemove = cache.providedOnRemoveWithReason
} else {
	onRemove = cache.notProvidedOnRemove
}

for i := 0; i < config.Shards; i++ {
	cache.shards[i] = initNewShard(config, onRemove, clock)
}

根据输入的回调函数情况,将对应的回调函数复制给onRemove,如果未输入就将notProvidedOnRemove赋值给onRemove,然后在下面的循环内,为每个分片都进行回调函数的注册。

3.1 providedOnRemoveWithMetadata

先来看下函数定义:

func (c *BigCache) providedOnRemoveWithMetadata(wrappedEntry []byte, reason RemoveReason) {
	hashedKey := c.hash.Sum64(readKeyFromEntry(wrappedEntry))
	shard := c.getShard(hashedKey)
	c.config.OnRemoveWithMetadata(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry), shard.getKeyMetadata(hashedKey))
}

不难发现上面对三者的优先级定义其实就是这个if-else语句决定的。

step1:
先来看下第一句:

hashedKey := c.hash.Sum64(readKeyFromEntry(wrappedEntry))

这里明显是调用在外面注册的hash计算函数,将输入的string进行计算,得到hash的键值,然后输入是一个[]byte类型,所以这里的readKeyFromEntry()函数肯定是将[]byte转换为string,再来看一下这个函数的定义:

func readKeyFromEntry(data []byte) string {
	length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])

	// copy on read
	dst := make([]byte, length)
	copy(dst, data[headersSizeInBytes:headersSizeInBytes+length])

	return bytesToString(dst)
}

这里直接看返回的确是string,再来看一下内部具体是怎么实现的。
这里是根据使用binary.LittleEndian.Uint16获取key的长度,然后在data内取出key的内容,这里涉及到SET方法时的存储方式,不在这里过于深究,暂时先给出存储方式,在进行SET方法时在进行展开。
格式:时间戳|hashKey|keyLength|key|data

  • 时间戳:常见的是10位(单位为秒,也有使用13位的,单位为毫秒),但是这里是只保存8位,后面的会被覆盖
  • hashKey:同理也是此处存存储8位
  • keyLength:存储记录key字符串的长度,本身是16位
  • key:就是存储时的string,由于他的下标签已知(根据时间戳、hashKey、keyLength的长度计算得出),再加上keyLength记录的key字符串的长度,就可以取出这个数据。
  • data:就是key-value中的value

所以readKeyFromEntry就是取出这个存储记录中key代表的string。

step2:
接下来再看:

shard := c.getShard(hashedKey)

从这里就可以看出,bigcache是利用hash进行的自动分区,而getShard理所当然就是根据hash计算出对应的分区。
来看下函数内部定义:

func (c *BigCache) getShard(hashedKey uint64) (shard *cacheShard) {
	return c.shards[hashedKey&c.shardMask]
}

这里是采用hashKey和shardMask按位与计算得出分配的shard中的标签。
由于是按位与,所以可以保证得出的结果(标签),必然是小于等于这两个数中的最小者,在bigcache中几乎肯定是shardMask小,而shardMask在初始化阶段是由uint64(config.Shards - 1)得到的,所以无论结果如何分配到的标签肯定是存在对应的分区的,因为一共就初始化了config.Shards个分区。

而且由于hashKey的特殊性,几乎不可能存在shardMask大于hashKey的情况,退一万步讲,即使存在,但是按位与后的结果肯定是小于等于hashKey,那么影响就是大于hashKey的标签对应的分片一直无法被分配,处于闲置的状态,所以也不会造成此处发生异常。

step3:
接下来看:

c.config.OnRemoveWithMetadata(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry), shard.getKeyMetadata(hashedKey))

这里的OnRemoveWithMetadata就是在初始化时外界传入的回调函数,具体执行什么功能,就看个人定义了,这里主要是说下,bigcache究竟给我们的回调函数什么信息:

首先是readKeyFromEntry(),他在上面已经说明过了,就是取出这个存储记录中key代表的string。

其次是readEntry(),来看下内部实现:

func readEntry(data []byte) []byte {
	length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])

	// copy on read
	dst := make([]byte, len(data)-int(headersSizeInBytes+length))
	copy(dst, data[headersSizeInBytes+length:])

	return dst
}

一样的配方,一样的味道,就是取出存储的value值。可以对比下readEntry()readKeyFromEntry()的实现,很清晰。

最后是getKeyMetadata(),来看下内部实现:

func (s *cacheShard) getKeyMetadata(key uint64) Metadata {
	return Metadata{
		RequestCount: s.hashmapStats[key],
	}
}

这里出现一个新的结果体,让我们先来看下Metadata:

// Metadata contains information of a specific entry
type Metadata struct {
	RequestCount uint32
}

这个结构体很简单就是记录元数据包含特定条目的信息,即缓存资源被请求的次数。

3.2 providedOnRemove

先来看下这个函数的定义:

func (c *BigCache) providedOnRemove(wrappedEntry []byte, reason RemoveReason) {
	c.config.OnRemove(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry))
}

这里是将readKeyFromEntry()readEntry()的结果返回给回调函数,这俩函数上面有,不进行赘述了。

3.3 providedOnRemoveWithReason

先来看下这个函数的定义:

func (c *BigCache) providedOnRemoveWithReason(wrappedEntry []byte, reason RemoveReason) {
	if c.config.onRemoveFilter == 0 || (1<<uint(reason))&c.config.onRemoveFilter > 0 {
		c.config.OnRemoveWithReason(readKeyFromEntry(wrappedEntry), readEntry(wrappedEntry), reason)
	}
}

这里出现了一个onRemoveFilter,这个就是回调过滤的条件,他的定义是:

onRemoveFilter int

然后回调原因的reason定义如下:

// RemoveReason is a value used to signal to the user why a particular key was removed in the OnRemove callback.
type RemoveReason uint32

const (
	// Expired means the key is past its LifeWindow.
	Expired = RemoveReason(1)
	// NoSpace means the key is the oldest and the cache size was at its maximum when Set was called, or the
	// entry exceeded the maximum shard size.
	NoSpace = RemoveReason(2)
	// Deleted means Delete was called and this key was removed as a result.
	Deleted = RemoveReason(3)
)

这个就是具体的原因:
1、key超过了过期时间
2、超过了最大缓存size
3、主动delete删除

这个providedOnRemoveWithReason其实与providedOnRemove除了增加了回调的原因变量作为条件之外,就是完全相同的,所以也不在进行赘述。

notProvidedOnRemove

先来看下函数定义:

func (c *BigCache) notProvidedOnRemove(wrappedEntry []byte, reason RemoveReason) {
}

我想也不用进行赘述了

initNewShard

先来看下函数定义:

func initNewShard(config Config, callback onRemoveCallback, clock clock) *cacheShard {
	bytesQueueInitialCapacity := config.initialShardSize() * config.MaxEntrySize
	maximumShardSizeInBytes := config.maximumShardSizeInBytes()
	if maximumShardSizeInBytes > 0 && bytesQueueInitialCapacity > maximumShardSizeInBytes {
		bytesQueueInitialCapacity = maximumShardSizeInBytes
	}
	return &cacheShard{
		hashmap:      make(map[uint64]uint32, config.initialShardSize()),
		hashmapStats: make(map[uint64]uint32, config.initialShardSize()),
		entries:      *queue.NewBytesQueue(bytesQueueInitialCapacity, maximumShardSizeInBytes, config.Verbose),
		entryBuffer:  make([]byte, config.MaxEntrySize+headersSizeInBytes),
		onRemove:     callback,

		isVerbose:    config.Verbose,
		logger:       newLogger(config.Logger),
		clock:        clock,
		lifeWindow:   uint64(config.LifeWindow.Seconds()),
		statsEnabled: config.StatsEnabled,
	}
}

step1:

bytesQueueInitialCapacity := config.initialShardSize() * config.MaxEntrySize

这里的initialShardSize()是计算每个分片要存储多少个key缓存。
bytesQueueInitialCapacity是通过每个分片分配的缓存和每个缓存的最大字节数得到每个分片容纳的最大字节数,即最大容量。
看下内部实现:

func (c Config) initialShardSize() int {
	return max(c.MaxEntriesInWindow/c.Shards, minimumEntriesInShard)
}

MaxEntriesInWindow是所有分片的最大缓存key-value对数量,Shards是缓存分片数,这在开始就进行过叙述,这里是计算单个分片要存储多少个key-value,如果小于预设的minimumEntriesInShard,会返回minimumEntriesInShard。

step2:

maximumShardSizeInBytes := config.maximumShardSizeInBytes()

这里的maximumShardSizeInBytes()是以字节为单位计算最大分片大小
看下内部实现:

// maximumShardSizeInBytes computes maximum shard size in bytes
func (c Config) maximumShardSizeInBytes() int {
	maxShardSize := 0

	if c.HardMaxCacheSize > 0 {
		maxShardSize = convertMBToBytes(c.HardMaxCacheSize) / c.Shards
	}

	return maxShardSize
}

其中convertMBToBytes的作用是将MB转为Byte,maximumShardSizeInBytes()完成了计算每个字节的预设最大字节数,0视为无限制。
所以maximumShardSizeInBytes就是最后的计算出每个分片的最大字节数(0为不限制)

step3:

if maximumShardSizeInBytes > 0 && bytesQueueInitialCapacity > maximumShardSizeInBytes {
		bytesQueueInitialCapacity = maximumShardSizeInBytes
	}

这里实现的就比较简单,就是比较下如果maximumShardSizeInBytes不为0,且小于计算得到的bytesQueueInitialCapacity时,就将bytesQueueInitialCapacity设置为maximumShardSizeInBytes的值

step4:

return &cacheShard{
		hashmap:      make(map[uint64]uint32, config.initialShardSize()),
		hashmapStats: make(map[uint64]uint32, config.initialShardSize()),
		entries:      *queue.NewBytesQueue(bytesQueueInitialCapacity, maximumShardSizeInBytes, config.Verbose),
		entryBuffer:  make([]byte, config.MaxEntrySize+headersSizeInBytes),
		onRemove:     callback,

		isVerbose:    config.Verbose,
		logger:       newLogger(config.Logger),
		clock:        clock,
		lifeWindow:   uint64(config.LifeWindow.Seconds()),
		statsEnabled: config.StatsEnabled,
	}

这里显然是出现了另一个关键的结构体:

type cacheShard struct {
	hashmap     map[uint64]uint32	//uint64:key的hash值,uint32:存储在bytesQueue的开始下标
	entries     queue.BytesQueue	// 实际存储的结构,字节数组,一个环形队列
	lock        sync.RWMutex		// 读写锁
	entryBuffer []byte				// 用于set的时候临时存放要set的value值
	onRemove    onRemoveCallback	// 触发删除时的回调函数-也就是给定的config中的回调函数

	isVerbose    bool				// 是否是详细模式-打印日志
	statsEnabled bool				// 是否计算请求缓存资源的次数
	logger       Logger				// 日志打印实例方法
	clock        clock				// 时间获取的方法
	lifeWindow   uint64				// 设置的key的过期时间

	hashmapStats map[uint64]uint32	// 计数-缓存资源被请求的次数
	stats        Stats				// 计数的结构
}

来看下这个计数结构:

// Stats 存储缓存统计信息
type Stats struct {
	// Hits 记录该分片找到key成功的次数
	Hits int64 `json:"hits"`
	// Misses 记录该分片没有找到key失败的次数
	Misses int64 `json:"misses"`
	// DelHits 记录该分片delete成功的次数
	DelHits int64 `json:"delete_hits"`
	// DelMisses 记录该分片delete失败的次数
	DelMisses int64 `json:"delete_misses"`
	// Collisions 记录该分片成功get的次数
	Collisions int64 `json:"collisions"`
}

回到

return &cacheShard{
		hashmap:      make(map[uint64]uint32, config.initialShardSize()),
		hashmapStats: make(map[uint64]uint32, config.initialShardSize()),
		entries:      *queue.NewBytesQueue(bytesQueueInitialCapacity, maximumShardSizeInBytes, config.Verbose),
		entryBuffer:  make([]byte, config.MaxEntrySize+headersSizeInBytes),
		onRemove:     callback,

		isVerbose:    config.Verbose,
		logger:       newLogger(config.Logger),
		clock:        clock,
		lifeWindow:   uint64(config.LifeWindow.Seconds()),
		statsEnabled: config.StatsEnabled,
	}

看一下这个结构体NewBytesQueue函数:

// NewBytesQueue 初始化一个新的字节队列.
// capacity 将会被用于初始化字节数字的size分配
// verbose 为true时会将一些内存分配中的详细信息打印出来
func NewBytesQueue(capacity int, maxCapacity int, verbose bool) *BytesQueue {
	return &BytesQueue{
		array:        make([]byte, capacity),
		capacity:     capacity,
		maxCapacity:  maxCapacity,
		headerBuffer: make([]byte, binary.MaxVarintLen32),
		tail:         leftMarginIndex,
		head:         leftMarginIndex,
		rightMargin:  leftMarginIndex,
		verbose:      verbose,
	}
}

由这里也可以看出,这里是每一个分片都有自己的一个队列用来存储整个分片的所有数据。

其他的都是一些一眼就知道含义的,不在进行赘述。

这里的叙述也是在我不了解的情况下,直接就NewBigCache()开始进行学习和叙述,这样的好处是,当刚接触bigcache的人并且想学习源码时,咱们的思路可能是相同的,在之后会有一些总结性的文章,现在,让我们准备开始Set函数。

 类似资料: