上篇文章中:《orchestrator的discover模块》主要讲述的是在client发出discover这个命令后,orchestrator服务端所采取的动作,那么,后续的持续discover,是如何实现,并且如何探测到失败实例的呢,今天这篇文章就来讲述这方面的内容。
入口函数在:logic/orchestrator.go文件:
ContinuousDiscovery()函数调用handleDiscoveryRequests(),handleDiscoveryRequests()该函数就是真正在doscover的动作。
func ContinuousDiscovery() {
...
go handleDiscoveryRequests()
...
handleDiscoveryRequests()开启了DiscoveryMaxConcurrency(默认300)个协程,调用DiscoverInstance()去发现每个实例。
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")
// create a pool of discovery workers
for i := uint(0); i < config.Config.DiscoveryMaxConcurrency; i++ {
go func() {
for {
instanceKey := discoveryQueue.Consume()
// Possibly this used to be the elected node, but has
// been demoted, while still the queue is full.
if !IsLeaderOrActive() {
log.Debugf("Node apparently demoted. Skipping discovery of %+v. "+
"Remaining queue size: %+v", instanceKey, discoveryQueue.QueueLen())
discoveryQueue.Release(instanceKey)
continue
}
DiscoverInstance(instanceKey)
discoveryQueue.Release(instanceKey)
}
}()
}
}
DiscoverInstance()会调用inst.ReadTopologyInstanceBufferable()来做真正的探测过程。下面来具体解析一下inst.ReadTopologyInstanceBufferable()来做来些什么。
func DiscoverInstance(instanceKey inst.InstanceKey) {
...
// First we've ever heard of this instance. Continue investigation:
instance, err = inst.ReadTopologyInstanceBufferable(&instanceKey, config.Config.BufferInstanceWrites, latency)
...
}
该函数在inst/instance_dao.go文件中。代码内容有点长,来解释一些比较重要的点,其他做了简化删除处理。
func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) {
instanceFound := false // 这个也比较重要,后面goto执行判断用的,具体下面说
partialSuccess := false // 这个参数很重要,是控制探测成功还是失败的标志,初始化置为false
foundByShowSlaveHosts := false // 这个在上篇文章中说了,discover用的
// 这里的UpdateInstanceLastAttemptedCheck作用就是更新last_attempted_check这个字断为now()。目的是防止在实例在被挂起时,发生切换,这是一种保护机制。在调用readtopology()的时候被卡住。如果last_attempted_check >= last_checked,那么表示有夯死。
if !instanceKey.IsValid() {
latency.Start("backend")
if err := UpdateInstanceLastAttemptedCheck(instanceKey); err != nil {
log.Errorf("ReadTopologyInstanceBufferable: %+v: %v", instanceKey, err)
}
latency.Stop("backend")
return instance, fmt.Errorf("ReadTopologyInstance will not act on invalid instance key: %+v", *instanceKey)
}
if isMaxScale, resolvedHostname, err = instance.checkMaxScale(db, latency); err != nil {
// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.session_variables.
// There is a wrong decision making in this design and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this.
// If the statement errors, then we are unable to determine that this is maxscale, hence assume it is not.
// In which case there would be other queries sent to the server that are not affected by 5.7 behavior, and that will fail.
// Certain errors are not recoverable (for this discovery process) so it's fine to go to Cleanup
// 这里官方注释说明一切,大致意思就是说5.7的新安全特性问题,所以不会跳转到Cleanup
if unrecoverableError(err) {
goto Cleanup
}
}
latency.Start("instance")
if isMaxScale {
// 判断类型,这里不看,因为我们不是MaxScale
} else {
// NOT MaxScale
// 这里就开始真正的discover了,都是一个代码块一个代码块的
// We begin with a few operations we can run concurrently, and which do not depend on anything
{
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
var dummy string
// show global status works just as well with 5.6 & 5.7 (5.7 moves variables to performance_schema)
err := db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &instance.Uptime)
if err != nil {
logReadTopologyInstanceError(instanceKey, "show global status like 'Uptime'", err)
// We do not "goto Cleanup" here, although it should be the correct flow.
// Reason is 5.7's new security feature that requires GRANTs on performance_schema.global_variables.
// There is a wrong decisionmaking in this design and the migration path to 5.7 will be difficult.
// I don't want orchestrator to put even more burden on this. The 'Uptime' variable is not that important
// so as to completely fail reading a 5.7 instance.
// This is supposed to be fixed in 5.7.9
}
errorChan <- err
}()
}
var mysqlHostname, mysqlReportHost string
err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates, @@global.log_bin_index").Scan(
&mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled, &instance.MyBinlogPath)
if err != nil {
goto Cleanup
}
// 这里就很重要了,如果上述的语句执行成功了,那么partialSuccess设置为true,如果执行失败了,就直接跳转到Cleanup。也就是说,如果探测失败了,就直接会调用Cleanup里面的UpdateInstanceLastChecked函数,去更新last_checked和last_check_partial_success两个字断了,来表示探测失败,后面就是通过sql来触发失败切换,关于失败类型,会在其他文章中解析*
partialSuccess = true // We at least managed to read something from the server.
// 这里上篇文章讲过,略过
// 下面的都是一些探测的东西,就是通过语句或者配置文件中的配置去相应的实例拿数据。略过
switch strings.ToLower(config.Config.MySQLHostnameResolveMethod) {
case "none":
resolvedHostname = instance.Key.Hostname
case "default", "hostname", "@@hostname":
resolvedHostname = mysqlHostname
case "report_host", "@@report_host":
if mysqlReportHost == "" {
err = fmt.Errorf("MySQLHostnameResolveMethod configured to use @@report_host but %+v has NULL/empty @@report_host", instanceKey)
goto Cleanup
}
resolvedHostname = mysqlReportHost
default:
resolvedHostname = instance.Key.Hostname
}
// 在这里置为了true,在经过上面一些列探测后,如果没有goto Cleanup,那么会到这步,把instanceFound设置为true,下面的Cleanup模块会用到这个变量
instanceFound = true
// 下面也很简单,也是去实例拿数据,不过我改造成去元数据库去拿了,掠过
...
// 这里是比较重要的一块,即当上面的油探测失败的时候,就会goto 跳转到Cleanup这里,不了解golang的goto语句,可以去了解一下
Cleanup:
waitGroup.Wait()
close(errorChan)
err = func() error {
if err != nil {
return err
}
for err := range errorChan {
if err != nil {
return err
}
}
return nil
}()
// 所有探测都成功的时候,instanceFound是true的,但是呢,前面只有跳转的语句,后面即使instanceFound设置为true了,Cleanup也不会走这块代码的。所以直接跳过看就好
if instanceFound {
...
// UpdateInstanceLastChecked是非常重要的一个函数,更新last_checked和last_check_partial_success的值,这里的partialSuccess必然是false了,表示探测失败了
_ = UpdateInstanceLastChecked(&instance.Key, partialSuccess)
latency.Stop("backend")
return nil, err
}
其实orchestrator的探测也是比较简单的,相对比较难的地方是在失败类型判定那块。
关于失败类型的判定,可以看我Orchestrator核心之失败类型判定 这篇文章,具体的分析,在慢慢完善中。