Log-Pilot是阿里开源的一款容器日志收集项目,他的基本流程是动态监听容器的事件变化,然后依据容器的标签来进行解析,然后生成filebeat/fluentd
的配置文件交由filebeat/fluentd
采集插件来进行日志采集,来达到日志收集随容器的动态调度而自动伸缩的效果。
docker
容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat/fluentd
配置文件// Pilot entry point
type Pilot struct {
piloter Piloter // Piloter 相关
mutex sync.Mutex // 并发锁,多个容器事件触发,谁先抢到锁,谁先处理
templ *template.Template // 日志采集客户端配置文件模板,log-pilot 使用 golang 的 text/template 模块渲染配置文件
client *k8s.Client // docker 容器客户端,通过docker 事件接口 api 获取相关容器信息
lastReload time.Time // 最后一次配置文件重载时间
reloadChan chan bool // 重载通知 chan
stopChan chan bool // 停止通知 chan
baseDir string // docker 在宿主机上面的数据存储位置
logPrefix []string // 定义环境变量以什么字符开头来表示应用日志所在目录,log-pilot 以配置环境变量的方式配置定每个容器中应用程序的日志路径位置,默认为aliyun
createSymlink bool // 是否创建硬连接的方式关联要搜集的日志文件
}
log-pilot
支持filebeat
和fluentd
两种日志收集器。 Piloter定义了收集器需要操作的一些方法,主要负责收集工具的启用、停止、重载的具体操作。type Piloter interface {
Name() string // "filebeat" 与 "fluentd",分别表示不同的搜集工具
Start() error // 启动搜集工具
Reload() error // 重载配置文件
Stop() error // 停止搜集工具
GetBaseConf() string // 日志采集客户端配置文件位置,如 filebeat 的 /etc/filebeat
GetConfHome() string // 日志采集客户端统一配置文件目录,如 filebeat 的 prospectors.d 位置
GetConfPath(container string) string // 具体配置文件路径
OnDestroyEvent(container string) error // 监听容器停止事件
}
pilot.Run
]( # Pilot.Run) 启动程序func main() {
// 指定配置模板
template := flag.String("template", "", "Template filepath for fluentd or filebeat.")
// 指定docker日志所在目录
base := flag.String("base", "", "Directory which mount host root.")
// 指定收集日志等级
level := flag.String("log-level", "INFO", "Log level")
// ....
// 读取模板文件
b, err := ioutil.ReadFile(*template)
// 启动程序入口
log.Fatal(pilot.Run(string(b), baseDir))
}
pilot.New
](# pilot.New) 初始化 Pilot 数据,Polit 中包含了对应 filebeat/fluentd
配置模版、dokcer client
、并发锁、piloter
对象等pilot.watch
](# pilot.watch) 开启容器事件监控func Run(templ string, baseDir string) error {
// 初始化Pilot数据
p, err := New(templ, baseDir)
if err != nil {
panic(err)
}
// 开启监听
return p.watch()
}
func New(tplStr string, baseDir string) (*Pilot, error) {
// 解析模板文件
templ, err := template.New("pilot").Parse(tplStr)
// ...
// 创建Docker容器客户端
client, err := k8s.NewEnvClient()
// 创建Piloter接口
piloter, err := NewPiloter(baseDir)
// ...
// 从环境变量中获取日志路径位置,默认aliyun
logPrefix := []string{"aliyun"}
if os.Getenv(ENV_PILOT_LOG_PREFIX) != "" {
envLogPrefix := os.Getenv(ENV_PILOT_LOG_PREFIX)
logPrefix = strings.Split(envLogPrefix, ",")
}
// 从环境变量中获取是否创建硬连接的方式关联要搜集的日志文件
createSymlink := os.Getenv(ENV_PILOT_CREATE_SYMLINK) == "true"
return &Pilot{
client: client,
templ: templ,
baseDir: baseDir,
reloadChan: make(chan bool),
stopChan: make(chan bool),
piloter: piloter,
logPrefix: logPrefix,
createSymlink: createSymlink,
}, nil
}
pilot.processEvent
监听 docker 事件func (p *Pilot) watch() error {
// ....
// 启动收集工具
err := p.piloter.Start()
// ....
// 接受 docker 事件,返回 chan
msgs, errs := p.client.Events(ctx, options)
go func() {
// ....
// 无限循环获取事件
for {
select {
case msg := <-msgs:
// 处理 docker 事件
if err := p.processEvent(msg); err != nil {
log.Errorf("fail to process event: %v, %v", msg, err)
}
// ....
}
}()
// ....
}
pilot.newContainer
生成配置文件pilot.delContainer
删除配置文件func (p *Pilot) processEvent(msg events.Message) error {
// 获取容器id
containerId := msg.Actor.ID
// 上下文
ctx := context.Background()
// ....
// 判断事件
switch msg.Action {
case "start", "restart":
// ....
// 返回容器信息
containerJSON, err := p.client.ContainerInspect(ctx, containerId)
return p.newContainer(&containerJSON)
case "destroy", "die":
// ....
err := p.delContainer(containerId)
return nil
}
pilot.getLogConfigs
pilot.render
func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {
// ....
// 创建容器对象
container := container(containerJSON)
for _, e := range env { // 处理环境变量, env由containerJSON 得到
// .....
}
// 获取配置文件模板数据,属性由ContainerJSON提供
logConfigs, err := p.getLogConfigs(jsonLogPath, mounts, labels)
if err != nil {
return err
}
// ....
// 关联 docker 容器中应用日志文件或目录
p.createVolumeSymlink(containerJSON)
// 渲染配置文件模板数据,生成具体的配置文件
logConfig, err := p.render(id, container, logConfigs)
if err != nil {
return err
}
// 保存配置文件
if err = ioutil.WriteFile(p.piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil {
return err
}
// 重载配置文件
p.tryReload()
return nil
}
func (p *Pilot) delContainer(id string) error {
// 移除关联docker 容器中应用日志文件或目录
p.removeVolumeSymlink(id)
if p.piloter.Name() == PILOT_FLUENTD {
clean := func() {
// ....
// 删除
if err := os.Remove(p.piloter.GetConfPath(id)); err != nil {
// ...
return
}
// 重载配置文件
p.tryReload()
}
// 15分钟后执行clean
time.AfterFunc(15*time.Minute, clean)
return nil
}
return p.piloter.OnDestroyEvent(id)
}
type LogConfig struct {
Name string // 日志名
HostDir string // 日志文件在宿主机上的目录
ContainerDir string // 容器应用日志目录
Format string // 日志采集应用的格式(none, json, csv等)
FormatConfig map[string]string
File string // 具体的日志文件名
Tags map[string]string // 标签数据
Target string // 自定义输出目标,可以是索引或kafka主题等
EstimateTime bool
Stdout bool
CustomFields map[string]string // 自定义添加日志字段
CustomConfigs map[string]string // 自定义配置文件项
}
pilot.parseLogConfig
func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) {
var ret []*LogConfig
// 获取容器的挂载目录
mountsMap := make(map[string]types.MountPoint)
for _, mount := range mounts {
mountsMap[mount.Destination] = mount
}
var labelNames []string
// 取出label并排序
for k := range labels {
labelNames = append(labelNames, k)
}
sort.Strings(labelNames)
// 通过label获取配置的日志所在目录
root := newLogInfoNode("")
for _, k := range labelNames {
for _, prefix := range p.logPrefix {
// ....
}
}
// 接续容器数据获得配置文件模板渲染数据
for name, node := range root.children {
logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap)
// ....
ret = append(ret, logConfig)
}
return ret, nil
}
func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {
// 获取日志目录
path := strings.TrimSpace(info.value)
// 获取标签数据
tags := info.get("tags")
tagMap, err := p.parseTags(tags)
// 获取索引或者kafka主题
target := info.get("target")
// 添加默认索引或主题
if _, ok := tagMap["index"]; !ok {
// ....
}
if _, ok := tagMap["topic"]; !ok {
// ....
}
// 检查是否有效
if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil {
return nil, err
}
// 日志格式化
format := info.children["format"]
if format == nil || format.value == "none" {
format = newLogInfoNode("nonex")
}
formatConfig, err := Convert(format)
//特殊处理路径中通配符 regex
if format.value == "regexp" {
format.value = fmt.Sprintf("/%s/", formatConfig["pattern"])
delete(formatConfig, "pattern")
}
// 如果是stdout, 采集容器的标准输出日志
if path == "stdout" {
logFile := filepath.Base(jsonLogPath)
if p.piloter.Name() == PILOT_FILEBEAT {
logFile = logFile + "*"
}
return &LogConfig{
Name: name,
HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),
File: logFile,
Format: format.value,
Tags: tagMap,
FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},
Target: target,
EstimateTime: false,
Stdout: true,
}, nil
}
// 输出到容器内部的具体文件日志路径
containerDir := filepath.Dir(path)
file := filepath.Base(path)
hostDir := p.hostDirOf(containerDir, mounts)
cfg := &LogConfig{
Name: name,
ContainerDir: containerDir,
Format: format.value,
File: file,
Tags: tagMap,
HostDir: filepath.Join(p.baseDir, hostDir),
FormatConfig: formatConfig,
Target: target,
}
return cfg, nil
}