当前位置: 首页 > 工具软件 > Discoverd > 使用案例 >

Flynn之Discoverd

华鹭洋
2023-12-01

Flynn的服务发现组件(discoverd)部署在所有的Flynn集群节点上,该组件使用了Raft协议保证数据的一致性,目前采用的是hashicorp的实现(https://github.com/hashicorp/raft),我们知道在Raft协议中,如果参与选举的节点太多,会导致性能下降,那是不是说Flynn不支持大规模的节点呢?

Flynn是能够支持大规模节点的,虽然discoverd组件部署在所有的节点上,但并不是所有的节点都参与选举,只有部分节点作为Raft集群的节点,其余节点作为代理节点(proxying),Flynn通过以下逻辑判断是否是代理节点,启动discoverd时指定的-peers参数起了关键作用。

	// A node proxies unless its advertise address appears in the peer list.
	proxying := true
	for _, peer := range m.peers {
		if peer != m.advertiseAddr {
			continue
		}
		// Found ourselves among the peers, so we are not proxying.
		proxying = false
		break
	}

discoverd组件本身依赖Raft协议保证数据的一致性,这里提到的数据,在discoverd里指的是服务,我们可以把我们的服务注册到discoverd上,我们的这些服务可能也需要一个Leader节点,例如Flynn的调度组件(scheduler)就只能在Leader节点上执行调度任务。那是否这些服务也是依赖Raft协议来选择Leader节点呢?

注册到discoverd组件上的服务不依赖Raft协议选择Leader,discoverd组件根据注册时间的长短来选择Leader,存活时间最长(即最早注册)的服务节点被选择为Leader。所有注册到discoverd组件上的服务必须向discoverd组件发送心跳信息,discoverd组件如果检测到某个服务节点没有了心跳信息,就会把该节点移除,如果该节点恰好是Leader节点,那么就会触发重新选择Leader的动作。

以下代码是心跳检查相关代码

// Open opens the store and starts the background expiry checker.
//
// The expirer goroutine is registered with s.wg before it is launched,
// because expirer calls s.wg.Done when it exits; without the matching Add
// the WaitGroup counter would go negative and panic. Open always returns nil.
func (s *Store) Open() error {
	s.wg.Add(1)
	go s.expirer()

	return nil
}

// expirer periodically enforces instance expiry until the store is closed.
// It is meant to run in its own goroutine and calls s.wg.Done when it exits.
func (s *Store) expirer() {
	defer s.wg.Done()

	tick := time.NewTicker(s.ExpiryCheckInterval)
	defer tick.Stop()

	for {
		select {
		case <-s.closing:
			// Close signal received; stop checking.
			return

		case <-tick.C:
			// Time for the next check. raft.ErrNotLeader is deliberately
			// not logged; every other failure is.
			err := s.EnforceExpiry()
			if err == nil || err == raft.ErrNotLeader {
				continue
			}
			s.logger.Printf("enforce expiry: %s", err)
		}
	}
}

// NOTE: the code below is abridged; only the main logic is kept.

// EnforceExpiry checks all instances for expiration and issues an expiration
// command through raft for every instance whose last heartbeat is older than
// InstanceTTL.
//
// It returns raft.ErrNotLeader if this store is not the current leader, and
// ErrLeaderWait if it has not yet been leader for at least two InstanceTTL
// intervals. Errors from encoding or applying the command are returned as is.
// When no instance has expired, no command is applied and nil is returned.
func (s *Store) EnforceExpiry() error {
	// Only the leader may expire instances.
	if !s.IsLeader() {
		return raft.ErrNotLeader
	}

	// Hold off until this store has been leader for at least 2 TTL intervals.
	if s.leaderTime.IsZero() || time.Since(s.leaderTime) < (2*s.InstanceTTL) {
		return ErrLeaderWait
	}

	// Iterate over services and then instances.
	var instances []expireInstance
	for service, m := range s.data.Instances {
		for _, inst := range m {
			// Ignore instances that have heartbeated within the TTL.
			// An instance with no recorded heartbeat yields the zero time
			// and is therefore treated as expired.
			if t := s.heartbeats[instanceKey{service, inst.ID}]; time.Since(t) <= s.InstanceTTL {
				continue
			}

			// Add to list of instances to expire.
			instances = append(instances, expireInstance{
				Service:    service,
				InstanceID: inst.ID,
			})
		}
	}

	// Nothing expired, so there is no command worth replicating.
	if len(instances) == 0 {
		return nil
	}

	// Create command to expire instances.
	cmd, err := json.Marshal(&expireInstancesCommand{
		Instances: instances,
	})
	if err != nil {
		return err
	}

	// Apply command to raft.
	if _, err := s.raftApply(expireInstancesCommandType, cmd); err != nil {
		return err
	}
	return nil
}

以下是触发重新选举的代码:

// Apply executes a raft log entry against the store.
//
// The first byte of l.Data selects the command type and the remaining bytes
// are the command payload. The value returned by the command handler is
// passed through; an error is returned for empty log data or an unrecognized
// command type.
func (s *Store) Apply(l *raft.Log) interface{} {
	// Guard against empty data: indexing l.Data[0] would otherwise panic.
	if len(l.Data) == 0 {
		return fmt.Errorf("invalid command: empty log data")
	}

	// Extract the command type and data.
	typ, cmd := l.Data[0], l.Data[1:]

	// Determine the command type by the first byte.
	switch typ {
	case expireInstancesCommandType:
		return s.applyExpireInstancesCommand(cmd)
	default:
		return fmt.Errorf("invalid command type: %d", typ)
	}
}

// applyExpireInstancesCommand decodes cmd as an expireInstancesCommand,
// removes each listed instance from the store, broadcasts a down event for
// every instance it removed, and then re-evaluates the leader of each
// affected service.
//
// Entries whose service or instance is no longer present are skipped.
// It returns an error only if cmd cannot be decoded.
func (s *Store) applyExpireInstancesCommand(cmd []byte) error {
	var c expireInstancesCommand
	if err := json.Unmarshal(cmd, &c); err != nil {
		return err
	}

	// Iterate over the instances to expire and remove the ones still present.
	services := make(map[string]struct{})
	for _, expired := range c.Instances {
		// Look up the instance. Indexing a missing service's nil map is
		// safe and simply reports that the instance is not there.
		m := s.data.Instances[expired.Service]
		inst, ok := m[expired.InstanceID]
		if !ok {
			// The service or instance is already gone; nothing to remove.
			continue
		}

		// Remove instance.
		delete(m, expired.InstanceID)

		// Broadcast down event.
		s.broadcast(&discoverd.Event{
			Service:  expired.Service,
			Kind:     discoverd.EventKindDown,
			Instance: inst,
		})

		// Keep track of services invalidated.
		services[expired.Service] = struct{}{}
	}

	// Invalidate all services that had expirations.
	for service := range services {
		s.invalidateServiceLeader(service)
	}

	return nil
}

// invalidateServiceLeader recomputes the leader of service and, if it
// changed, broadcasts a leader event. Services without a config, or whose
// leader type is manual, are left untouched.
func (s *Store) invalidateServiceLeader(service string) {
	// Nothing to do without a config.
	cfg := s.data.Services[service]
	if cfg == nil {
		return
	}
	// Manually elected leaders are never replaced automatically.
	if cfg.LeaderType == discoverd.LeaderTypeManual {
		return
	}

	// Remember who led before the recomputation.
	previousID := s.data.Leaders[service]

	// Pick the instance with the lowest Index
	// (presumably the earliest registered one).
	var candidate *discoverd.Instance
	for _, current := range s.data.Instances[service] {
		if candidate == nil || current.Index < candidate.Index {
			candidate = current
		}
	}

	// An empty ID means the service currently has no leader.
	var currentID string
	if candidate != nil {
		currentID = candidate.ID
	}

	// Record the new leader.
	s.data.Leaders[service] = currentID

	// No change, no event.
	if previousID == currentID {
		return
	}

	// Resolve the instance to attach to the event; it stays nil when the
	// service has no instance map.
	var inst *discoverd.Instance
	if byID := s.data.Instances[service]; byID != nil {
		inst = byID[currentID]
	}

	s.broadcast(&discoverd.Event{
		Service:  service,
		Kind:     discoverd.EventKindLeader,
		Instance: inst,
	})
}

心跳信息是在哪里发送的呢?估计大家都想到了,一定是在注册服务的地方。discoverd提供了对应的客户端,代码在discoverd/client里面,其中的heartbeat.go就是专门用来发送心跳的。我们可以通过discoverd客户端的AddServiceAndRegister方法来完成服务注册功能:

// AddServiceAndRegister calls maybeAddService for service and, when that
// succeeds, registers addr under it via Register. A failure from
// maybeAddService is returned with a nil Heartbeater.
func (c *Client) AddServiceAndRegister(service, addr string) (Heartbeater, error) {
	err := c.maybeAddService(service)
	if err != nil {
		return nil, err
	}

	return c.Register(service, addr)
}

// Register registers an instance that has only its Addr set, delegating to
// RegisterInstance.
func (c *Client) Register(service, addr string) (Heartbeater, error) {
	inst := &Instance{Addr: addr}
	return c.RegisterInstance(service, inst)
}

// RegisterInstance creates a heartbeater for inst under service and starts
// it through runAttempts. Each attempt launches the heartbeater's run loop in
// a goroutine and waits for the first result reported on its channel. The
// heartbeater is returned once an attempt succeeds; otherwise the error from
// runAttempts is returned.
func (c *Client) RegisterInstance(service string, inst *Instance) (Heartbeater, error) {
	hb := newHeartbeater(c, service, inst)

	attempt := func() error {
		started := make(chan error)
		go hb.run(started)
		return <-started
	}

	if err := runAttempts.Run(attempt); err != nil {
		return nil, err
	}
	return hb, nil
}

// run registers the instance and then keeps re-registering it as a heartbeat
// until h.stop is signalled.
//
// The outcome of the initial registration is always sent on firstErr, because
// the caller blocks on that channel to learn whether registration succeeded.
// If the initial registration fails, run returns without starting the
// heartbeat loop. On stop, the instance is deleted from the server and h.done
// is closed.
func (h *heartbeater) run(firstErr chan<- error) {
	path := fmt.Sprintf("/services/%s/instances/%s", h.service, h.inst.ID)
	register := func() error {
		h.Lock()
		defer h.Unlock()
		return h.client().Put(path, h.inst, nil)
	}

	// Register once up front and report the result to the waiting caller.
	err := register()
	firstErr <- err
	if err != nil {
		return
	}

	timer := time.NewTimer(nextHeartbeat())
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			// Schedule the next heartbeat using the failing interval after
			// an unsuccessful one.
			if err := register(); err != nil {
				timer.Reset(nextHeartbeatFailing())
			} else {
				timer.Reset(nextHeartbeat())
			}
		case <-h.stop:
			// Best-effort deregistration: the result is intentionally
			// ignored since the heartbeater is shutting down regardless.
			h.client().Delete(path)
			close(h.done)
			return
		}
	}
}

discoverd也是以HTTP协议提供服务,比如通过GET /services/abc/leader来获取abc服务的Leader节点,当然,也可以使用SSE协议来监听abc服务的Leader变化事件。Flynn的调度组件(scheduler)就是采用SSE协议监听Leader节点的变化。

	// Leader endpoints for a service: PUT requests are handled by
	// servePutLeader and GET requests by serveGetLeader.
	r.PUT("/services/:service/leader", h.servePutLeader)
	r.GET("/services/:service/leader", h.serveGetLeader)

 

转载于:https://my.oschina.net/chinamerp/blog/1621667

 类似资料: