当前位置: 首页 > 工具软件 > log-pilot > 使用案例 >

Log-Pilot 源码简析

赵高雅
2023-12-01

简单介绍

Log-Pilot是阿里开源的一款容器日志收集项目,他的基本流程是动态监听容器的事件变化,然后依据容器的标签来进行解析,然后生成filebeat/fluentd的配置文件交由filebeat/fluentd采集插件来进行日志采集,来达到日志收集随容器的动态调度而自动伸缩的效果。

源码简析

Pilot结构体

  • 主要监听 docker 容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat/fluentd配置文件
// Pilot entry point
type Pilot struct {
    piloter       Piloter               // Piloter 相关
    mutex         sync.Mutex            // 并发锁,多个容器事件触发,谁先抢到锁,谁先处理
    templ         *template.Template    // 日志采集客户端配置文件模板,log-pilot 使用 golang 的 text/template 模块渲染配置文件
    client        *k8s.Client           // docker 容器客户端,通过docker 事件接口 api 获取相关容器信息
    lastReload    time.Time             // 最后一次配置文件重载时间
    reloadChan    chan bool             // 重载通知 chan
    stopChan      chan bool             // 停止通知 chan
    baseDir       string                // docker 在宿主机上面的数据存储位置
    logPrefix     []string              // 定义环境变量以什么字符开头来表示应用日志所在目录,log-pilot 以配置环境变量的方式配置定每个容器中应用程序的日志路径位置,默认为aliyun
    createSymlink bool                  // 是否创建硬连接的方式关联要搜集的日志文件
}

Piloter接口

  • log-pilot支持filebeatfluentd两种日志收集器。 Piloter定义了收集器需要操作的一些方法,主要负责收集工具的启用停止重载的具体操作。
type Piloter interface {
    Name() string       // "filebeat" 与 "fluentd",分别表示不同的搜集工具
 
    Start() error       // 启动搜集工具
    Reload() error      // 重载配置文件
    Stop() error        // 停止搜集工具
 
    GetBaseConf() string    // 日志采集客户端配置文件位置,如 filebeat 的 /etc/filebeat
    GetConfHome() string    // 日志采集客户端统一配置文件目录,如 filebeat 的 prospectors.d 位置
    GetConfPath(container string) string    // 具体配置文件路径
 
    OnDestroyEvent(container string) error  // 监听容器停止事件
}

