Group 是 groupcache 的核心数据结构:对外提供查询服务,对内负责缓存数据的读取、加载与存储。
// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
//
// Group is the core data structure: it serves lookups to callers and
// manages loading and caching of the underlying data internally. Several
// of its fields exist to cope with high concurrency (deduplicated loading
// via loadGroup, and hotCache for popular keys owned by other peers).
type Group struct {
	name       string     // group name, used as the key under which the group is registered
	getter     Getter     // loads data that is not present in any cache
	peersOnce  sync.Once  // ensures peers is initialized only once
	peers      PeerPicker // picks the peer to fetch a key from
	cacheBytes int64      // limit for sum of mainCache and hotCache size (max bytes this group may cache)
	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number. It holds the data cached by this group itself.
	mainCache cache
	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer. Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache
	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers. This is the "singleflight" pattern; it
	// prevents a thundering herd of duplicate loads for one key.
	loadGroup flightGroup
	_         int32 // force Stats to be 8-byte aligned on 32-bit platforms
	// Stats are statistics on the group.
	Stats Stats
}
Group 是核心数据结构。从它的字段可以看出,groupcache 针对高并发场景做了很多处理,例如用单飞(singleflight)合并对同一 key 的并发加载,以及用 hotCache 缓解热点 key 带来的网络瓶颈。
// NewGroup registers and returns a new Group called name.
//
// getter loads values that are not found in any cache, and cacheBytes
// caps the combined size of the group's mainCache and hotCache.
//
// The returned Group tries (but does not guarantee) to run only one Get
// call at once for a given key across an entire set of peer processes.
// Concurrent callers, both in the local process and in other processes,
// receive copies of the answer once the original Get completes.
//
// The group name must be unique for each getter: registering the same name
// twice panics, as does passing a nil getter. No PeerPicker is supplied
// here, so the group resolves its peers lazily (via a sync.Once) on first
// use.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	var lazyPeers PeerPicker // left nil on purpose; initialized on first use
	return newGroup(name, cacheBytes, getter, lazyPeers)
}
// newGroup builds, registers and returns a Group; it is the shared
// implementation behind NewGroup. If peers is nil, the peerPicker is
// called via a sync.Once to initialize it.
//
// It panics if getter is nil or if a group with the same name is already
// registered. Registration happens while holding the package-level mu,
// which is also held while newGroupHook (if set) runs.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	// Reject a missing loader before touching any shared state.
	if getter == nil {
		panic("nil Getter")
	}

	mu.Lock()
	defer mu.Unlock()

	// Invoke callInitPeerServer at most once per process.
	initPeerServerOnce.Do(callInitPeerServer)

	// Group names must be unique.
	if _, exists := groups[name]; exists {
		panic("duplicate registration of group " + name)
	}

	grp := new(Group)
	grp.name = name
	grp.getter = getter
	grp.peers = peers
	grp.cacheBytes = cacheBytes
	grp.loadGroup = &singleflight.Group{}

	// Let an optionally registered hook observe the freshly built group.
	if hook := newGroupHook; hook != nil {
		hook(grp)
	}

	groups[name] = grp
	return grp
}
初始化方法 newGroup 的逻辑很常规:先加全局锁,再检查名称是否重复,最后创建并注册 Group。
// Get populates dest with the value for key.
//
// It first consults this process's caches through lookupCache (mainCache
// first, then hotCache). On a miss it calls load, which fetches the value
// from a peer or via the group's Getter; concurrent loads of the same key
// are coalesced by the group's loadGroup.
//
// Every call is counted in Stats.Gets, including calls with a nil dest,
// which are rejected with an error.
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
	// Lazily resolve the PeerPicker exactly once per group.
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}

	value, cacheHit := g.lookupCache(key)
	if cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		// load already wrote into dest when it ran the local Getter.
		return nil
	}
	return setSinkView(dest, value)
}
Get 先从本地缓存中获取数据;如果缓存未命中,则调用 load 方法加载数据。
// load loads key either by invoking the getter locally or by sending it to another machine.
//
// All concurrent load calls for the same key go through g.loadGroup.Do, so
// the loading function below is meant to run once per key at a time, with
// the other callers receiving its result. That function re-checks the
// caches, then tries a peer picked by g.peers, and finally falls back to
// the group's Getter (which is handed dest to fill).
//
// Results:
//   - value is the loaded ByteView; it stays the zero value when err != nil.
//   - destPopulated is true only for the caller whose own function ran the
//     local Getter, meaning dest has already been filled and the caller
//     need not copy value into it again.
//   - err is the error returned by g.loadGroup.Do. In the function below,
//     peer failures are only counted (Stats.PeerErrors) and then fall back
//     to a local load, so only a local Getter failure is returned.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently. It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls. An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially. If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		// Re-check the caches before loading.
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		// Counts loads that got past both singleflight dedup and the
		// cache re-check above.
		g.Stats.LoadsDeduped.Add(1)
		var value ByteView
		var err error
		// PickPeer reporting ok means a peer was chosen to serve this key.
		// NOTE(review): presumably ok is false when this process itself
		// owns the key — confirm against the PeerPicker implementation.
		if peer, ok := g.peers.PickPeer(key); ok {
			// Fetch the value from the remote peer.
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				// Peer fetch succeeded; return it without adding it to mainCache.
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez? It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		// No peer was picked, or the peer fetch failed: load locally via
		// the group's Getter, which is given dest.
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		// Cache the locally loaded value in mainCache.
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	if err == nil {
		// On success, Do's interface{} result holds the ByteView returned above.
		value = viewi.(ByteView)
	}
	return
}
数据不在缓存中时会调用 load 方法。该方法使用单飞(singleflight)合并对同一 key 的并发加载,然后决定从远端节点还是从本地获取数据。
// getLocally loads key by calling the user-supplied Getter the group was
// created with, handing it dest to fill, and then returns dest's contents
// as a ByteView. If the Getter fails, its error is returned together with a
// zero ByteView.
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
	if err := g.getter.Get(ctx, key, dest); err != nil {
		return ByteView{}, err
	}
	// NOTE(review): the original note claims every existing Sink view()
	// implementation returns a nil error — confirm before relying on it.
	return dest.view()
}
本地获取数据比较简单:直接调用用户在创建 Group 时传入的 Getter。
// getFromPeer asks peer for the value of key in this group. It sends a
// protobuf GetRequest and reads the reply into a GetResponse.
//
// On success the response bytes are wrapped in a ByteView without copying.
// Roughly one time in ten, the value is also mirrored into hotCache, so a
// popular key owned by another peer can be served locally later. A peer
// error is returned unchanged together with a zero ByteView.
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
	// hotCacheOneIn is the inverse probability of mirroring a peer result
	// into hotCache.
	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache. For now just do it some
	// percentage of the time.
	const hotCacheOneIn = 10

	res := new(pb.GetResponse)
	if err := peer.Get(ctx, &pb.GetRequest{Group: &g.name, Key: &key}, res); err != nil {
		return ByteView{}, err
	}

	view := ByteView{b: res.Value}
	if rand.Intn(hotCacheOneIn) == 0 {
		g.populateCache(key, view, &g.hotCache)
	}
	return view, nil
}
1. groupcache 节点之间通过 HTTP 通信,传输内容采用 protobuf(pb)格式,可以减小传输数据的体积。
2. 从远端节点获取的数据只有一部分会被加入 hotCache(代码中的概率约为 1/10,即 `rand.Intn(10) == 0`)。