
Prometheus Source Code Series: Metrics Collection (scrapeManager)

祁兴运
2023-12-01
  • Overview of metrics collection (scrapeManager)

  From the previous article, Prometheus Source Code Series: Service Discovery (serviceDiscover), we know that in order to receive monitored targets from service discovery in real time, the scrape manager (scrapeManager) runs a goroutine that reads targets from a channel and stores them in a map of type map[string][]*targetgroup.Group. The map key is the job_name and the value is a slice of targetgroup.Group structs; each Group holds the Targets, Labels and Source for that job_name.
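
  For reference, the targetgroup.Group struct looks roughly like this in v2.7.x (comments paraphrased from the source):

prometheus/discovery/targetgroup/targetgroup.go

// Group is a set of targets that share a common label set.
type Group struct {
	// Targets is a list of targets identified by a label set. Each target is
	// uniquely identifiable in the group by its address label.
	Targets []model.LabelSet
	// Labels is a set of labels that is common across all targets in the group.
	Labels model.LabelSet
	// Source is an identifier that describes this group of targets.
	Source string
}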

  Changes to the target set received by the scrape manager can take several forms. Taking additions as an example: if a new job is added, the scrape manager reloads, creates a scrapePool for the new job, and creates a scrapeLoop for every target in that job. If the job itself is unchanged and only new targets were added under it, only scrapeLoops for those new targets need to be created.

  The code analyzed in this article is based on version v2.7.1. Example values of several variables are shown via dlv (Delve) output. The configuration file used is the Prometheus.yml configuration example.
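
  For readers who want to reproduce the dlv output shown below, a session roughly like the following works (a sketch; the exact breakpoint location is up to you):

$ dlv exec ./prometheus -- --config.file=prometheus.yml
(dlv) break scrape.(*Manager).reload
(dlv) continue
(dlv) p m.targetSets["node"]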

  • How the scrape manager (scrapeManager) receives monitored targets in real time

  The entry point through which the scrape manager receives monitored targets in real time is scrapeManager.Run(discoveryManagerScrape.SyncCh()):

prometheus/cmd/prometheus/main.go

// Scrape manager.
g.Add(
	func() error {
		// When the scrape manager receives a new targets list
		// it needs to read a valid config for each job.
		// It depends on the config being in sync with the discovery manager so
		// we wait until the config is fully loaded.
		<-reloadReady.C

		err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
		level.Info(logger).Log("msg", "Scrape manager stopped")
		return err
	},
	func(err error) {
		// Scrape manager needs to be stopped before closing the local TSDB
		// so that it doesn't try to write samples to a closed storage.
		level.Info(logger).Log("msg", "Stopping scrape manager...")
		scrapeManager.Stop()
	},
)


// ts is the map[string][]*targetgroup.Group received from the discovery channel
(dlv) p ts["prometheus"]
[]*github.com/prometheus/prometheus/discovery/targetgroup.Group len: 1, cap: 1, [
	*{
		Targets: []github.com/prometheus/common/model.LabelSet len: 1, cap: 1, [
			[...],
		],
		Labels: github.com/prometheus/common/model.LabelSet nil,
		Source: "0",},
]

  This involves two parts: initializing the scrapeManager, and starting a goroutine that watches for changes to the targets.

  1. The scrapeManager is initialized by calling NewManager:

prometheus/cmd/prometheus/main.go

// fanoutStorage is the abstraction over the monitoring storage backends
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

  (1) NewManager instantiates the Manager struct:

prometheus/scrape/manager.go

// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	return &Manager{
		append:        app,
		logger:        logger,
		scrapeConfigs: make(map[string]*config.ScrapeConfig),
		scrapePools:   make(map[string]*scrapePool),
		graceShut:     make(chan struct{}),
		triggerReload: make(chan struct{}, 1),
	}
}

  (2) The Manager struct maintains two maps, scrapePools and targetSets. Both are keyed by job_name, but the values of scrapePools are scrapePool structs while the values of targetSets are slices of Group structs. Example outputs of both are shown below.

prometheus/scrape/manager.go

// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
	logger    log.Logger    // system logger
	append    Appendable    // storage backend for scraped metrics
	graceShut chan struct{} // shutdown signal

	mtxScrape     sync.Mutex // Guards the fields below.
	scrapeConfigs map[string]*config.ScrapeConfig // scrape_configs section of prometheus.yml; key is job_name, value is that job's configuration
	scrapePools   map[string]*scrapePool          // key is job_name, value is the scrapePool holding all targets of that job
	targetSets    map[string][]*targetgroup.Group // key is job_name, value is the Groups holding that job's Targets, Labels and Source

	triggerReload chan struct{} // when service discovery (serviceDiscover) delivers new targets, a value is sent here to trigger a reload (see below)
}


Example output of targetSets for job_name "node":
(dlv) p m.targetSets["node"]
[]*github.com/prometheus/prometheus/discovery/targetgroup.Group len: 1, cap: 1, [
	*{
		Targets: []github.com/prometheus/common/model.LabelSet len: 1, cap: 1, [
	                 [
		                   "__address__": "localhost:9100", 
	                 ],
		],
		Labels: github.com/prometheus/common/model.LabelSet nil,
		Source: "0",},
]

Example output of scrapePools for job_name "node":
(dlv) p m.scrapePools
map[string]*github.com/prometheus/prometheus/scrape.scrapePool [
	"node": *{
		appendable: github.com/prometheus/prometheus/scrape.Appendable(*github.com/prometheus/prometheus/storage.fanout) ...,
		logger: github.com/go-kit/kit/log.Logger(*github.com/go-kit/kit/log.context) ...,
		mtx: (*sync.RWMutex)(0xc001be0020),
		config: *(*"github.com/prometheus/prometheus/config.ScrapeConfig")(0xc00048ab40),
		client: *(*"net/http.Client")(0xc000d303c0),
		activeTargets: map[uint64]*github.com/prometheus/prometheus/scrape.Target [],
		droppedTargets: []*github.com/prometheus/prometheus/scrape.Target len: 0, cap: 0, nil,
		loops: map[uint64]github.com/prometheus/prometheus/scrape.loop [],
		cancel: context.WithCancel.func1,
		newLoop: github.com/prometheus/prometheus/scrape.newScrapePool.func2,}, 
]
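
  For context, a scrape_configs section along these lines (a minimal sketch consistent with the dlv output above; the prometheus self-scrape address is an assumption) yields the two jobs shown:

prometheus.yml

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']   # assumed self-scrape address
  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100']   # matches the targetSets output above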

  2. As mentioned several times already, when main.go starts, the scrape manager runs the Run method in a goroutine to receive monitored targets from service discovery (serviceDiscover) in real time. Let's look at how Run is implemented.

prometheus/scrape/manager.go

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
	// reload targets periodically: reloader() wakes up every 5s and checks whether triggerReload has fired
	go m.reloader() 
	for {
		select {
		// receive monitored targets from the discovery channel
		case ts := <-tsets:  
			m.updateTsets(ts)

			select {
			// on a target change from service discovery (serviceDiscover), signal triggerReload so that reloader() performs the update
			case m.triggerReload <- struct{}{}: 
			default:
			}

		case <-m.graceShut:
			return nil
		}
	}
}

  The flow above is fairly clear: whenever service discovery (serviceDiscover) reports a change in targets, Run sends a value into the triggerReload channel (m.triggerReload <- struct{}{}). It also starts a goroutine running the reloader method, which periodically reloads the targets; running this in a separate goroutine avoids blocking the receipt of target updates from service discovery.
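
  The key detail is that triggerReload is a buffered channel of size 1 and the send uses select/default, so any number of target updates between two ticks collapse into a single reload. A minimal standalone sketch of this coalescing pattern (not Prometheus code):

package main

import (
	"fmt"
	"time"
)

func main() {
	triggerReload := make(chan struct{}, 1)

	// Producer (like Run): signal "targets changed" without ever blocking.
	go func() {
		for i := 0; i < 10; i++ {
			select {
			case triggerReload <- struct{}{}: // first pending trigger wins
			default: // a reload is already pending; duplicates are dropped
			}
			time.Sleep(200 * time.Millisecond)
		}
	}()

	// Consumer (like reloader): check for a pending trigger on every tick.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		select {
		case <-triggerReload:
			fmt.Println("reload triggered")
		default:
			fmt.Println("nothing to do")
		}
	}
}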

  The reloader method starts a ticker and, in an endless loop, checks the triggerReload channel every 5s; if a value is pending, it calls reload.

prometheus/scrape/manager.go

func (m *Manager) reloader() {
	// 5-second ticker
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-m.graceShut:
			return
		// when service discovery (serviceDiscovery) reports a target change, a value is written to triggerReload; every 5s the ticker checks the channel and, if a value is pending, reload is called
		case <-ticker.C:
			select {
			case <-m.triggerReload:
				m.reload()
			case <-m.graceShut:
				return
			}
		}
	}
}

  The reload method walks targetSets by job_name, checks consistency against scrapePools and scrapeConfigs, and hands each job_name's []*targetgroup.Group to sp.Sync in its own goroutine for concurrency.

prometheus/scrape/manager.go

func (m *Manager) reload() {
	m.mtxScrape.Lock()
	var wg sync.WaitGroup
	// setName is the job_name;
	// each Group holds that job's Targets, Labels and Source (described in detail in the previous article)
	for setName, groups := range m.targetSets {
		var sp *scrapePool
		existing, ok := m.scrapePools[setName]
		// if the job_name has no scrapePool yet, there are two cases:
		// (1) the job_name is not in scrapeConfigs either: log an error and skip it
		// (2) the job_name is in scrapeConfigs: create a new scrapePool for it and register it in scrapePools
		if !ok {
			scrapeConfig, ok := m.scrapeConfigs[setName]
			if !ok {
				level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
				continue
			}
			sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
			m.scrapePools[setName] = sp
		} else {
			sp = existing
		}

		wg.Add(1)
		// Run the sync in parallel as these take a while and at high load can't catch up.
		go func(sp *scrapePool, groups []*targetgroup.Group) {
			// convert the groups into Target objects and synchronize
			sp.Sync(groups)
			wg.Done()
		}(sp, groups)

	}
	m.mtxScrape.Unlock()
	wg.Wait()
}

  The sp.Sync method introduces the Target struct and converts the []*targetgroup.Group groups into Target objects; each Group may hold multiple targets belonging to the same job_name. It then calls sp.sync to synchronize the scraping targets.

prometheus/scrape/scrape.go

// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
	start := time.Now()

	var all []*Target
	sp.mtx.Lock()
	sp.droppedTargets = []*Target{}
	for _, tg := range tgs {
		// convert the targetgroup.Group into Target objects
		targets, err := targetsFromGroup(tg, sp.config)
		if err != nil {
			level.Error(sp.logger).Log("msg", "creating targets failed", "err", err)
			continue
		}
		// a single tg (Group) can contain multiple targets, so targetsFromGroup returns a slice that is iterated over here
		for _, t := range targets {
			// keep the target only if it still has labels after processing
			if t.Labels().Len() > 0 {
				all = append(all, t)
			} else if t.DiscoveredLabels().Len() > 0 {
				// otherwise the target was dropped (e.g. by relabeling) and is recorded in the scrapePool's droppedTargets
				sp.droppedTargets = append(sp.droppedTargets, t)
			}
		}
	}
	sp.mtx.Unlock()
	sp.sync(all)

	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
		time.Since(start).Seconds(),
	)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
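
  A dropped target typically comes from relabeling: if relabel_configs removes all of a target's labels (for example an action: drop rule matches), targetsFromGroup returns it with an empty label set and it ends up in droppedTargets, which is what the Service Discovery page displays. A hedged configuration example (the second address is made up for illustration):

prometheus.yml

scrape_configs:
  - job_name: 'node'
    static_configs:
      - targets: ['localhost:9100', 'localhost:9101']
    relabel_configs:
      - source_labels: [__address__]
        regex: 'localhost:9101'
        action: drop   # this target keeps only its discovered labels and lands in droppedTargets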

 Definition of the Target struct:

// Target refers to a singular HTTP or HTTPS endpoint.
type Target struct {
	// Labels before any processing.
	discoveredLabels labels.Labels
	// Any labels that are added to this target and its metrics.
	labels labels.Labels
	// Additional URL parameters that are part of the target URL.
	params url.Values

	mtx                sync.RWMutex
	lastError          error
	lastScrape         time.Time
	lastScrapeDuration time.Duration
	health             TargetHealth
	metadata           metricMetadataStore
}
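
  The labels of a Target include reserved labels such as __scheme__, __address__ and __metrics_path__, from which the scrape URL is assembled (Target.URL in scrape/target.go also folds in __param_* labels). A simplified, self-contained sketch of the idea, not the actual implementation:

package main

import (
	"fmt"
	"net/url"
)

// scrapeURL assembles a scrape URL from a target's reserved labels.
// Illustration only; the real logic lives in Target.URL().
func scrapeURL(labels map[string]string) string {
	u := url.URL{
		Scheme: labels["__scheme__"],
		Host:   labels["__address__"],
		Path:   labels["__metrics_path__"],
	}
	return u.String()
}

func main() {
	fmt.Println(scrapeURL(map[string]string{
		"__scheme__":       "http",
		"__address__":      "localhost:9100",
		"__metrics_path__": "/metrics",
	}))
	// prints: http://localhost:9100/metrics
}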

  The sp.sync method compares the new target list against the existing one. For any target not already present, it creates a new scrapeLoop and starts its run method in a goroutine to scrape and store metrics concurrently. It then checks the existing target list for targets that have disappeared and removes them.

prometheus/scrape/scrape.go

// sync takes a list of potentially duplicated targets, deduplicates them, starts
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var (
		uniqueTargets = map[uint64]struct{}{} 
		interval      = time.Duration(sp.config.ScrapeInterval) // scrape interval
		timeout       = time.Duration(sp.config.ScrapeTimeout)  // scrape timeout
		limit         = int(sp.config.SampleLimit)               // per-scrape sample limit
		honor         = sp.config.HonorLabels                    // honor_labels setting
		mrc           = sp.config.MetricRelabelConfigs
	)

	for _, t := range targets {
		t := t
		hash := t.hash()
		uniqueTargets[hash] = struct{}{}
		// if the target is not in the existing list, create a new scrapeLoop for it
		if _, ok := sp.activeTargets[hash]; !ok {
			s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
			l := sp.newLoop(t, s, limit, honor, mrc)

			sp.activeTargets[hash] = t
			sp.loops[hash] = l
			// start the scrapeLoop's run method in a goroutine to scrape and store metrics
			go l.run(interval, timeout, nil)
		} else {
			// Need to keep the most updated labels information
			// for displaying it in the Service Discovery web page.
			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
		}
	}

	var wg sync.WaitGroup

	// Stop and remove old targets and scraper loops.
	// check the existing target list for targets that have disappeared and remove them
	for hash := range sp.activeTargets {
		if _, ok := uniqueTargets[hash]; !ok {
			wg.Add(1)
			go func(l loop) {

				l.stop()

				wg.Done()
			}(sp.loops[hash])

			delete(sp.loops, hash)
			delete(sp.activeTargets, hash)
		}
	}

	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new scraper
	// may be active and tries to insert. The old scraper that didn't terminate yet could still
	// be inserting a previous sample set.
	wg.Wait()
}
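
  Deduplication relies on t.hash(), a uint64 derived from the target's label set (and, in Prometheus, its scrape URL), so identical targets map to the same key in uniqueTargets and activeTargets. A rough sketch of hashing a label set with FNV, not the exact Prometheus implementation:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashLabels collapses a label set into a uint64 so that identical targets
// produce the same key regardless of map iteration order.
func hashLabels(lset map[string]string) uint64 {
	names := make([]string, 0, len(lset))
	for n := range lset {
		names = append(names, n)
	}
	sort.Strings(names) // order-independent hashing

	h := fnv.New64a()
	for _, n := range names {
		h.Write([]byte(n))
		h.Write([]byte{0xff}) // separator avoids ambiguous concatenation
		h.Write([]byte(lset[n]))
		h.Write([]byte{0xff})
	}
	return h.Sum64()
}

func main() {
	a := hashLabels(map[string]string{"job": "node", "instance": "localhost:9100"})
	b := hashLabels(map[string]string{"instance": "localhost:9100", "job": "node"})
	fmt.Println(a == b) // true: same labels, same hash
}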

  For each new target, sp.sync starts a goroutine running the scrapeLoop's run method, which scrapes and stores the monitoring metrics. The run method is implemented as follows:

prometheus/scrape/scrape.go

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	select {
	// wait for the initial scraping offset before the first scrape
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	// stop requested: exit
	case <-sl.scrapeCtx.Done():
		close(sl.stopped)
		return
	}

	var last time.Time
	// ticker firing once per scrape interval
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

mainLoop:
	for {
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		default:
		}

		var (
			start             = time.Now()
			scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
		)

		// Only record after the first scrape.
		if !last.IsZero() {
			targetIntervalLength.WithLabelValues(interval.String()).Observe(
				time.Since(last).Seconds(),
			)
		}
		// get a byte slice from the buffer pool, sized to the previous scrape
		b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
		// wrap it in a bytes.Buffer for this scrape
		buf := bytes.NewBuffer(b)

		// perform the scrape (pull the metrics)
		contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
		cancel()

		if scrapeErr == nil {
			b = buf.Bytes()
			// NOTE: There were issues with misbehaving clients in the past
			// that occasionally returned empty results. We don't want those
			// to falsely reset our buffer size.
			if len(b) > 0 {
				// remember the size of this scrape so the next one can pre-size its buffer
				sl.lastScrapeSize = len(b)
			}
		} else {
			level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
			if errc != nil {
				errc <- scrapeErr
			}
		}

		// A failed scrape is the same as an empty scrape,
		// we still call sl.append to trigger stale markers.
		// append (store) the scraped samples
		total, added, appErr := sl.append(b, contentType, start)
		if appErr != nil {
			level.Warn(sl.l).Log("msg", "append failed", "err", appErr)
			// The append failed, probably due to a parse error or sample limit.
			// Call sl.append again with an empty scrape to trigger stale markers.
			if _, _, err := sl.append([]byte{}, "", start); err != nil {
				level.Warn(sl.l).Log("msg", "append failed", "err", err)
			}
		}

		sl.buffers.Put(b)

		if scrapeErr == nil {
			scrapeErr = appErr
		}

		if err := sl.report(start, time.Since(start), total, added, scrapeErr); err != nil {
			level.Warn(sl.l).Log("msg", "appending scrape report failed", "err", err)
		}
		last = start

		// wait for the next tick, or stop the scrapeLoop if shutdown was requested
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	close(sl.stopped)

	sl.endOfRunStaleness(last, ticker, interval)
}

  The run method does two things: scraping metrics and storing them. To reuse objects and improve performance, the scrape path uses a buffer pool (built on sync.Pool): after each scrape, the byte slice holding the scraped payload is returned to the pool and its size is recorded, so the next scrape can reuse a buffer of the right size directly.
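
  A minimal sketch of that buffer-reuse lifecycle using sync.Pool directly (Prometheus actually uses its own size-bucketed pool built on top of sync.Pool, but the Get/use/Put flow is the same):

package main

import (
	"bytes"
	"fmt"
	"sync"
)

var buffers = sync.Pool{
	New: func() interface{} { return make([]byte, 0, 1024) },
}

// scrapeOnce mimics one scrape iteration: get a slice from the pool,
// wrap it in a bytes.Buffer, fill it, note the size, and return it.
func scrapeOnce(fill string) int {
	b := buffers.Get().([]byte)
	buf := bytes.NewBuffer(b[:0]) // reuse the backing array

	buf.WriteString(fill) // stand-in for scraper.scrape(ctx, buf)

	size := buf.Len()
	buffers.Put(buf.Bytes()) // return the (possibly grown) slice to the pool
	return size
}

func main() {
	fmt.Println(scrapeOnce("# HELP up ...\nup 1\n"))
	fmt.Println(scrapeOnce("up 1\n"))
}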

  • Scrape manager (scrapeManager) configuration initialization and application

  The scrape manager calls scrapeManager.ApplyConfig to initialize and apply the configuration; the method looks like this:

prometheus/scrape/manager.go

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
	// lock before modifying
	m.mtxScrape.Lock()
	// unlock when done
	defer m.mtxScrape.Unlock()

	// build a map keyed by job_name whose values are the config.ScrapeConfig structs
	c := make(map[string]*config.ScrapeConfig)
	for _, scfg := range cfg.ScrapeConfigs {
		c[scfg.JobName] = scfg
	}
	m.scrapeConfigs = c

	// no-op on first startup (scrapePools is still empty)
	// Cleanup and reload pool if config has changed.
	for name, sp := range m.scrapePools {
		// if the job_name has a scrapePool but is no longer in scrapeConfigs, the job was removed: stop its scrapePool
		if cfg, ok := m.scrapeConfigs[name]; !ok {
			sp.stop()
			delete(m.scrapePools, name)
		} else if !reflect.DeepEqual(sp.config, cfg) {
			// if the job_name exists in both scrapePools and scrapeConfigs but the configuration differs, reload the scrapePool with the new config
			sp.reload(cfg)
		}
	}

	return nil
}
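
  As an example of the second branch: if prometheus.yml is edited and Prometheus is told to reload (SIGHUP, or the /-/reload endpoint when the lifecycle API is enabled), a changed scrape parameter for an existing job makes reflect.DeepEqual fail and sp.reload(cfg) is called; deleting the job entirely would take the sp.stop() branch instead. A hypothetical change to the node job:

prometheus.yml

scrape_configs:
  - job_name: 'node'
    scrape_interval: 30s   # changed value -> DeepEqual fails -> sp.reload(cfg)
    static_configs:
      - targets: ['localhost:9100']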

  The scrapePool's reload method is then called to apply the new configuration:

prometheus/scrape/scrape.go

// reload the scrape pool with the given scrape configuration. The target state is preserved
// but all scrape loops are restarted with the new scrape configuration.
// This method returns after all scrape loops that were stopped have stopped scraping.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
	start := time.Now()

	// lock before modifying
	sp.mtx.Lock()
	// unlock when done
	defer sp.mtx.Unlock()

	// create the HTTP client used to fetch metrics
	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		level.Error(sp.logger).Log("msg", "Error creating HTTP client", "err", err)
	}
	sp.config = cfg
	sp.client = client

	var (
		wg       sync.WaitGroup
		interval = time.Duration(sp.config.ScrapeInterval)
		timeout  = time.Duration(sp.config.ScrapeTimeout)
		limit    = int(sp.config.SampleLimit)
		honor    = sp.config.HonorLabels
		mrc      = sp.config.MetricRelabelConfigs
	)

	// stop every old loop in this scrapePool, create new loops from the new configuration, and start them in goroutines
	for fp, oldLoop := range sp.loops {
		var (
			t       = sp.activeTargets[fp]
			s       = &targetScraper{Target: t, client: sp.client, timeout: timeout}
			newLoop = sp.newLoop(t, s, limit, honor, mrc)
		)
		wg.Add(1)

		go func(oldLoop, newLoop loop) {
			oldLoop.stop()
			wg.Done()

			go newLoop.run(interval, timeout, nil)
		}(oldLoop, newLoop)

		sp.loops[fp] = newLoop
	}

	wg.Wait()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
		time.Since(start).Seconds(),
	)
}

This concludes the analysis of the scrape manager (scrapeManager).
