当前位置: 首页 > 工具软件 > Orchestrator > 使用案例 >

Orchestrator核心之失败探测

柴宝
2023-12-01

前言

上篇文章中:《orchestrator的discover模块》主要讲述的是在client发出discover这个命令后,orchestrator服务端所采取的动作,那么,后续的持续discover,是如何实现,并且如何探测到失败实例的呢,今天这篇文章就来讲述这方面的内容。

持续发现

入口函数在:logic/orchestrator.go文件:
ContinuousDiscovery()函数调用handleDiscoveryRequests(),真正执行discover动作的就是handleDiscoveryRequests()这个函数。

func ContinuousDiscovery() {
	...
	go handleDiscoveryRequests()
	...

handleDiscoveryRequests()开启了DiscoveryMaxConcurrency(默认300)个协程,调用DiscoverInstance()去发现每个实例。

// handleDiscoveryRequests starts the pool of discovery workers.
//
// It assigns the queue returned by discovery.CreateOrReturnQueue("DEFAULT")
// to discoveryQueue (a variable declared outside this function) and then
// launches config.Config.DiscoveryMaxConcurrency goroutines. Each goroutine
// loops forever: it takes an instance key from the queue, calls
// DiscoverInstance on it when IsLeaderOrActive() reports true, and releases
// the key back to the queue in either case. The function returns as soon as
// the goroutines are started; nothing in this block stops them.
func handleDiscoveryRequests() {
	discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")

	// create a pool of discovery workers
	for i := uint(0); i < config.Config.DiscoveryMaxConcurrency; i++ {
		go func() {
			for {
				// NOTE(review): Consume presumably blocks until a key is
				// available -- confirm in the discovery package.
				instanceKey := discoveryQueue.Consume()
				// Possibly this used to be the elected node, but has
				// been demoted, while still the queue is full.
				if !IsLeaderOrActive() {
					log.Debugf("Node apparently demoted. Skipping discovery of %+v. "+
						"Remaining queue size: %+v", instanceKey, discoveryQueue.QueueLen())
					// Release the key without probing so it does not stay
					// marked as consumed by this worker.
					discoveryQueue.Release(instanceKey)
					continue
				}

				// Probe the instance, then hand the key back to the queue.
				DiscoverInstance(instanceKey)
				discoveryQueue.Release(instanceKey)
			}
		}()
	}
}

DiscoverInstance()会调用inst.ReadTopologyInstanceBufferable()来做真正的探测过程。下面来具体解析一下inst.ReadTopologyInstanceBufferable()都做了些什么。

func DiscoverInstance(instanceKey inst.InstanceKey) {
	...
	// First we've ever heard of this instance. Continue investigation:
	instance, err = inst.ReadTopologyInstanceBufferable(&instanceKey, config.Config.BufferInstanceWrites, latency)
	...
}

inst.ReadTopologyInstanceBufferable发现

该函数在inst/instance_dao.go文件中。代码内容有点长,来解释一些比较重要的点,其他做了简化删除处理。

func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) {

	instanceFound := false	// 这个也比较重要,后面goto执行判断用的,具体下面说
	partialSuccess := false		// 这个参数很重要,是控制探测成功还是失败的标志,初始化置为false
	foundByShowSlaveHosts := false // 这个在上篇文章中说了,discover用的

	// 这里的UpdateInstanceLastAttemptedCheck的作用就是把last_attempted_check这个字段更新为now()。目的是在实例被挂起(调用readtopology()时被卡住)的情况下防止发生切换,这是一种保护机制。如果last_attempted_check >= last_checked,那么表示有夯死。
	if !instanceKey.IsValid() {
		latency.Start("backend")
		if err := UpdateInstanceLastAttemptedCheck(instanceKey); err != nil {
			log.Errorf("ReadTopologyInstanceBufferable: %+v: %v", instanceKey, err)
		}
		latency.Stop("backend")
		return instance, fmt.Errorf("ReadTopologyInstance will not act on invalid instance key: %+v", *instanceKey)
	}

	

	if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil {
		// We do not "goto Cleanup" here, although it should be the correct flow.
		// Reason is 5.7's new security feature that requires GRANTs on performance_schema.session_variables.
		// There is a wrong decision making in this design and the migration path to 5.7 will be difficult.
		// I don't want orchestrator to put even more burden on this.
		// If the statement errors, then we are unable to determine that this is maxscale, hence assume it is not.
		// In which case there would be other queries sent to the server that are not affected by 5.7 behavior, and that will fail.

		// Certain errors are not recoverable (for this discovery process) so it's fine to go to Cleanup
		// 这里官方注释说明一切,大致意思就是说5.7的新安全特性问题,所以不会跳转到Cleanup
		if unrecoverableError(err) {
			goto Cleanup
		}
	}

	latency.Start("instance")
	if isMaxScale {
	// 判断类型,这里不看,因为我们不是MaxScale
		
	} else {
		// NOT MaxScale
		// 这里就开始真正的discover了,都是一个代码块一个代码块的
		// We begin with a few operations we can run concurrently, and which do not depend on anything
		{
			waitGroup.Add(1)
			go func() {
				defer waitGroup.Done()
				var dummy string
				// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
				err := db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)

				if err != nil {
					logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)

					// We do not "goto Cleanup" here, although it should be the correct flow.
					// Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables.
					// There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult.
					// I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important
					// so as to completely fail reading a 5.7 instance.
					// This is supposed to be fixed in 5.7.9
				}
				errorChan <- err
			}()
		}

		var mysqlHostname, mysqlReportHost string
		err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates, @@global.log_bin_index").Scan(
			&mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled, &instance.MyBinlogPath)
		if err != nil {
			goto Cleanup
		}
		// 这里就很重要了,如果上述的语句执行成功了,那么partialSuccess设置为true,如果执行失败了,就直接跳转到Cleanup。也就是说,如果探测失败了,就会直接调用Cleanup里面的UpdateInstanceLastChecked函数,去更新last_checked和last_check_partial_success两个字段,来表示探测失败,后面就是通过sql来触发失败切换。关于失败类型,会在其他文章中解析
		partialSuccess = true // We at least managed to read something from the server.
		// 这里上篇文章讲过,略过
		// 下面的都是一些探测的东西,就是通过语句或者配置文件中的配置去相应的实例拿数据。略过
		switch strings.ToLower(config.Config.MySQLHostnameResolveMethod) {
		case "none":
			resolvedHostname = instance.Key.Hostname
		case "default", "hostname", "@@hostname":
			resolvedHostname = mysqlHostname
		case "report_host", "@@report_host":
			if mysqlReportHost == "" {
				err = fmt.Errorf("MySQLHostnameResolveMethod configured to use @@report_host but %+v has NULL/empty @@report_host", instanceKey)
				goto Cleanup
			}
			resolvedHostname = mysqlReportHost
		default:
			resolvedHostname = instance.Key.Hostname
		}

		
	// 在这里置为了true,在经过上面一系列探测后,如果没有goto Cleanup,那么会到这步,把instanceFound设置为true,下面的Cleanup模块会用到这个变量
	instanceFound = true
	// 下面也很简单,也是去实例拿数据,不过我改造成去元数据库去拿了,略过
	...
	
// 这里是比较重要的一块,即当上面有探测失败的时候,就会goto跳转到Cleanup这里,不了解golang的goto语句的,可以去了解一下
Cleanup:
	waitGroup.Wait()
	close(errorChan)
	err = func() error {
		if err != nil {
			return err
		}

		for err := range errorChan {
			if err != nil {
				return err
			}
		}
		return nil
	}()
	// 注意:Go的标签只是一个位置标记,没有goto时代码同样会顺序执行到Cleanup。所以所有探测都成功的时候,instanceFound是true,走的是下面这个分支(内容此处省略);只有探测失败、通过goto跳转过来时,instanceFound才是false。本文关注的是失败路径,所以这块直接跳过看就好
	if instanceFound {
		...
	// UpdateInstanceLastChecked是非常重要的一个函数,更新last_checked和last_check_partial_success的值。走到这里表示探测失败了:如果第一条查询就失败,partialSuccess保持false;如果第一条查询成功、后续步骤才失败(例如上面report_host为空的分支),partialSuccess已经被置为true
	_ = UpdateInstanceLastChecked(&instance.Key, partialSuccess)
	latency.Stop("backend")
	return nil, err
}

后记

其实orchestrator的探测也是比较简单的,相对比较难的地方是在失败类型判定那块。
关于失败类型的判定,可以看我的《Orchestrator核心之失败类型判定》这篇文章,具体的分析还在慢慢完善中。

 类似资料: