优秀文章的搬运工: http://just4fun.im/2018/05/25/study_alertmanager/
我们先从应用的角度来看详细的介绍一下alertmanager以下简称am,以下是官方文档介绍。
The Alertmanager handles alerts sent by client applications such as the Prometheus server. It takes care of deduplicating, grouping, and routing them to the correct receiver integrations such as email, PagerDuty, or OpsGenie. It also takes care of silencing and inhibition of alerts.
翻译一下就是,负责处理接受client(例如prometheus)发送的告警消息,包括重复告警的发送、聚合、发给相关人员,并且支持多种方式例如email或者pagerduty这种第三方通知告警平台,同时他还提供了静音以及告警抑制的功能。
这些功能基本涵盖了目前各大公司的告警痛点,重复告警(告警发生了但是一直也没人处理)、告警风暴(某次版本上线导致的大量服务机器指标异常)、告警信息重复(例如机器宕机之后又收到了网络不通的告警)。
这里注意下,prometheus族包括am他们的实现思路都是基于label来做的,后面会从代码层面详细介绍下
这里贴一下AM的配置文档,文档链接,简单介绍一下,AM里面需要配置的包括email配置,如果使用了第三方告警平台的话也需要在里面配置好。下面就几个关键的配置进行以下说明。
该项定义了各个告警接收人,例如我要发给bob,那么bob的邮件地址是什么,如果不在这里定义好receiver的话,那么后续在匹配到相关告警之后查无此人就没有办法发送出来了,注意receiver里面的配置是一个数组。每个元素是一个接收人。
对于告警抑制的配置实现思路非常值得借鉴,我们知道告警抑制的发生一定是至少有两个告警发生。例如A告警发生了那么B告警再来了之后如果是被A抑制的那么就不要发送了,因此一条告警规则的定义首先是有一个source字段表示如果有这些label匹配的告警信息达到了那么后续可能就需要抑制了,同时还有一个target字段来表示A告警会抑制那些告警,只有这两个字段肯定是不够的,还是拿之前的例子举例,如果A宕机了,那么我肯定只抑制A的网络故障告警,我是不能把B、C的网络故障也抑制掉的,所以最后还有一个equal字段用来全面指定,对于如下一条规则翻译一下就是,当有主机宕机的告警A类发生时,那么就将告警级别为warning并且node字段与A类相等的告警B类告警进行抑制,这里注意一下,就是配置里面额alertname、serverity、node这些label字段都是在告警信息当中携带的,如果有不明白的等下会从源码来解释,am的实现非常精妙。
inhibit_rules:
- source_match:
alertname: NodeDown
severity: critical
target_match:
severity: warning
equal:
- node
最后这里介绍一下route字段的配置他控制了告警的聚合以及发送频率,route字段本身是一个树状的,每一个节点是一个配置,配置包括了接收人、match字段以及相应的告警发送配置例如:首次聚合时间、告警后续发送频率。对于到达的告警会收心进行match,如果有一个节点match并且该节点的continue字段为true,那么会继续递归遍历它的子节点,直到到达最后一个,这样就把匹配到同一个节点的告警经过group_by字段的分类,放置在不同的簇里面,这样就完成了告警的聚合功能。
这里有一点需要着重指出的一点是,对于每个簇,有三个字段影响了告警发送的频率,group_wait、group_interval、repeat_interval。
group_wait:当告警A第一次到达之后由于之前并没有告警簇,此时会进行创建,创建完之后会等待group_wait时间之后才会进行发送,这是为什么呢?这其实是为了解决告警风暴的问题,例如当服务集群a发生了告警,例如有10条,如果他们在group_wait这段时间内相继到达,那么最终他们就会被合并成一条告警进行发送。而不是收到10次告警信息。
group_interval:控制的是遍历告警簇的时间间隔,am当中当有新的告警到达时(之前没有进行过发生的告警)会进行告警簇的发送或者当检测到上次告警发送时间距离当前时间已经大于repeat_interval那么此时会进行发送。
以上就是am当中比较关键以及难以理解的配置项,下面从代码层面来进行一下分析。
我们先来看一下对于alert数据结构的定义,其中Labels与Annotations他们两个的数据类型一样都是一个string map的数组,其中最为重要的数据结构是Labels这个字段,他里面存储了用来唯一标示一个告警的所有字段,换句话说当一个告警的Labels字段都一样的话那么这就是同一个告警。说到这里我们来看一下他的时间字段,包括StartsAt以及EndsAt,在am当中他的思想就是把告警当做有生命周期的事件来看待,而不是把每次告警当成孤立问题,这个思想必须要领会才能理解以上的配置,以及下面的代码分析。
例如,对于A机器的cpu告警,他在11:20的时候数值为91触发了告警,在11:25的时候达到了94再次触发了告警,那么这个告警信息在am看来他们就是一个告警,只不过告警的故障时间从11:2011:25变成了11:2011:30,从这种角度来看待告警会简化很多问题,同时把注意力放在关键的地方,例如labels字段。有了上述的设计以及考虑,因此Labels字段存储了唯一标示告警的字段,例如Host、env、metric、service等等(这些字段都是在请求到am的时候告警自带的),因此我们会把类似status、value这些变动的字段存放在Annotations字段。
在am当中用来识别告警是否解决是通过比较EndsAt以及当前时间来进行的,如果字段小于当前时间,那么告警就被表示为解决。
type Alert struct {
Labels LabelSet // []map[string]string
Annotations LabelSet // []map[string]string
StartsAt time.Time
EndsAt time.Time
GeneratorURL string
UpdatedAt time.Time
Timeout bool
}
在请求am的接口时,StartsAt以及EndsAt字段都可以不指定,如果不指定的话StartsAt就会被赋予now当前时间,而EndsAt则会加上一段时间
alert.UpdatedAt = now
// Ensure StartsAt is set.
if alert.StartsAt.IsZero() {
if alert.EndsAt.IsZero() {
alert.StartsAt = now
} else {
alert.StartsAt = alert.EndsAt
}
}
// If no end time is defined, set a timeout after which an alert
// is marked resolved if it is not updated.
if alert.EndsAt.IsZero() {
alert.Timeout = true
alert.EndsAt = now.Add(resolveTimeout)
}
am的接口为/api/v1/alerts,注意使用的是post方法传递的是alert的数组,并且还有一点就是,当利用python encode json的时候时间字段需要特殊处理一下加上后缀+0800 CST,否则会丢失时区信息在am当中变成UTC时间。
以上就是am当中最重要的数据结构的介绍,下面介绍下他的主要模块以及实现。
api模块顾名思义就是对外提供api接口的,我们通过请求的所有接口信息都是通过该模块提供的,它包括添加告警、添加silence、以及获取当前告警信息、查看告警聚合效果等等等,我们通过添加的告警会被api模块插入provider模块。
该模块主要是存储当前的告警信息,所有api通过http接受的合法告警都会进入provider模块,在provider当中定义的alerts interface有一个订阅的方法Subscribe,其他所有的模块都通过这个方法来进行告警的获取
func (a *Alerts) Subscribe() provider.AlertIterator {
var (
ch = make(chan *types.Alert, 200)
done = make(chan struct{})
)
alerts, err := a.getPending()
a.mtx.Lock()
i := a.next
a.next++
a.listeners[i] = ch
a.mtx.Unlock()
go func() {
defer func() {
a.mtx.Lock()
delete(a.listeners, i)
close(ch)
a.mtx.Unlock()
}()
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
<-done
}()
return provider.NewAlertIterator(ch, done, err)
}
//通过调用Subscribe方法来添加订阅,获取一个管道
func NewAlertIterator(ch <-chan *types.Alert, done chan struct{}, err error) AlertIterator {
return &alertIterator{
ch: ch,
done: done,
err: err,
}
}
// alertIterator implements AlertIterator. So far, this one fits all providers.
type alertIterator struct {
ch <-chan *types.Alert
done chan struct{}
err error
}
func (ai alertIterator) Next() <-chan *types.Alert {
return ai.ch
}
// 之后通过注册订阅得到的Next方法进行监听
//最后我们可以看到所有放入alerts的告警都会依次放入各个调用过Subscribe的订阅管道
func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, alert := range alerts {
fp := alert.Fingerprint()
if old, ok := a.alerts[fp]; ok {
// Merge alerts if there is an overlap in activity range.
if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
alert = old.Merge(alert)
}
}
a.alerts[fp] = alert
for _, ch := range a.listeners {
ch <- alert
}
}
return nil
}
通过以上这种方式就完成了告警信息的分发工作。
type Inhibitor struct {
alerts provider.Alerts
rules []*InhibitRule
marker types.Marker
logger log.Logger
mtx sync.RWMutex
cancel func()
}
以上是告警抑制模块的数据结构,rules当中存储的就是我们在配置文件当中写入的抑制规则,marker当中是存储的Alert的状态(是否被抑制)。
func (ih *Inhibitor) run(ctx context.Context) {
it := ih.alerts.Subscribe()
defer it.Close()
for {
select {
case <-ctx.Done():
return
case a := <-it.Next():
if err := it.Err(); err != nil {
level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
continue
}
// Update the inhibition rules' cache.
for _, r := range ih.rules {
//当有source匹配到了的时候就会把告警存入匹配的告警规则
if r.SourceMatchers.Match(a.Labels) {
r.set(a)
}
}
}
}
}
//之后提供mutes方法来检测告警是否被抑制
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
fp := lset.Fingerprint()
for _, r := range ih.rules {
// Only inhibit if target matchers match but source matchers don't.
if inhibitedByFP, eq := r.hasEqual(lset); !r.SourceMatchers.Match(lset) && r.TargetMatchers.Match(lset) && eq {
ih.marker.SetInhibited(fp, inhibitedByFP.String())
return true
}
}
ih.marker.SetInhibited(fp)
return false
}
以上就是抑制模块的实现逻辑,类似的还有silence模块,silence实现起来比较简单因为它提供的功能就是当设置的某几个字段匹配之后就直接静音了,同时它提供了一个失效时间的功能。因为silence不能一直在的,与inhibit不一样的是silence是通过api模块来添加的,这里就不在介绍了。
接下来就是告警聚合功能的实现,这里还是先介绍一下数据结构route
type Route struct {
parent *Route
// The configuration parameters for matches of this route.
RouteOpts RouteOpts
// Equality or regex matchers an alert has to fulfill to match
// this route.
Matchers types.Matchers
// If true, an alert matches further routes on the same level.
Continue bool
// Children routes of this route.
Routes []*Route
}
我们可以看到route是一个树状的结构同时他提供了深度优先遍历的算法实现,后续在进行告警簇的创建时会用到的,通过如下方法我们就找到了一个告警应该放置到哪个告警簇当中。
// Match does a depth-first left-to-right search through the route tree
// and returns the matching routing nodes.
func (r *Route) Match(lset model.LabelSet) []*Route {
if !r.Matchers.Match(lset) {
return nil
}
var all []*Route
for _, cr := range r.Routes {
matches := cr.Match(lset)
all = append(all, matches...)
if matches != nil && !cr.Continue {
break
}
}
// If no child nodes were matches, the current node itself is a match.
if len(all) == 0 {
all = append(all, r)
}
return all
}
接下来就是dispatch分发模块的实现代码,他的数据结构如下:
type Dispatcher struct {
route *Route
alerts provider.Alerts
stage notify.Stage
marker types.Marker
timeout func(time.Duration) time.Duration
aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup//关键
mtx sync.RWMutex
done chan struct{}
ctx context.Context
cancel func()
logger log.Logger
}
//里面最重要的就是aggrGroups这个数据结构,通过看他的定义我们可以看出来在告警到达之后会首先通过route进行一次分类,之后再使用定义的group_by字段来进行最后一次分组,最终放置在了aggrGroup这个数据结构中,下面我们看一下aggrGroup的定义
type aggrGroup struct {
labels model.LabelSet
opts *RouteOpts
logger log.Logger
routeKey string
ctx context.Context
cancel func()
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration
mtx sync.RWMutex
alerts map[model.Fingerprint]*types.Alert //这里放置了簇里面的告警信息
hasFlushed bool
}
下面我们看一下分发功能是怎样实现的
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
groupLabels := model.LabelSet{}
for ln, lv := range alert.Labels {
if _, ok := route.RouteOpts.GroupBy[ln]; ok {
groupLabels[ln] = lv
}
}
fp := groupLabels.Fingerprint()
//首先我们通过route的指针来检查一下数据中是否已经有该节点数据的存在,如果没有则进行创建
d.mtx.Lock()
group, ok := d.aggrGroups[route]
if !ok {
group = map[model.Fingerprint]*aggrGroup{}
d.aggrGroups[route] = group
}
d.mtx.Unlock()
//之后检查group_by字段的指纹是否存在如果不存在表示这个告警簇之前没有则调用newAggrGroup进行创建并初始化
// If the group does not exist, create it.
ag, ok := group[fp]
if !ok {
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
group[fp] = ag
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
}
return err == nil
})
}
//将告警信息放入对应的告警簇,当告警放入相应的簇之后,在之后就按照我们上文提到的逻辑进行告警信息的发送。
ag.insert(alert)
}
以上就是am的主要功能模块的实现代码,下面还要额外提一下的一个是notify模块,这里面定义了告警的发送方式,如果有自定义需求的话最终是要在这里进行修改的。
am的高可用的实现逻辑是这样的,已prometheus的搭配使用来说明,例如我们又am1、am2两个实例,那么在prometheus当中我们就需要把这两个实例同时配置上,此时prometheus会把同一个告警发送给am1和am2两个实例。是不是觉得这样会有问题?我还没有说完,当两个am实例收到告警信息之后,他们都会走自己的流程,但是在最后的告警发送阶段,am实例之间会有一次信息同步的过程来避免重复发送,下面就简单介绍一下nflog模块和cluster模块。
nflog模块,我猜是notify log的简称,因为这个模块是用来记录告警发送历史记录的,他以高效的方式存储和编码告警的发送历史记录,例如那些告警的hash(int)发送过,上次告警发送时间。
cluster模块,这个模块就是集群功能的重点了,他首先是负责确保集群之间的通信正常,包括用可靠地方法检测集群是否都alive,同时还负责将一些必要信息来进行同步,例如nflog以及服务存储的silence信息。
最终通过确保所有实例在发送告警信息之前进行一次数据同步的流程来避免了告警重复发送的问题,同时实现了高可用。
prometheus真的是一个很棒的监控系统,他的许多理念真的是要超越上一代监控系统很多,首先对于告警信息的存储,它是基于label来实现,这样可以非常方便便捷的来实现各种组合查询。同时在告警阶段也沿用了label的理念,能够很高效和优雅的实现告警的聚合、抑制功能。
我想通过以上这些工作就能减少各个公司很大一部分告警量并且提升告警的优化效果了。但是后续能做的还是有很多,仍然值得深思