groupcache是一个缓存系统,开始应用在Google下载站点dl.google.com,后来也使用在Google Blogger和Google Code这些数据更改频率较低的系统中。
groupcache没有update/delete命令,也没有显式的set命令——缓存只能在get未命中时由getter函数加载填充;存储使用lru淘汰策略,空间占满时便淘汰最近最少使用的缓存,所以适合数据更改频率较低的应用。
groupcache集群使用“一致性哈希”分布节点,单节点出现问题对整体系统影响较小。
一. groupcache 使用
下面创建有三个groupcache节点的集群
"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"
1. 创建本地httppool,并添加对等节点。httppool是一个集群节点选取器,保存所有节点信息,获取对等节点缓存时,通过计算key的“一致性哈希值”与节点的哈希值比较来选取集群中的某个节点
local_addr = “http://127.0.0.1:8001”
peers := groupcache.NewHTTPPool("http://" + local_addr)
peers_addrs = []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
peers.Set(peers_addrs...)
2. 创建一个group(一个group是一个存储模块,类似命名空间,可以创建多个) “image_cache”。NewGroup参数分别是group名字、group大小byte、getter函数(当获取不到key对应的缓存的时候,该函数处理如何获取相应数据,并设置给dest,然后image_cache便会缓存key对应的数据)
var image_cache = groupcache.NewGroup("image", 8<<30, groupcache.GetterFunc( func(ctx groupcache.Context, key string, dest groupcache.Sink) error { //此函数即为自定义数据处理逻辑 result, err := ioutil.ReadFile(key) if err != nil { fmt.Printf("read file error %s.\n", err.Error()) return nil } fmt.Printf("asking for %s from local file system\n", key) dest.SetBytes([]byte(result)) return nil }))
3. group查找对应key的缓存,data需要使用sink(一个数据包装结构)包装一下
var data []byte
image_cache.Get(nil, key, groupcache.AllocatingByteSliceSink(&data))
其他两个节点需修改local_addr 地址,然后3个节点的groupcache集群就设置成功了。
二. groupcache源码分析
groupcache主要分为httppool和group两部分,httppool负责集群管理,group负责缓存管理。
1. httppool的作用就是管理所有节点,并通过http协议使节点之间相互通信,获取存储在其他节点上的缓存
type HTTPPool struct { // Context optionally specifies a context for the server to use when it // receives a request. // If nil, the server uses a nil Context. Context func(*http.Request) Context // Transport optionally specifies an http.RoundTripper for the client // to use when it makes a request. // If nil, the client uses http.DefaultTransport. Transport func(Context) http.RoundTripper // this peer's base URL, e.g. "https://example.net:8000" self string //本地节点url // opts specifies the options. opts HTTPPoolOptions mu sync.Mutex // guards peers and httpGetters peers *consistenthash.Map //包含 一致性hash map、hash函数的结构体,保存集群节点(都是用url代表,同下)和其对应一致性hash值 httpGetters map[string]*httpGetter // keyed by e.g. "http://10.0.0.2:8008" 保存节点url和其对应的http数据请求器 } type HTTPPoolOptions struct { // BasePath specifies the HTTP path that will serve groupcache requests. // If blank, it defaults to "/_groupcache/". BasePath string //peers间url请求路径 // Replicas specifies the number of key replicas on the consistent hash. // If blank, it defaults to 50. Replicas int //单一节点在一致性hash map中的虚拟节点数 // HashFn specifies the hash function of the consistent hash. // If blank, it defaults to crc32.ChecksumIEEE. HashFn consistenthash.Hash //一致性hash函数 } type Map struct { //consistenthash.Map hash Hash //一致性hash函数 replicas int //单一节点在一致性hash map中的虚拟节点数 keys []int // Sorted //所有节点生成的虚拟节点hash值slice hashMap map[int]string //hash值和节点对应map } //httppool选取节点算法 func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) { p.mu.Lock() defer p.mu.Unlock() if p.peers.IsEmpty() { return nil, false } if peer := p.peers.Get(key); peer != p.self { //判断获取到的节点是不是本地节点 return p.httpGetters[peer], true //返回节点对应的httpGetter } return nil, false } func (m *Map) Get(key string) string { //p.peers.Get(key) if m.IsEmpty() { return "" } hash := int(m.hash([]byte(key))) //使用一致性hash函数计算"缓存数据"key的hash值 // Binary search for appropriate replica. 
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash }) //选取最小的大于 key的hash值 的 节点hash值 // Means we have cycled back to the first replica. if idx == len(m.keys) { idx = 0 } return m.hashMap[m.keys[idx]] //返回节点(url) } //httppool实现了http Handler,可以给peers提供http服务,用来查询缓存 func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Parse request. if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) { panic("HTTPPool serving unexpected path: " + r.URL.Path) } parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2) if len(parts) != 2 { http.Error(w, "bad request", http.StatusBadRequest) return } groupName := parts[0] key := parts[1] // Fetch the value for this group/key. group := GetGroup(groupName) //获取group名字,因为可以有多个group if group == nil { http.Error(w, "no such group: "+groupName, http.StatusNotFound) return } var ctx Context if p.Context != nil { ctx = p.Context(r) } group.Stats.ServerRequests.Add(1) //设置统计数据 var value []byte err := group.Get(ctx, key, AllocatingByteSliceSink(&value)) //此处和groupcache使用处一致 if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } // Write the value to the response body as a proto message. 
body, err := proto.Marshal(&pb.GetResponse{Value: value}) //protobuf协议编码数据 if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/x-protobuf") w.Write(body) } //向其他节点发起http请求,获取缓存数据 func (h *httpGetter) Get(context Context, in *pb.GetRequest, out *pb.GetResponse) error { u := fmt.Sprintf( "%v%v/%v", h.baseURL, url.QueryEscape(in.GetGroup()), url.QueryEscape(in.GetKey()), ) req, err := http.NewRequest("GET", u, nil) if err != nil { return err } tr := http.DefaultTransport if h.transport != nil { tr = h.transport(context) } res, err := tr.RoundTrip(req) if err != nil { return err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return fmt.Errorf("server returned: %v", res.Status) } b := bufferPool.Get().(*bytes.Buffer) b.Reset() defer bufferPool.Put(b) _, err = io.Copy(b, res.Body) if err != nil { return fmt.Errorf("reading response body: %v", err) } err = proto.Unmarshal(b.Bytes(), out) if err != nil { return fmt.Errorf("decoding response body: %v", err) } return nil }
2. group的作用是管理缓存,并通过httppool选取节点,获取其他节点的缓存。
type Group struct { name string //group 名字 getter Getter //getter 当缓存中不存在对应数据时,使用该函数获取数据并缓存 peersOnce sync.Once peers PeerPicker //http实现了该接口,使用 func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) 函数选取节点 cacheBytes int64 // limit for sum of mainCache and hotCache size //缓存最大空间 byte // mainCache is a cache of the keys for which this process // (amongst its peers) is authoritative. That is, this cache // contains keys which consistent hash on to this process's // peer number. mainCache cache //使用lru策略实现的缓存结构,也是key hash值在本地的缓存 // hotCache contains keys/values for which this peer is not // authoritative (otherwise they would be in mainCache), but // are popular enough to warrant mirroring in this process to // avoid going over the network to fetch from a peer. Having // a hotCache avoids network hotspotting, where a peer's // network card could become the bottleneck on a popular key. // This cache is used sparingly to maximize the total number // of key/value pairs that can be stored globally. hotCache cache //使用lru策略实现的缓存结构,key hash值不再本地,作为热点缓存,负载均衡 // loadGroup ensures that each key is only fetched once // (either locally or remotely), regardless of the number of // concurrent callers. loadGroup flightGroup //使用该结构保证当缓存中不存在key对应的数据时,只有一个goroutine 调用getter函数取数据,其他正在并发的goroutine会等待直到第一个goroutine返回数据,然后大家一起返回数据 // Stats are statistics on the group. 
Stats Stats //统计信息 } //创建group func NewGroup(name string, cacheBytes int64, getter Getter) *Group { return newGroup(name, cacheBytes, getter, nil) } func newGroup(name string, cacheBytes int64, getter Getter, peers PeerPicker) *Group { if getter == nil { panic("nil Getter") } mu.Lock() defer mu.Unlock() initPeerServerOnce.Do(callInitPeerServer) if _, dup := groups[name]; dup { panic("duplicate registration of group " + name) } g := &Group{ name: name, //maincache、hotcache、peerPick都是在函数调用过程中赋值或初始化的 getter: getter, peers: peers, cacheBytes: cacheBytes, loadGroup: &singleflight.Group{}, } if fn := newGroupHook; fn != nil { fn(g) //此处函数空 } groups[name] = g //保存创建的group return g } //group查找 func (g *Group) Get(ctx Context, key string, dest Sink) error { g.peersOnce.Do(g.initPeers) //把httppool赋值给 groupcache.PeerPicker g.Stats.Gets.Add(1) //统计信息 if dest == nil { return errors.New("groupcache: nil dest Sink") } value, cacheHit := g.lookupCache(key) //从maincache、hotcache查找 if cacheHit { g.Stats.CacheHits.Add(1) return setSinkView(dest, value) } // Optimization to avoid double unmarshalling or copying: keep // track of whether the dest was already populated. One caller // (if local) will set this; the losers will not. The common // case will likely be one caller. 
destPopulated := false value, destPopulated, err := g.load(ctx, key, dest) //从对等节点或自定义查找逻辑(getter)中获取数据 if err != nil { return err } if destPopulated { return nil } return setSinkView(dest, value) //把数据设置给sink } //从maincache、hotcache查找,cache底层使用链表实现并使用lru策略修改链表 func (g *Group) lookupCache(key string) (value ByteView, ok bool) { if g.cacheBytes <= 0 { return } value, ok = g.mainCache.get(key) if ok { return } value, ok = g.hotCache.get(key) return } //从对等节点或自定义查找逻辑(getter)中获取数据 func (g *Group) load(ctx Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) { g.Stats.Loads.Add(1) viewi, err := g.loadGroup.Do(key, func() (interface{}, error) { //此函数使用flightGroup执行策略,保证只有一个goroutine 调用getter函数取数据 // Check the cache again because singleflight can only dedup calls // that overlap concurrently. It's possible for 2 concurrent // requests to miss the cache, resulting in 2 load() calls. An // unfortunate goroutine scheduling would result in this callback // being run twice, serially. If we don't check the cache again, // cache.nbytes would be incremented below even though there will // be only one entry for this key. 
// // Consider the following serialized event ordering for two // goroutines in which this callback gets called twice for hte // same key: // 1: Get("key") //展示了一个有可能2个以上的goroutine同时执行进入了load,这样会导致同一个key对应的数据被多次获取并统计,所以又执行了一次g.lookupCache(key) // 2: Get("key") // 1: lookupCache("key") // 2: lookupCache("key") // 1: load("key") // 2: load("key") // 1: loadGroup.Do("key", fn) // 1: fn() // 2: loadGroup.Do("key", fn) // 2: fn() if value, cacheHit := g.lookupCache(key); cacheHit { g.Stats.CacheHits.Add(1) return value, nil } g.Stats.LoadsDeduped.Add(1) var value ByteView var err error if peer, ok := g.peers.PickPeer(key); ok { //通过一致性hash获取对等节点,与httppool对应 value, err = g.getFromPeer(ctx, peer, key) //构造protobuf数据,向其他节点发起http请求,查找数据,并存储到hotcache if err == nil { g.Stats.PeerLoads.Add(1) return value, nil } g.Stats.PeerErrors.Add(1) // TODO(bradfitz): log the peer's error? keep // log of the past few for /groupcachez? It's // probably boring (normal task movement), so not // worth logging I imagine. } value, err = g.getLocally(ctx, key, dest) //调用getter函数获取数据,并存储到maincache if err != nil { g.Stats.LocalLoadErrs.Add(1) return nil, err } g.Stats.LocalLoads.Add(1) destPopulated = true // only one caller of load gets this return value g.populateCache(key, value, &g.mainCache) return value, nil }) if err == nil { value = viewi.(ByteView) } return }
参考
groupcache:https://github.com/golang/groupcache
一致性哈希:http://blog.codinglabs.org/articles/consistent-hashing.html
protobuf:https://developers.google.com/protocol-buffers/docs/overview
protobuf例子:https://godoc.org/github.com/golang/protobuf/proto