当前位置: 首页 > 工具软件 > Turn Watcher > 使用案例 >

【Istio】源码解读Istio Watcher 证书更新后Istio执行的逻辑

仇正豪
2023-12-01

源码地址 https://github.com/istio/istio/blob/master/pilot/pkg/proxy/envoy/watcher.go
从这份源代码中主要可以看出 在证书变更之后,Istio执行了那些逻辑。
Watcher这份Golang源代码主要是对Istio Certificate目录监管 如果Certificate目录发生变化 则将所有的Certificate文件进行SHA256加密后,传到Agent的配置文件中。

解析主要代码: (官文文档注解要是英文 我所加的注解主要是中文如果碰到我无法理解的东西也会使用相关英文但是会加标记 以著名是我加的注解)

func (w *watcher) Run(ctx context.Context) {
    // agent consumes notifications from the controller
    go w.agent.Run(ctx)
    // w.agent.Run()的功能是启动Proxy 相关代码可以在下方找到
    //https://github.com/istio/istio/blob/master/pilot/pkg/proxy/agent.go
    // kickstart the proxy with partial state (in case there are no notifications coming)
    w.Reload()
    //这里的Reload就是将所有的Certificates文件内容进行SHA256加密然后传到Agent的配置文件中
    // monitor certificates
    certDirs := make([]string, 0, len(w.certs))
    for _, cert := range w.certs {
        certDirs = append(certDirs, cert.Directory)
        //形成一个Certificate所在目录的目录通道
    }
    //主要监管的函数
    go watchCerts(ctx, certDirs, watchFileEvents, defaultMinDelay, w.Reload)
    go w.retrieveAZ(ctx, azRetryInterval, azRetryAttempts)

    <-ctx.Done()
}

从上面代码主要就是对Certificate进行监管 也是这份源代码中的主要方法

// 一旦文件目录文件有所改变 就要执行这个方法
//w.agent.ScheduleConfigUpdate(h.Sum(nil)) 该h.sum将h中的数据进行校验和放在接收的参数之后
// 不过这里接受的参数是nil也就是单单一个检验和就可以了
//最后将校验和覆盖agent原config
func (w *watcher) Reload() {
    h := sha256.New()
    for _, cert := range w.certs {
        generateCertHash(h, cert.Directory, cert.Files)
        //该方法 将cert中的数据加入h中
    }

    w.agent.ScheduleConfigUpdate(h.Sum(nil))
}
func generateCertHash(h hash.Hash, certsDir string, files []string) {
    if _, err := os.Stat(certsDir); os.IsNotExist(err) {
        return
    }

    for _, file := range files {
        filename := path.Join(certsDir, file)
        bs, err := ioutil.ReadFile(filename)
        if err != nil {
            // log.Warnf("failed to read file %q", filename)
            continue
        }
        if _, err := h.Write(bs); err != nil {
            log.Warna(err)
        }
    }
}

再来看执行的实际监管方法

// watchCerts watches all certificate directories and calls the provided
// `updateFunc` method when changes are detected. This method is blocking
// so it should be run as a goroutine.
// updateFunc will not be called more than one time per minDelay.
//这段注释是在说watchCerts方法在监管所有的Certificate目录,当变化(文件目录下由新建文件,删除文件,修改文件,改名)发生时要执行一次updateFunc方法(也就是Reload),这个方法在minDelay中最多启动一个。并且是作为Goroutine执行(这里我理解为一个线程)
func watchCerts(ctx context.Context, certsDirs []string, watchFileEventsFn watchFileEventsFn,
    minDelay time.Duration, updateFunc func()) {
    fw, err := fsnotify.NewWatcher()
    if err != nil {
        log.Warnf("failed to create a watcher for certificate files: %v", err)
        return
    }
    defer func() {
        if err := fw.Close(); err != nil {
            log.Warnf("closing watcher encounters an error %v", err)
        }
    }()

    // watch all directories
    for _, d := range certsDirs {
        //通过Watch作为监管文件目录有没有发生变化
        if err := fw.Watch(d); err != nil {
            log.Warnf("watching %s encounters an error %v", d, err)
            return
        }
    }
    watchFileEventsFn(ctx, fw.Event, minDelay, updateFunc)
}

watchFileEventsFn() 方法就是控制每当文件目录发生变化的时候,minDelay时间内最多执行一次

type watchFileEventsFn func(ctx context.Context, wch <-chan *fsnotify.FileEvent,
    minDelay time.Duration, notifyFn func())

// watchFileEvents watches for changes on a channel and notifies via notifyFn().
// The function batches changes so that related changes are processed together.
// The function ensures that notifyFn() is called no more than one time per minDelay.
// The function does not return until the the context is cancelled.
//这段注解就是再说监管Channel(通道)有无变化,有变化就调用notifyFn()方法(也就是Reload()方法),并这个方法将保证
//本方法能批量执行,以保证相关变化能一起执行,且notifyFn()方法在minDelay时间内最多执行一次,这个方法不会结束直到Context被取消(不知道什么意思)
func watchFileEvents(ctx context.Context, wch <-chan *fsnotify.FileEvent, minDelay time.Duration, notifyFn func()) {
    // timer and channel for managing minDelay.
    var timeChan <-chan time.Time
    var timer *time.Timer

    for {
        select {
        case ev := <-wch:
            log.Infof("watchFileEvents: %s", ev.String())
            if timer != nil {
                continue
            }
            // create new timer
            //time.NewTime方法,将等待minDelay时间,并返回一个Timer
            timer = time.NewTimer(minDelay)
            timeChan = timer.C
        case <-timeChan:
            // reset timer
            timeChan = nil
            timer.Stop()
            timer = nil
            log.Info("watchFileEvents: notifying")
            //执行notifyFn()方法
            notifyFn()
        case <-ctx.Done():
            log.Info("watchFileEvents has terminated")
            return
        }
    }
}
// retrieveAZ will only run once and then exit because AZ won't change over a proxy's lifecycle
// it has to use a reload due to limitations with envoy (az has to be passed in as a flag)
// 由于Envoy限制的原因,它调用Reload()方法
func (w *watcher) retrieveAZ(ctx context.Context, delay time.Duration, retries int) {
    if !model.IsApplicationNodeType(w.role.Type) {
        return
    }
    if len(w.config.AvailabilityZone) > 0 {
        return // already loaded
    }

    checkin, err := bootstrap.Checkin(w.config.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_MUTUAL_TLS,
        w.config.DiscoveryAddress, w.config.ServiceCluster, w.role.ServiceNode(), delay, retries)
    if err != nil {
        // TODO: turn back on when fully implemented, commented out to avoid confusing users
        // log.Errorf("Failed to connect to pilot. Fallback to starting with defaults and no AZ %v", err)
        // TODO: should we exit ? Envoy is unlikely to start without pilot.
    } else {
        w.config.AvailabilityZone = checkin.AvailabilityZone
        w.Reload()
    }
}

从这份源代码中主要可以看出 在证书变更之后,Istio执行了那些逻辑。

 类似资料: