当前位置: 首页 > 工具软件 > GroupCache > 使用案例 >

groupcache源码(五)-分布式设计

孟文栋
2023-12-01

一.简介

groupcache分布式设计比较简单,各节点通过http进行交互,传输内容为pb格式。并且使用一致性哈希算法减少扩缩容对数据的影响

二. 开启分布式

func main() {
// groupcache使用
	startHTTP()
// 监听端口,节点对外提供服务,对内可以从远端获取数据
	localUrl := "http://127.0.0.1:" + strconv.Itoa(CachePort)
	peers := groupcache.NewHTTPPool(localUrl)
	peers.Set("http://127.0.0.1:8000", "http://127.0.0.1:8001")

	http.ListenAndServe(":"+strconv.Itoa(CachePort), peers)
}

groupcache开启分布式比较简单,监听http端口,注册各个节点

三. 初始化

// NewHTTPPoolOpts initializes an HTTP pool of peers with the given options.
// Unlike NewHTTPPool, this function does not register the created pool as an HTTP handler.
// The returned *HTTPPool implements http.Handler and must be registered using http.Handle.
func NewHTTPPoolOpts(self string, o *HTTPPoolOptions) *HTTPPool {
// 避免多次初始化
	if httpPoolMade {
		panic("groupcache: NewHTTPPool must be called only once")
	}
	httpPoolMade = true

	p := &HTTPPool{
		self:        self,
		httpGetters: make(map[string]*httpGetter),
	}

	if o != nil { 
		p.opts = *o
	}
// 默认值设置
	if p.opts.BasePath == "" {
		p.opts.BasePath = defaultBasePath
	}
	if p.opts.Replicas == 0 {
		p.opts.Replicas = defaultReplicas
	}
// 一致性哈希实现
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
// 注册portPicker, 在数据获取时,就知道开启了分布式,可以从其他节点获取数据
	RegisterPeerPicker(func() PeerPicker { return p })
	return p
}

本方法主要实现了默认值的赋值,字段初始化,分布式portPicker注册

四. 各节点注册

// Set updates the pool's list of peers.
// Each peer value should be a valid base URL,
// for example "http://example.net:8000".
func (p *HTTPPool) Set(peers ...string) {
// 加锁
	p.mu.Lock()
	defer p.mu.Unlock()
// 一致性哈希字段 初始化
	p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
// 添加节点
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
	}
}

将各节点加到一致性哈希环中,获取数据时,将key对应的peer取出,然后调用httpGetter的get方法进行加载数据

五. 根据key获取httpGetter

func (p *HTTPPool) PickPeer(key string) (ProtoGetter, bool) {
// 加锁
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.peers.IsEmpty() {
		return nil, false
	}
// 根据缓存key获取对应的httpGetter, 如果不是本节点 则返回
// 如果是本节点,返回false, 从本地获取数据
	if peer := p.peers.Get(key); peer != p.self {
		return p.httpGetters[peer], true
	}
	return nil, false
}

根据缓存的key获取数据从哪个节点获取数据,如果是本地,在外层直接调用本地方法加载数据,否则返回key对应得httpGetter,外层调用httpGetter的get方法从远端加载数据

六. 通过http获取远端数据

func (h *httpGetter) Get(ctx context.Context, in *pb.GetRequest, out *pb.GetResponse) error {
// http请求的url
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(in.GetGroup()),
		url.QueryEscape(in.GetKey()),
	)
	req, err := http.NewRequest("GET", u, nil)
	if err != nil {
		return err
	}
	req = req.WithContext(ctx)
	tr := http.DefaultTransport
	if h.transport != nil {
		tr = h.transport(ctx)
	}
	res, err := tr.RoundTrip(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned: %v", res.Status)
	}
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset()
	defer bufferPool.Put(b)
/ 读取数据
	_, err = io.Copy(b, res.Body)
	if err != nil {
		return fmt.Errorf("reading response body: %v", err)
	}
	err = proto.Unmarshal(b.Bytes(), out)
	if err != nil {
		return fmt.Errorf("decoding response body: %v", err)
	}
	return nil
}

七. 实现http的ServeHTTP方法

func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Parse request.
// 特定url判断
	if !strings.HasPrefix(r.URL.Path, p.opts.BasePath) {
		panic("HTTPPool serving unexpected path: " + r.URL.Path)
	}
	parts := strings.SplitN(r.URL.Path[len(p.opts.BasePath):], "/", 2)
	if len(parts) != 2 {
		http.Error(w, "bad request", http.StatusBadRequest)
		return
	}
	groupName := parts[0]
	key := parts[1]

	// Fetch the value for this group/key.
// 通过组名获取对应的group
	group := GetGroup(groupName)
	if group == nil {
		http.Error(w, "no such group: "+groupName, http.StatusNotFound)
		return
	}
	var ctx context.Context
	if p.Context != nil {
		ctx = p.Context(r)
	} else {
		ctx = r.Context()
	}

	group.Stats.ServerRequests.Add(1)
	var value []byte
// 查询数据,和本地获取数据方法一致
	err := group.Get(ctx, key, AllocatingByteSliceSink(&value))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Write the value to the response body as a proto message.
// pb压缩数据
	body, err := proto.Marshal(&pb.GetResponse{Value: value})
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
// 数据返回
	w.Header().Set("Content-Type", "application/x-protobuf")
	w.Write(body)
}

节点接收到http请求时,根据group名字找到group, 查询本节点数据,然后pb处理后将数据返回

八. 总结

通过上述代码可以发现,groupcache就是通过http进行信息交互,使用pb减少传输信息的大小,利用一致性哈希算法确定key所在节点,最终达到分布式

 类似资料: