当前位置: 首页 > 工具软件 > GroupCache > 使用案例 >

groupcache源码(四)-主数据结构group

农均
2023-12-01

一. 简介

group是核心数据结构,对外提供服务,对内进行数据操作

二. 结构体信息

// A Group is a cache namespace and associated data loaded spread over
// a group of 1 or more machines.
type Group struct {
	name       string // name of the group; newGroup rejects duplicate names
	getter     Getter // loads the data for a key that is not in the cache
	peersOnce  sync.Once // runs the peer initialization only once (Get calls initPeers through it)
	peers      PeerPicker // asked by load (via PickPeer) which peer should serve a key
	cacheBytes int64 // limit for sum of mainCache and hotCache size, i.e. the maximum cache size available

	// mainCache is a cache of the keys for which this process
	// (amongst its peers) is authoritative. That is, this cache
	// contains keys which consistent hash on to this process's
	// peer number.
	mainCache cache // populated by load after a successful local load

	// hotCache contains keys/values for which this peer is not
	// authoritative (otherwise they would be in mainCache), but
	// are popular enough to warrant mirroring in this process to
	// avoid going over the network to fetch from a peer.  Having
	// a hotCache avoids network hotspotting, where a peer's
	// network card could become the bottleneck on a popular key.
	// This cache is used sparingly to maximize the total number
	// of key/value pairs that can be stored globally.
	hotCache cache // mirrors data owned by other peers so hot keys do not make network I/O the bottleneck

	// loadGroup ensures that each key is only fetched once
	// (either locally or remotely), regardless of the number of
	// concurrent callers.
	loadGroup flightGroup // single-flight: avoids a thundering herd on one key

	_ int32 // force Stats to be 8-byte aligned on 32-bit platforms

	// Stats are statistics on the group.
	Stats Stats // counters updated by Get and load (Gets, CacheHits, Loads, ...)
}

group是核心数据结构,从它的字段可以看出,groupcache针对高并发场景做了很多处理(例如用hotCache缓解热点key的网络瓶颈,用loadGroup合并同一个key的并发加载)

三. group的实例化

// NewGroup creates a coordinated group-aware Getter from a Getter.
//
// The returned Getter tries (but does not guarantee) to run only one
// Get call at once for a given key across an entire set of peer
// processes. Concurrent callers both in the local process and in
// other processes receive copies of the answer once the original Get
// completes.
//
// The group name must be unique for each getter.
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
	// No explicit peer picker is supplied here; newGroup receives nil for it.
	var noPeers PeerPicker
	return newGroup(name, cacheBytes, getter, noPeers)
}

// newGroup builds a Group, registers it under name in the package-level
// groups map, and returns it. It panics when getter is nil or when name
// is already registered.
//
// If peers is nil, the peerPicker is called via a sync.Once to initialize it.
func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group {
	// getter is what loads data that is not in the cache, so a nil one
	// is rejected up front.
	if getter == nil {
		panic("nil Getter")
	}

	// Everything below runs with the package mutex held.
	mu.Lock()
	defer mu.Unlock()

	// Run the peer-server init callback (at most once).
	initPeerServerOnce.Do(callInitPeerServer)

	// Group names must be unique.
	if _, exists := groups[name]; exists {
		panic("duplicate registration of group " + name)
	}

	group := new(Group)
	group.name = name
	group.getter = getter
	group.peers = peers
	group.cacheBytes = cacheBytes
	group.loadGroup = &singleflight.Group{}

	// Invoke the new-group hook, if one is set, before the group is
	// stored in the registry.
	if hook := newGroupHook; hook != nil {
		hook(group)
	}

	groups[name] = group
	return group
}
初始化方法newGroup的流程比较常规:先加锁,检查name是否已经注册过(重复则panic),然后创建Group,并把它注册到全局的groups中。

四.数据获取

// Get looks up key and writes its value into dest.
//
// The local caches are consulted first through lookupCache; on a miss the
// value is obtained through load. It returns an error when dest is nil, when
// load fails, or when writing the value into dest fails.
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
	// Make sure the peer picker has been initialized (runs only once).
	g.peersOnce.Do(g.initPeers)
	g.Stats.Gets.Add(1)
	if dest == nil {
		return errors.New("groupcache: nil dest Sink")
	}

	// Fast path: the value is already cached in this process.
	if value, cacheHit := g.lookupCache(key); cacheHit {
		g.Stats.CacheHits.Add(1)
		return setSinkView(dest, value)
	}

	// Optimization to avoid double unmarshalling or copying: keep
	// track of whether the dest was already populated. One caller
	// (if local) will set this; the losers will not. The common
	// case will likely be one caller.
	value, destPopulated, err := g.load(ctx, key, dest)
	if err != nil {
		return err
	}
	if destPopulated {
		// load already filled dest; nothing left to copy.
		return nil
	}
	return setSinkView(dest, value)
}

先从本地缓存中获取数据, 如果不在缓存中,则调用load方法进行加载数据

五. 加载数据

// load loads key either by invoking the getter locally or by sending it to another machine.
//
// The work runs inside g.loadGroup.Do, keyed by key. destPopulated is set to
// true only on the path where the local getter succeeded using dest; value is
// filled in from the callback's result when no error is returned.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
	g.Stats.Loads.Add(1)
	viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
		// Check the cache again because singleflight can only dedup calls
		// that overlap concurrently.  It's possible for 2 concurrent
		// requests to miss the cache, resulting in 2 load() calls.  An
		// unfortunate goroutine scheduling would result in this callback
		// being run twice, serially.  If we don't check the cache again,
		// cache.nbytes would be incremented below even though there will
		// be only one entry for this key.
		//
		// Consider the following serialized event ordering for two
		// goroutines in which this callback gets called twice for the
		// same key:
		// 1: Get("key")
		// 2: Get("key")
		// 1: lookupCache("key")
		// 2: lookupCache("key")
		// 1: load("key")
		// 2: load("key")
		// 1: loadGroup.Do("key", fn)
		// 1: fn()
		// 2: loadGroup.Do("key", fn)
		// 2: fn()
		// Re-check the cache before doing any loading work.
		if value, cacheHit := g.lookupCache(key); cacheHit {
			g.Stats.CacheHits.Add(1)
			return value, nil
		}
		g.Stats.LoadsDeduped.Add(1)
		// These two shadow the named results of load inside the callback.
		var value ByteView
		var err error
		// Ask the peer picker for a peer for this key; ok reports that it
		// returned one to query.
		if peer, ok := g.peers.PickPeer(key); ok {
			// Fetch the value from that peer.
			value, err = g.getFromPeer(ctx, peer, key)
			if err == nil {
				// The peer answered: return its value directly.
				g.Stats.PeerLoads.Add(1)
				return value, nil
			}
			// A peer error is only counted; execution falls through to
			// the local load below.
			g.Stats.PeerErrors.Add(1)
			// TODO(bradfitz): log the peer's error? keep
			// log of the past few for /groupcachez?  It's
			// probably boring (normal task movement), so not
			// worth logging I imagine.
		}
		// Load the value locally.
		value, err = g.getLocally(ctx, key, dest)
		if err != nil {
			g.Stats.LocalLoadErrs.Add(1)
			return nil, err
		}
		g.Stats.LocalLoads.Add(1)
		destPopulated = true // only one caller of load gets this return value
		// Store the locally loaded value in mainCache.
		g.populateCache(key, value, &g.mainCache)
		return value, nil
	})
	// Every success path of the callback returns a ByteView, so the
	// assertion is only performed when no error came back.
	if err == nil {
		value = viewi.(ByteView)
	}
	return
}

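数据不在缓存中时,会调用本方法。本方法通过单飞(singleflight)保证同一个key的并发加载只执行一次:先再次检查缓存;如果PickPeer选出了远端节点,则先从远端获取;远端获取失败或者没有选出远端节点时,再调用getLocally从本地加载,并把结果写入mainCache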

六. 从本地获取数据

// getLocally loads key by calling the getter supplied when the group was
// created, passing it dest, and then returns the view held by dest.
func (g *Group) getLocally(ctx context.Context, key string, dest Sink) (ByteView, error) {
	if err := g.getter.Get(ctx, key, dest); err != nil {
		return ByteView{}, err
	}
	// Per the original author's note, the existing view() implementations
	// always return a nil error.
	return dest.view()
}

本地获取数据就比较简单,直接调用用户定义的方法

七. 加载远程数据

// getFromPeer asks peer for key in this group and returns the value from the
// response. Roughly one in ten successful fetches is also stored in hotCache.
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
	var (
		request  = &pb.GetRequest{Group: &g.name, Key: &key}
		response = new(pb.GetResponse)
	)

	// Fetch the value from the peer.
	if err := peer.Get(ctx, request, response); err != nil {
		return ByteView{}, err
	}
	fetched := ByteView{b: response.Value}

	// TODO(bradfitz): use res.MinuteQps or something smart to
	// conditionally populate hotCache.  For now just do it some
	// percentage of the time.
	//
	// Randomly add the fetched value to hotCache.
	if roll := rand.Intn(10); roll == 0 {
		g.populateCache(key, fetched, &g.hotCache)
	}
	return fetched, nil
}

1. groupcache节点之间通过http进行通信,传输的内容是pb(protobuf)格式,protobuf是二进制编码,可以减少传输数据的大小

2. 从远端获取到的数据并不会全部加入hotCache,而是按照大约1/10的概率(rand.Intn(10) == 0)随机加入

 类似资料: