groupcache分布式设计比较简单,各节点通过http进行交互,传输内容为pb格式。并且使用一致性哈希算法减少扩缩容对数据的影响
func main() {
// groupcache使用
startHTTP()
// 监听端口,节点对外提供服务,对内可以从远端获取数据
localUrl := "http://127.0.0.1:" + strconv.Itoa(CachePort)
peers := groupcache.NewHTTPPool(localUrl)
peers.Set("http://127.0.0.1:8000", "http://127.0.0.1:8001")
http.ListenAndServe(":"+strconv.Itoa(CachePort), peers)
}
groupcache开启分布式比较简单,监听http端口,注册各个节点
// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
// 避免多次初始化
if httpPoolMade {
panic("groupcache: NewHTTPPool must be called only once")
}
httpPoolMade = true
p := &HTTPPool{
self: self,
httpGetters: make(map[string]*httpGetter),
}
if o != nil {
p.opts = *o
}
// 默认值设置
if p.opts.BasePath == "" {
p.opts.BasePath = defaultBasePath
}
if p.opts.Replicas == 0 {
p.opts.Replicas = defaultReplicas
}
// 一致性哈希实现
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
// 注册portPicker, 在数据获取时,就知道开启了分布式,可以从其他节点获取数据
RegisterPeerPicker(func() PeerPicker { return p })
return p
}
本方法主要实现了默认值的赋值,字段初始化,分布式portPicker注册
// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
// 加锁
p.mu.Lock()
defer p.mu.Unlock()
// 一致性哈希字段 初始化
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
// 添加节点
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
}
}
将各节点加到一致性哈希环中,获取数据时,将key对应的peer取出,然后调用httpGetter的get方法进行加载数据
func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
// 加锁
p.mu.Lock()
defer p.mu.Unlock()
if p.peers.IsEmpty() {
return nil, false
}
// 根据缓存key获取对应的httpGetter, 如果不是本节点 则返回
// 如果是本节点,返回false, 从本地获取数据
if peer := p.peers.Get(key); peer != p.self {
return p.httpGetters[peer], true
}
return nil, false
}
根据缓存的key获取数据从哪个节点获取数据,如果是本地,在外层直接调用本地方法加载数据,否则返回key对应得httpGetter,外层调用httpGetter的get方法从远端加载数据
func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
// http请求的url
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(in.GetGroup()),
url.QueryEscape(in.GetKey()),
)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
return err
}
req = req.WithContext(ctx)
tr := http.DefaultTransport
if h.transport != nil {
tr = h.transport(ctx)
}
res, err := tr.RoundTrip(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned: %v", res.Status)
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
defer bufferPool.Put(b)
/ 读取数据
_, err = io.Copy(b, res.Body)
if err != nil {
return fmt.Errorf("reading response body: %v", err)
}
err = proto.Unmarshal(b.Bytes(), out)
if err != nil {
return fmt.Errorf("decoding response body: %v", err)
}
return nil
}
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Parse request.
// 特定url判断
if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
panic("HTTPPool serving unexpected path: " + r.URL.Path)
}
parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
groupName := parts[0]
key := parts[1]
// Fetch the value for this group/key.
// 通过组名获取对应的group
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
var ctx context.Context
if p.Context != nil {
ctx = p.Context(r)
} else {
ctx = r.Context()
}
group.Stats.ServerRequests.Add(1)
var value []byte
// 查询数据,和本地获取数据方法一致
err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Write the value to the response body as a proto message.
// pb压缩数据
body, err := proto.Marshal(&pb.GetResponse{Value: value})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 数据返回
w.Header().Set("Content-Type", "application/x-protobuf")
w.Write(body)
}
节点接收到http请求时,根据group名字找到group, 查询本节点数据,然后pb处理后将数据返回
通过上述代码可以发现,groupcache就是通过http进行信息交互,使用pb减少传输信息的大小,利用一致性哈希算法确定key所在节点,最终达到分布式