main函数

  • 程序的入口、命令行处理:指定日志收集配置模板、收集日志等级、Docker日志路径等。
  • [pilot.Run]( # Pilot.Run) 启动程序
func main() {
	// 指定配置模板
	template := flag.String("template", "", "Template filepath for fluentd or filebeat.")
    // 指定docker日志所在目录
	base := flag.String("base", "", "Directory which mount host root.")
	// 指定收集日志等级
	level := flag.String("log-level", "INFO", "Log level")
    
    // ....
    
    // 读取模板文件
    b, err := ioutil.ReadFile(*template)
    // 启动程序入口
	log.Fatal(pilot.Run(string(b), baseDir))
}

Pilot.Run

  • [pilot.New](# pilot.New) 初始化 Pilot 数据,Polit 中包含了对应 filebeat/fluentd 配置模版、dokcer client、并发锁、piloter 对象等
  • [pilot.watch](# pilot.watch) 开启容器事件监控
func Run(templ string, baseDir string) error {
    // 初始化Pilot数据
	p, err := New(templ, baseDir)
	if err != nil {
		panic(err)
	}
    // 开启监听
	return p.watch()
}

Pilot.New

  • 初始化Pilot数据
func New(tplStr string, baseDir string) (*Pilot, error) {
    // 解析模板文件
	templ, err := template.New("pilot").Parse(tplStr)
	// ...
	
    // 创建Docker容器客户端 
	client, err := k8s.NewEnvClient()
	// 创建Piloter接口
	piloter, err := NewPiloter(baseDir)
	
	// ...
    
    // 从环境变量中获取日志路径位置,默认aliyun
	logPrefix := []string{"aliyun"}
	if os.Getenv(ENV_PILOT_LOG_PREFIX) != "" {
		envLogPrefix := os.Getenv(ENV_PILOT_LOG_PREFIX)
		logPrefix = strings.Split(envLogPrefix, ",")
	}
	// 从环境变量中获取是否创建硬连接的方式关联要搜集的日志文件
	createSymlink := os.Getenv(ENV_PILOT_CREATE_SYMLINK) == "true"
	return &Pilot{
		client:        client,
		templ:         templ,
		baseDir:       baseDir,
		reloadChan:    make(chan bool),
		stopChan:      make(chan bool),
		piloter:       piloter,
		logPrefix:     logPrefix,
		createSymlink: createSymlink,
	}, nil
}

Pilot.watch

func (p *Pilot) watch() error {
    // ....
    // 启动收集工具
    err := p.piloter.Start()           
    // ....
    
    // 接受 docker 事件,返回 chan
    msgs, errs := p.client.Events(ctx, options)  
 
    go func() {
        // ....
        // 无限循环获取事件
        for {           
            select {
            case msg := <-msgs:
                // 处理 docker 事件
                if err := p.processEvent(msg); err != nil {     
                    log.Errorf("fail to process event: %v,  %v", msg, err)
                }
            // ....
        }
    }()
    // ....
}

Pilot.processEvent

func (p *Pilot) processEvent(msg events.Message) error {
	// 获取容器id
	containerId := msg.Actor.ID
    // 上下文
    ctx := context.Background()
	// ....
    // 判断事件
	switch msg.Action {
	case "start", "restart":
            // ....
        	// 返回容器信息
        	containerJSON, err := p.client.ContainerInspect(ctx, containerId)
            return p.newContainer(&containerJSON)
	case "destroy", "die":
            // ....
	    err := p.delContainer(containerId)
	return nil
}

Pilot.newContainer

func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {
    // .... 
 	// 创建容器对象
    container := container(containerJSON)
 
    for _, e := range env {         // 处理环境变量, env由containerJSON 得到
        // .....
    }
    // 获取配置文件模板数据,属性由ContainerJSON提供
    logConfigs, err := p.getLogConfigs(jsonLogPath, mounts, labels)
    if err != nil {
        return err
    }
 
    // ....
 
    // 关联 docker 容器中应用日志文件或目录
    p.createVolumeSymlink(containerJSON)
 
    // 渲染配置文件模板数据,生成具体的配置文件
    logConfig, err := p.render(id, container, logConfigs)
    if err != nil {
        return err
    }
    // 保存配置文件
    if err = ioutil.WriteFile(p.piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil {
        return err
    }
    // 重载配置文件
    p.tryReload()
    return nil
}

Pilot.delContainer

  • 删除配置文件
  • reload 配置文件
func (p *Pilot) delContainer(id string) error {
    // 移除关联docker 容器中应用日志文件或目录
	p.removeVolumeSymlink(id)
	
	if p.piloter.Name() == PILOT_FLUENTD {
		clean := func() {
			// ....
             // 删除
			if err := os.Remove(p.piloter.GetConfPath(id)); err != nil {
				// ...
				return
			}
             // 重载配置文件
			p.tryReload()
		}
        // 15分钟后执行clean
		time.AfterFunc(15*time.Minute, clean)
		return nil
	}

	return p.piloter.OnDestroyEvent(id)
}

LogConfig

  • 动态渲染配置文件模板数据集
type LogConfig struct {
    Name         string                 // 日志名
    HostDir      string                 // 日志文件在宿主机上的目录
    ContainerDir string                 // 容器应用日志目录
    Format       string                 // 日志采集应用的格式(none, json, csv等)
    FormatConfig map[string]string     
    File         string                 // 具体的日志文件名
    Tags         map[string]string      // 标签数据
    Target       string                 // 自定义输出目标,可以是索引或kafka主题等
    EstimateTime bool
    Stdout       bool
 
    CustomFields  map[string]string     // 自定义添加日志字段
    CustomConfigs map[string]string     // 自定义配置文件项
}

pilot.getLogConfigs

func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) {
	var ret []*LogConfig
	// 获取容器的挂载目录
	mountsMap := make(map[string]types.MountPoint)
	for _, mount := range mounts {
		mountsMap[mount.Destination] = mount
	}

	var labelNames []string
	// 取出label并排序
	for k := range labels {
		labelNames = append(labelNames, k)
	}
	sort.Strings(labelNames)
    
    
    // 通过label获取配置的日志所在目录
	root := newLogInfoNode("")
	for _, k := range labelNames {
		for _, prefix := range p.logPrefix {
			// ....
		}
	}
	// 接续容器数据获得配置文件模板渲染数据
	for name, node := range root.children {
		logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap)
		// ....
		ret = append(ret, logConfig)
	}
	return ret, nil
}

pilot.parseLogConfig

  • 接续容器数据获得配置文件模板渲染数据
func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {
    // 获取日志目录
	path := strings.TrimSpace(info.value)
	 
	// 获取标签数据
	tags := info.get("tags")
	tagMap, err := p.parseTags(tags)
	
    // 获取索引或者kafka主题
	target := info.get("target")
	// 添加默认索引或主题
	if _, ok := tagMap["index"]; !ok {
		// ....
	}
	if _, ok := tagMap["topic"]; !ok {
		// ....
	}

	// 检查是否有效
	if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil {
		return nil, err
	}
	
    // 日志格式化
	format := info.children["format"]
	if format == nil || format.value == "none" {
		format = newLogInfoNode("nonex")
	}
	formatConfig, err := Convert(format)

	//特殊处理路径中通配符 regex
	if format.value == "regexp" {
		format.value = fmt.Sprintf("/%s/", formatConfig["pattern"])
		delete(formatConfig, "pattern")
	}
	
    // 如果是stdout, 采集容器的标准输出日志
	if path == "stdout" {
		logFile := filepath.Base(jsonLogPath)
		if p.piloter.Name() == PILOT_FILEBEAT {
			logFile = logFile + "*"
		}

		return &LogConfig{
			Name:         name,
			HostDir:      filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
			File:         logFile,
			Format:       format.value,
			Tags:         tagMap,
			FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
			Target:       target,
			EstimateTime: false,
			Stdout:       true,
		}, nil
	}
	
 	// 输出到容器内部的具体文件日志路径
	containerDir := filepath.Dir(path)
	file := filepath.Base(path)
	hostDir := p.hostDirOf(containerDir, mounts)

	cfg := &LogConfig{
		Name:         name,
		ContainerDir: containerDir,
		Format:       format.value,
		File:         file,
		Tags:         tagMap,
		HostDir:      filepath.Join(p.baseDir, hostDir),
		FormatConfig: formatConfig,
		Target:       target,
	}

	return cfg, nil
}

pilot.render

  • 渲染配置文件模板数据,生成具体的配置文件

参考资料:

log-pilot源码简析

容器日志采集利器Log-Pilot

 类似资料: