
Telegraf Plugin Engine Analysis

谯德元
2023-12-01

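There are many ways to collect performance data. I recently came across a tool that is quite interesting, and its engineering has several points worth borrowing:

Telegraf (https://github.com/influxdata/telegraf)

(1) A fully plugin-based architecture: data collection, processing, aggregation, and output are each implemented as independent plugins, which makes development much easier.
(2) The framework design has many ideas worth borrowing. For example, when a collection times out, should the next collection still be scheduled? If it is scheduled, how should it be handled? And how can all this be done without verbose, complicated code?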

1 Overall Agent startup sequence

Startup sequence

telegraf.go:main()
  |
  telegraf_posix.go:run()
    |
    telegraf.go:reloadLoop()                                      // reload handling
      \ (1) reload := make(chan bool, 1)
      \ (2) register a SIGHUP handler; on signal, reload <- true and runAgent is started again
      \ (3) define ctx, cancel := context.WithCancel(context.Background()) and pass ctx to the agent
      |
      telegraf.go:runAgent(ctx,..)                                // bring up the agent
        \ (1) config.LoadConfig reads the configuration file
        \ (2) agent.NewAgent creates the agent
        |
        agent.go:run(ctx,..)                                      // start the agent
          \ (1) a.initPlugins                                     // initialize plugins
          \ (2) next = a.startOutputs(ctx,..)
          \ (3) aggC = a.startProcessors(next,..)
          \ (4) a.startAggregators(aggC, next,..)
          \ (5) a.startInputs(next,..)                            // initialize input plugins; analyzed in section 2
          \  ... var wg sync.WaitGroup
          \ (9) go func() { a.runInputs(ctx, startTime, iu) ...}  // start the input collection loop; analyzed in section 3

Background: why use context? See https://juejin.im/post/6844904070667321357
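
To make the role of ctx in the reload path more concrete, here is a minimal sketch of the pattern, not Telegraf's actual code. runWorker is a hypothetical stand-in for runAgent, and the reload channel is fed by hand here instead of by a SIGHUP handler. Each pass creates a cancellable context, starts a worker, and cancels it when a reload (or shutdown) arrives.

```go
package main

import (
	"context"
	"fmt"
)

// runWorker is a hypothetical stand-in for runAgent: it reports that it has
// started, then blocks until its context is cancelled.
func runWorker(ctx context.Context, generation int, events chan<- string) {
	events <- fmt.Sprintf("worker %d started", generation)
	<-ctx.Done()
	events <- fmt.Sprintf("worker %d stopped", generation)
}

// reloadLoop restarts the worker each time a value arrives on reload and
// stops for good once reload is closed. It returns how many workers ran.
func reloadLoop(reload <-chan bool, events chan<- string) int {
	generation := 0
	for {
		generation++
		ctx, cancel := context.WithCancel(context.Background())
		done := make(chan struct{})
		go func(g int) {
			defer close(done)
			runWorker(ctx, g, events)
		}(generation)

		_, more := <-reload // wait for a reload request or for shutdown
		cancel()            // ask the current worker to stop
		<-done              // wait until it has actually stopped
		if !more {
			return generation
		}
	}
}

func main() {
	reload := make(chan bool, 1)
	events := make(chan string, 16)

	reload <- true // simulate one SIGHUP
	close(reload)  // then simulate shutdown

	n := reloadLoop(reload, events)
	close(events)
	for line := range events {
		fmt.Println(line)
	}
	fmt.Println("workers run:", n)
}
```

Because the context is the only thing the worker watches, the loop does not need to know anything about what the worker is doing in order to stop it.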

2 What does startInputs do in Agent::run()?

Simplifying the code, the core top-level logic for collection (input) is:

func (a *Agent) Run(ctx context.Context) error {
    ...
    iu, err := a.startInputs(next, a.Config.Inputs)  // <---- the focus of this section
    ...
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        err := a.runInputs(ctx, startTime, iu)       
        if err != nil {
            log.Printf("E! [agent] Error running inputs: %v", err)
        }
    }()
    wg.Wait()
    log.Printf("D! [agent] Stopped Successfully")
    return err
}

startInputs() builds and returns an inputUnit, which has two parts: dst and inputs

type inputUnit struct {
    dst    chan<- telegraf.Metric
    inputs []*models.RunningInput
}

Part one, dst: the output channel for data. Its element type is Metric, the general-purpose data structure defined inside Telegraf.

// Metric is the type of data that is processed by Telegraf.  Input plugins,
// and to a lesser degree, Processor and Aggregator plugins create new Metrics
// and Output plugins write them.
type Metric interface {
    // Name is the primary identifier for the Metric and corresponds to the
    // measurement in the InfluxDB data model.
    Name() string

    // Tags returns the tags as a map.  This method is deprecated, use TagList instead.
    Tags() map[string]string

    // TagList returns the tags as a slice ordered by the tag key in lexical
    // bytewise ascending order.  The returned value should not be modified,
    // use the AddTag or RemoveTag methods instead.
    TagList() []*Tag

    // Fields returns the fields as a map.  This method is deprecated, use FieldList instead.
    Fields() map[string]interface{}

    // FieldList returns the fields as a slice in an undefined order.  The
    // returned value should not be modified, use the AddField or RemoveField
    // methods instead.
    FieldList() []*Field

    // Time returns the timestamp of the metric.
    Time() time.Time

    // Type returns a general type for the entire metric that describes how you
    // might interpret, aggregate the values.
    //
    // This method may be removed in the future and its use is discouraged.
    Type() ValueType

    // SetName sets the metric name.
    SetName(name string)

    // AddPrefix adds a string to the front of the metric name.  It is
    // equivalent to m.SetName(prefix + m.Name()).
    //
    // This method is deprecated, use SetName instead.
    AddPrefix(prefix string)

    // AddSuffix appends a string to the back of the metric name.  It is
    // equivalent to m.SetName(m.Name() + suffix).
    //
    // This method is deprecated, use SetName instead.
    AddSuffix(suffix string)

    // GetTag returns the value of a tag and a boolean to indicate if it was set.
    GetTag(key string) (string, bool)

    // HasTag returns true if the tag is set on the Metric.
    HasTag(key string) bool

    // AddTag sets the tag on the Metric.  If the Metric already has the tag
    // set then the current value is replaced.
    AddTag(key, value string)

    // RemoveTag removes the tag if it is set.
    RemoveTag(key string)

    // GetField returns the value of a field and a boolean to indicate if it was set.
    GetField(key string) (interface{}, bool)

    // HasField returns true if the field is set on the Metric.
    HasField(key string) bool

    // AddField sets the field on the Metric.  If the Metric already has the field
    // set then the current value is replaced.
    AddField(key string, value interface{})

    // RemoveField removes the field if it is set.
    RemoveField(key string)

    // SetTime sets the timestamp of the Metric.
    SetTime(t time.Time)

    // HashID returns an unique identifier for the series.
    HashID() uint64

    // Copy returns a deep copy of the Metric.
    Copy() Metric

    // Accept marks the metric as processed successfully and written to an
    // output.
    Accept()

    // Reject marks the metric as processed unsuccessfully.
    Reject()

    // Drop marks the metric as processed successfully without being written
    // to any output.
    Drop()

    // SetAggregate indicates the metric is an aggregated value.
    //
    // This method may be removed in the future and its use is discouraged.
    SetAggregate(bool)

    // IsAggregate returns true if the Metric is an aggregate.
    //
    // This method may be removed in the future and its use is discouraged.
    IsAggregate() bool
}

Part two, inputs: the list of all input plugins. Taking the cpu collection plugin as an example:

// RunningInput type
type RunningInput struct {
    Input  telegraf.Input
    Config *InputConfig

    log         telegraf.Logger
    defaultTags map[string]string

    MetricsGathered selfstat.Stat
    GatherTime      selfstat.Stat
}

// Input type: the interface
type Input interface {
    PluginDescriber
    // PluginDescriber defines two functions
    // type PluginDescriber interface {
    //    // SampleConfig returns the default configuration of the Processor
    //    SampleConfig() string
    //    // Description returns a one-sentence description on the Processor
    //    Description() string
    // }
    // Gather takes in an accumulator and adds the metrics that the Input
    // gathers. This is called every "interval"
    Gather(Accumulator) error
}

// Input type: a concrete implementation
type CPUStats struct {
    ps        system.PS
    lastStats map[string]cpu.TimesStat

    PerCPU         bool `toml:"percpu"`
    TotalCPU       bool `toml:"totalcpu"`
    CollectCPUTime bool `toml:"collect_cpu_time"`
    ReportActive   bool `toml:"report_active"`
}
func (s *CPUStats) Gather(acc telegraf.Accumulator) error { ... }
func (_ *CPUStats) SampleConfig() string { ... }
func (_ *CPUStats) Description() string { ... }


// Runtime data (debugger view)
0 = {*github.com/influxdata/telegraf/models.RunningInput | 0xc00021e320} 
    Input = {github.com/influxdata/telegraf.Input | *github.com/influxdata/telegraf/plugins/inputs/cpu.CPUStats} 
        ps = {github.com/influxdata/telegraf/plugins/inputs/system.PS | *github.com/influxdata/telegraf/plugins/inputs/system.SystemPS} 
        lastStats = {map[string]github.com/shirou/gopsutil/cpu.TimesStat} nil
        PerCPU = {bool} true
        TotalCPU = {bool} true
        CollectCPUTime = {bool} false
        ReportActive = {bool} false
    Config = {*github.com/influxdata/telegraf/models.InputConfig | 0xc0004b24e0} 
        Name = {string} "cpu"
        Alias = {string} ""
        Interval = {time.Duration} 0 0s
        CollectionJitter = {time.Duration} 0 0s
        Precision = {time.Duration} 0 0s
        NameOverride = {string} ""
        MeasurementPrefix = {string} ""
        MeasurementSuffix = {string} ""
        Tags = {map[string]string} 
        Filter = {github.com/influxdata/telegraf/models.Filter} 
    log = {github.com/influxdata/telegraf.Logger | *github.com/influxdata/telegraf/models.Logger}  
    defaultTags = {map[string]string} 
    MetricsGathered = {github.com/influxdata/telegraf/selfstat.Stat | *github.com/influxdata/telegraf/selfstat.stat} 
    GatherTime = {github.com/influxdata/telegraf/selfstat.Stat | *github.com/influxdata/telegraf/selfstat.timingStat} 

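Judging from the returned value iu, the result mainly consists of cpu.CPUStats, which implements the Input interface, and the output chan of Metric values.
This function is part of initialization and does not start any goroutines.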
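
The relationship between the Input interface and a concrete plugin can be illustrated with a small self-contained sketch. The Accumulator and Input types below are simplified stand-ins written for this example (they are not the real telegraf package types), and loadStub, recorder and gatherAll are hypothetical names.

```go
package main

import "fmt"

// Accumulator is a simplified stand-in for telegraf.Accumulator.
type Accumulator interface {
	AddGauge(measurement string, fields map[string]interface{}, tags map[string]string)
}

// Input is a simplified stand-in for telegraf.Input.
type Input interface {
	SampleConfig() string
	Description() string
	Gather(Accumulator) error
}

// loadStub is a hypothetical plugin that always reports the same value.
type loadStub struct {
	Host string
}

func (l *loadStub) SampleConfig() string { return `host = "localhost"` }
func (l *loadStub) Description() string  { return "Report a fixed load value" }

// Gather hands one gauge to the accumulator, the way a real plugin would.
func (l *loadStub) Gather(acc Accumulator) error {
	acc.AddGauge("load",
		map[string]interface{}{"value": 0.5},
		map[string]string{"host": l.Host})
	return nil
}

// recorder is a hypothetical Accumulator that remembers measurement names.
type recorder struct {
	names []string
}

func (r *recorder) AddGauge(m string, _ map[string]interface{}, _ map[string]string) {
	r.names = append(r.names, m)
}

// gatherAll calls Gather on every input in the list, stopping at the first error.
func gatherAll(inputs []Input, acc Accumulator) error {
	for _, in := range inputs {
		if err := in.Gather(acc); err != nil {
			return fmt.Errorf("%s: %w", in.Description(), err)
		}
	}
	return nil
}

func main() {
	rec := &recorder{}
	inputs := []Input{&loadStub{Host: "a"}, &loadStub{Host: "b"}}
	if err := gatherAll(inputs, rec); err != nil {
		fmt.Println("gather failed:", err)
		return
	}
	fmt.Println("measurements gathered:", rec.names)
}
```

The caller only ever sees the interface, which is what lets the agent keep every plugin in one list regardless of what it collects.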

3 What does runInputs do in Agent::run()?

Background: why use sync.WaitGroup?
It blocks the calling goroutine until every goroutine it tracks has finished.
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
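
As a minimal sketch of that Add / Done / Wait cycle (runAll is a hypothetical helper written for this example, not part of Telegraf):

```go
package main

import (
	"fmt"
	"sync"
)

// runAll starts one goroutine per job and blocks until all of them have
// finished. It returns the number of jobs that completed.
func runAll(jobs []func()) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	for _, job := range jobs {
		wg.Add(1) // register the goroutine before starting it
		go func(job func()) {
			defer wg.Done() // always signal completion, even on early return
			job()
			mu.Lock()
			finished++
			mu.Unlock()
		}(job)
	}

	wg.Wait() // block until every Done has been called
	return finished
}

func main() {
	jobs := []func(){
		func() { fmt.Println("job A") },
		func() { fmt.Println("job B") },
		func() { fmt.Println("job C") },
	}
	fmt.Println("finished:", runAll(jobs))
}
```

The jobs may print in any order, but the final line only appears after all three have run.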

Continuing with the Run() code:

func (a *Agent) Run(ctx context.Context) error {
    ...
    iu, err := a.startInputs(next, a.Config.Inputs)  
    ...
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        err := a.runInputs(ctx, startTime, iu)       // <---- the focus of this section
        if err != nil {
            log.Printf("E! [agent] Error running inputs: %v", err)
        }
    }()
    wg.Wait()
    log.Printf("D! [agent] Stopped Successfully")
    return err
}

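A goroutine is started and registered in a sync.WaitGroup, which is used to block the main goroutine (the main goroutine spends most of its time blocked here, while runInputs runs the collection loop internally).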

// runInputs starts and triggers the periodic gather for Inputs.
//
// When the context is done the timers are stopped and this function returns
// after all ongoing Gather calls complete.
func (a *Agent) runInputs(
    ctx context.Context,
    startTime time.Time,
    unit *inputUnit,
) error {
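    var wg sync.WaitGroup
    for _, input := range unit.inputs {
        // (1) Get the collection interval, 10 seconds here (10 000 000 000 ns;
        //     the debugger labels it golang.org/x/oauth2.expiryDelta only
        //     because that unrelated constant has the same value)
        // (2) Create the Ticker
        // (3) Create the Accumulator
        ...
        wg.Add(1)
        go func(input *models.RunningInput) {
            defer wg.Done()
            a.gatherLoop(ctx, acc, input, ticker, interval)
        }(input)
    }
    wg.Wait()
    ...
    return nil
}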

Next comes the collection loop gatherLoop, where a ticker triggers the collection logic

// gatherLoop runs an input's gather function periodically until the context is
// done.
func (a *Agent) gatherLoop(
    ctx context.Context,
    acc telegraf.Accumulator,
    input *models.RunningInput,
    ticker Ticker,
    interval time.Duration,
) {
    defer panicRecover(input)

    for {
        select {
        case <-ticker.Elapsed():
            err := a.gatherOnce(acc, input, ticker, interval)
            if err != nil {
                acc.AddError(err)
            }
        case <-ctx.Done():
            return
        }
    }
}
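
The same loop shape can be tried out in isolation. The sketch below is a simplified version under stated assumptions: ticks are delivered on a plain channel instead of Telegraf's Ticker interface, gather is an arbitrary callback, and driveLoop is a hypothetical helper that feeds ticks by hand so the behavior does not depend on wall-clock timing.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// gatherLoop calls gather once per tick until ctx is cancelled.
func gatherLoop(ctx context.Context, ticks <-chan time.Time, gather func()) {
	for {
		select {
		case <-ticks:
			gather()
		case <-ctx.Done():
			return
		}
	}
}

// driveLoop feeds n manual ticks into gatherLoop, cancels the context, and
// returns how many times gather ran.
func driveLoop(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan time.Time) // unbuffered: each send is handed to the loop
	count := 0

	done := make(chan struct{})
	go func() {
		defer close(done)
		gatherLoop(ctx, ticks, func() { count++ })
	}()

	for i := 0; i < n; i++ {
		ticks <- time.Time{}
	}
	cancel() // the loop returns the next time it reaches select
	<-done   // after this, reading count is safe
	return count
}

func main() {
	fmt.Println("gather calls:", driveLoop(3))
}
```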

** Pay attention to this part

// gatherOnce runs the input's Gather function once, logging a warning each
// interval it fails to complete before.
func (a *Agent) gatherOnce(
    acc telegraf.Accumulator,
    input *models.RunningInput,
    ticker Ticker,
    interval time.Duration,
) error {
    done := make(chan error)
    go func() {
        done <- input.Gather(acc)
    }()

    // Only warn after interval seconds, even if the interval is started late.
    // Intervals can start late if the previous interval went over or due to
    // clock changes.
    slowWarning := time.NewTicker(interval)
    defer slowWarning.Stop()

    for {
        select {
        case err := <-done:
            return err
        case <-slowWarning.C:
            log.Printf("W! [%s] Collection took longer than expected; not complete after interval of %s",
                input.LogName(), interval)
        case <-ticker.Elapsed():
            log.Printf("D! [%s] Previous collection has not completed; scheduled collection skipped",
                input.LogName())
        }
    }
}

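**
Answer (to the timeout question raised in the introduction):
gatherOnce first launches the plugin's Gather call asynchronously, then creates a slow-warning Ticker, and waits on three signals at once: collection finished, slow-warning timeout, and the next scheduled tick:

on receiving:
    collection finished:
        return
    slow-warning timeout:
        log a warning and keep waiting
    next scheduled tick:
        log that the scheduled collection was skipped and keep waiting

The benefit is that the layer above needs no extra logic to decide whether a collection has timed out, so the code stays simple.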
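
The three-way wait can be reproduced in a few lines. This is a sketch of the pattern only, with assumptions: the slow-warning and next-tick signals are plain channels supplied by the caller, logging is replaced by a logf callback, and simulateSlowGather is a hypothetical driver that holds Gather open until both signals have been delivered.

```go
package main

import (
	"fmt"
	"time"
)

// gatherOnce runs gather in its own goroutine and waits for it to finish.
// While waiting, it notes every slow-warning tick and every scheduled tick
// that arrives, but never starts a second gather.
func gatherOnce(gather func() error, slow, next <-chan time.Time, logf func(string)) error {
	done := make(chan error)
	go func() { done <- gather() }()

	for {
		select {
		case err := <-done:
			return err
		case <-slow:
			logf("slow")
		case <-next:
			logf("skipped")
		}
	}
}

// simulateSlowGather keeps gather blocked until one slow warning and one
// scheduled tick have been consumed, then lets it finish.
func simulateSlowGather() ([]string, error) {
	slow := make(chan time.Time)
	next := make(chan time.Time)
	release := make(chan struct{})
	var events []string

	result := make(chan error)
	go func() {
		result <- gatherOnce(func() error {
			<-release // pretend the collection is taking a long time
			return nil
		}, slow, next, func(s string) { events = append(events, s) })
	}()

	slow <- time.Time{} // the interval elapsed and gather is still running
	next <- time.Time{} // the next scheduled tick arrives and is skipped
	close(release)      // gather finally completes
	err := <-result
	return events, err
}

func main() {
	events, err := simulateSlowGather()
	fmt.Println(events, err)
}
```

A skipped tick costs nothing more than a log line, so a slow plugin cannot pile up overlapping collections.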

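4 How many goroutines does the Agent model start?

In summary, the inputs path for a single plugin starts 1+2+1 goroutines:
one catches SIGHUP, two are used to start the plugins concurrently, and the last one handles collection timeouts.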

telegraf.go:main()
  |
  telegraf_posix.go:run()
    |
    telegraf.go:reloadLoop()                                  ----goroutine----watches SIGHUP----caller continues
      \ ctx, cancel := context.WithCancel(context.Background())
      |
      telegraf.go:runAgent(ctx,..)
        |
        agent.go:run(ctx,..)
          \  ... var wg sync.WaitGroup
          \ go func() { a.runInputs(ctx, startTime, iu) ...}  ----goroutine----starts the collection function----caller blocks
          |
          agent.go:runInputs(ctx,startTime,unit)
            \ ... var wg sync.WaitGroup
            \ go func() { gatherLoop(...) }                   ----goroutine----starts the collection loop----caller blocks
            |
            agent.go:gatherLoop(ctx,...)
              \ case <-ticker.Elapsed() triggers gatherOnce
              |
              agent.go:gatherOnce(acc,...)
                \ go func() { done <- input.Gather(acc) }()   ----goroutine----starts the collection----caller waits for one of three signals: done, slow warning, next tick
                \ for { select { case done / slowWarning / ticker.Elapsed() ... } }

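5 What if a metric has to be reported as a delta?

Many performance metrics are ever-increasing counters, for example MySQL's lock_times. Such a metric is only meaningful as the current sample minus the previous sample. Telegraf handles it like this:

The plugin defines its own variable to hold the previous collection result: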

type CPUStats struct {
    ....
    lastStats map[string]cpu.TimesStat
    ...
}

func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
    ...
    s.lastStats = make(map[string]cpu.TimesStat)
    for _, cts := range times {
        s.lastStats[cts.CPU] = cts
    }

    return err
}
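
The idea of keeping the previous sample inside the plugin can be shown in isolation. The sketch below is a generic illustration, not the CPU plugin's code: counterDelta and Observe are hypothetical names, and treating a decreasing value as a counter reset is an assumption made for this example.

```go
package main

import "fmt"

// counterDelta remembers the previous value of each ever-increasing counter.
type counterDelta struct {
	last map[string]uint64
}

// Observe stores value as the latest sample for key and returns the increase
// since the previous sample. ok is false when there is no usable baseline:
// on the first sample, or when the counter went backwards (assumed reset).
func (c *counterDelta) Observe(key string, value uint64) (delta uint64, ok bool) {
	if c.last == nil {
		c.last = make(map[string]uint64)
	}
	prev, seen := c.last[key]
	c.last[key] = value
	if !seen || value < prev {
		return 0, false
	}
	return value - prev, true
}

func main() {
	var d counterDelta
	for _, sample := range []uint64{100, 130, 175} {
		delta, ok := d.Observe("lock_times", sample)
		fmt.Println(sample, delta, ok)
	}
}
```

The first call only establishes the baseline, which is why a plugin built this way reports nothing useful until its second collection.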

6 How is data from multiple plugins normalized?

Again taking the CPU plugin as an example:

func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
...
    for _, cts := range times {
    ...
            fieldsG := map[string]interface{}{
            "usage_user":       100 * (cts.User - lastCts.User - (cts.Guest - lastCts.Guest)) / totalDelta,
            "usage_system":     100 * (cts.System - lastCts.System) / totalDelta,
            "usage_idle":       100 * (cts.Idle - lastCts.Idle) / totalDelta,
            "usage_nice":       100 * (cts.Nice - lastCts.Nice - (cts.GuestNice - lastCts.GuestNice)) / totalDelta,
            "usage_iowait":     100 * (cts.Iowait - lastCts.Iowait) / totalDelta,
            "usage_irq":        100 * (cts.Irq - lastCts.Irq) / totalDelta,
            "usage_softirq":    100 * (cts.Softirq - lastCts.Softirq) / totalDelta,
            "usage_steal":      100 * (cts.Steal - lastCts.Steal) / totalDelta,
            "usage_guest":      100 * (cts.Guest - lastCts.Guest) / totalDelta,
            "usage_guest_nice": 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta,
        }
        if s.ReportActive {
            fieldsG["usage_active"] = 100 * (active - lastActive) / totalDelta
        }
        acc.AddGauge("cpu", fieldsG, tags, now)
    } 

After a series of processing steps, the data is put into a metric.
Step 1: acc.AddGauge("cpu", fieldsG, tags, now)

func (ac *accumulator) AddGauge(
    measurement string,
    fields map[string]interface{},
    tags map[string]string,
    t ...time.Time,
) {
    ac.addFields(measurement, tags, fields, telegraf.Gauge, t...)
}


Step 2: addFields

func (ac *accumulator) addFields(
    measurement string,
    tags map[string]string,
    fields map[string]interface{},
    tp telegraf.ValueType,
    t ...time.Time,
) {
    m, err := metric.New(measurement, tags, fields, ac.getTime(t), tp)
    if err != nil {
        return
    }
    if m := ac.maker.MakeMetric(m); m != nil {
        ac.metrics <- m
    }
}

where

  • measurement = "cpu"
  • tags = {cpu -> cpu3}
  • fields = {usage_user -> 0.7641921397345768, usage_system -> 0.6550218340606909, usage_nice -> 0, ...}

Step 3: metric.New(measurement, tags, fields, ac.getTime(t), tp)

m = {github.com/influxdata/telegraf.Metric | *github.com/influxdata/telegraf/metric.metric} 
  name = {string} "cpu"
  tags = {[]*github.com/influxdata/telegraf.Tag} len:1, cap:1
    0 = {*github.com/influxdata/telegraf.Tag | 0xc0006ccc60} 
  fields = {[]*github.com/influxdata/telegraf.Field} len:10, cap:10
    0 = {*github.com/influxdata/telegraf.Field | 0xc00028e040} 
      Key = {string} "usage_idle"
      Value = {interface {} | float64} 98.58078602615508
    1 = {*github.com/influxdata/telegraf.Field | 0xc00028e060} 
      Key = {string} "usage_irq"
      Value = {interface {} | float64} 0
    2 = {*github.com/influxdata/telegraf.Field | 0xc00028e080} 
    3 = {*github.com/influxdata/telegraf.Field | 0xc00028e0a0} 
    4 = {*github.com/influxdata/telegraf.Field | 0xc00028e0c0} 
    5 = {*github.com/influxdata/telegraf.Field | 0xc00028e0e0} 
    6 = {*github.com/influxdata/telegraf.Field | 0xc00028e100} 
    7 = {*github.com/influxdata/telegraf.Field | 0xc00028e120} 
    8 = {*github.com/influxdata/telegraf.Field | 0xc00028e140} 
    9 = {*github.com/influxdata/telegraf.Field | 0xc00028e160} 
  tm = {time.Time} 
    wall = {uint64} 0
    ext = {int64} 63738083030
    loc = {*time.Location | 0x87a0800} 
  tp = {github.com/influxdata/telegraf.ValueType} Gauge (2)
  aggregate = {bool} false

Step 4: ac.metrics <- m

func (ac *accumulator) addFields(
    measurement string,
    tags map[string]string,
    fields map[string]interface{},
    tp telegraf.ValueType,
    t ...time.Time,
) {
    m, err := metric.New(measurement, tags, fields, ac.getTime(t), tp)
    if err != nil {
        return
    }
    if m := ac.maker.MakeMetric(m); m != nil {
        ac.metrics <- m
    }
}

Finally, the plugin's Gather function fills acc; the metrics channel shown here is the one that receives m in step 4 (ac.metrics <- m).

acc = {github.com/influxdata/telegraf.Accumulator | *github.com/influxdata/telegraf/agent.accumulator} 
  maker = {github.com/influxdata/telegraf/agent.MetricMaker | *github.com/influxdata/telegraf/models.RunningInput} 
  metrics = {chan<- github.com/influxdata/telegraf.Metric} 
  precision = {time.Duration} github.com/couchbase/go-couchbase.initialRetryInterval (1000000000)