fluent-bit很好的与k8s融合,离不开本身的设计。fluent-bit采用配置插件的方式对输入、过滤、分析、输出进行实现,这使得与k8s集成时,只需要采用configmap(crd中使用的secret)就可以完成原有的功能。
在fluent-bit operator(接下来简称fbo)中并没有直接使用k8s原生配置资源,而是使用了自定义CRD,接下来让我们一一分析。
每个插件对应一种日志收集功能。
type InputSpec struct {
// 输入别名
Alias string json:"alias,omitempty"
// Dummy defines Dummy Input configuration.
Dummy *input.Dummy json:"dummy,omitempty"
// tail收集配置
Tail *input.Tail json:"tail,omitempty"
// Systemd收集配置
Systemd *input.Systemd json:"systemd,omitempty"
}
// 监控文本文件
type Tail struct {
// 读取文件缓冲大小
BufferChunkSize string `json:"bufferChunkSize,omitempty"`
// 缓冲最大
BufferMaxSize string `json:"bufferMaxSize,omitempty"`
// 日志路径 比如/var/log/containers/*.log
Path string `json:"path,omitempty"`
// 排除文件
// e.g: exclude_path=*.gz,*.zip
ExcludePath string `json:"excludePath,omitempty"`
// 刷新监视文件列表的时间间隔,以秒为单位。
RefreshIntervalSeconds *int64 `json:"refreshIntervalSeconds,omitempty"`
RotateWaitSeconds *int64 `json:"rotateWaitSeconds,omitempty"`
// 根据日志时间检索,仅当Parser能够解析出time后使用
IgnoreOlder string `json:"ignoredOlder,omitempty"`
// 监视文件缓存已满后是否结束
SkipLongLines *bool `json:"skipLongLines,omitempty"`
// 指定数据库文件以跟踪监控的文件和偏移量。 比如/tail/pos.db
DB string `json:"db,omitempty"`
// tail使用内存量,如果达到峰值则暂停等恢复
MemBufLimit string `json:"memBufLimit,omitempty"`
// 指定解析器 例如 docker
Parser string `json:"parser,omitempty"`
// 未应用解析器时,为记录指定key值
Key string `json:"key,omitempty"`
// 指定日志记录的tag
// E.g. kube.<namespace_name>.<pod_name>.<container_name>
Tag string `json:"tag,omitempty"`
}
对input传来的日志记录进行解析格式化。
// The regex parser plugin
type Regex struct {
// 正则表达式
Regex string `json:"regex,omitempty"`
// Time_Key
TimeKey string `json:"timeKey,omitempty"`
// Time_Format, eg. %Y-%m-%dT%H:%M:%S %z
TimeFormat string `json:"timeFormat,omitempty"`
// Time_Keep
TimeKeep *bool `json:"timeKeep,omitempty"`
//eg. "code:integer size:integer"
Types string `json:"types,omitempty"`
}
过滤日志的功能
具体参数
type FilterSpec struct {
// 标签匹配规则 例如:kube.* 标识匹配所有标签是kube*的日志记录,kube标识k8s日志信息,在输入配置时配置。支持大小写与*
Match string `json:"match,omitempty"`
// 正则匹配,如果上面没满足就使用这个
MatchRegex string `json:"matchRegex,omitempty"`
// 过滤器插件
FilterItems []FilterItem `json:"filters,omitempty"`
}
type FilterItem struct {
// Kubernetes过滤器配置.
Kubernetes *filter.Kubernetes
}
type Kubernetes struct {
// 设置http缓冲区大小
// +kubebuilder:validation:Pattern:="^\\d+(k|K|KB|kb|m|M|MB|mb|g|G|GB|gb)?$"
BufferSize string `json:"bufferSize,omitempty"`
// apiserver url
KubeURL string `json:"kubeURL,omitempty"`
// CA 证书文件
KubeCAFile string `json:"kubeCAFile,omitempty"`
// 扫描证书绝对路径
KubeCAPath string `json:"kubeCAPath,omitempty"`
// 令牌文件
KubeTokenFile string `json:"kubeTokenFile,omitempty"`
// 当源记录来自 Tail 输入插件时,
// 此选项允许指定 Tail 配置中使用的前缀。 比如 kube
KubeTagPrefix string `json:"kubeTagPrefix,omitempty"`
// 启用时,它会检查日志字段内容是否为 JSON 字符串映射,
// 如果是这样,它将映射字段作为日志结构的一部分附加。
MergeLog *bool `json:"mergeLog,omitempty"`
// When Merge_Log is enabled, the filter tries to assume the log field from the incoming message is a JSON string message
// and make a structured representation of it at the same level of the log field in the map.
// Now if Merge_Log_Key is set (a string name), all the new structured fields taken from the original log content are inserted under the new key.
MergeLogKey string `json:"mergeLogKey,omitempty"`
// When Merge_Log is enabled, trim (remove possible \n or \r) field values.
MergeLogTrim *bool `json:"mergeLogTrim,omitempty"`
// Optional parser name to specify how to parse the data contained in the log key. Recommended use is for developers or testing only.
MergeParser string `json:"mergeParser,omitempty"`
// When Keep_Log is disabled, the log field is removed
// from the incoming message once it has been successfully merged
// (Merge_Log must be enabled as well).
KeepLog *bool `json:"keepLog,omitempty"`
// Debug level between 0 (nothing) and 4 (every detail).
TLSDebug *int32 `json:"tlsDebug,omitempty"`
// When enabled, turns on certificate validation when connecting to the Kubernetes API server.
TLSVerify *bool `json:"tlsVerify,omitempty"`
// When enabled, the filter reads logs coming in Journald format.
UseJournal *bool `json:"useJournal,omitempty"`
// 设置备用解析器来处理记录标签并提取 pod_name、namespace_name、container_name 和 docker_id。
// 解析器必须注册在解析器文件中(以解析器filter-kube-test为例)。
RegexParser string `json:"regexParser,omitempty"`
// Allow Kubernetes Pods to suggest a pre-defined Parser
// (read more about it in Kubernetes Annotations section)
K8SLoggingParser *bool `json:"k8sLoggingParser,omitempty"`
// Allow Kubernetes Pods to exclude their logs from the log processor
// (read more about it in Kubernetes Annotations section).
K8SLoggingExclude *bool `json:"k8sLoggingExclude,omitempty"`
// 在额外的元数据中包含 Kubernetes 资源标签。
Labels *bool `json:"labels,omitempty"`
// 在额外的元数据中包含 Kubernetes 资源注释。
Annotations *bool `json:"annotations,omitempty"`
// 如果设置,Kubernetes 元数据可以从该目录下 JSON 格式的文件中缓存/预加载,
KubeMetaPreloadCacheDir string `json:"kubeMetaPreloadCacheDir,omitempty"`
}
将日志记录输出给指定的存储介质,比如发送给els
type Elasticsearch struct {
// IP地址 支持k8sservice格式访问
Host string `json:"host,omitempty"`
// 端口
Port *int32 `json:"port,omitempty"`
// 请求路径
Path string `json:"path,omitempty"`
// 缓存空间
BufferSize string `json:"bufferSize,omitempty"`
// If you are using Elastic's Elasticsearch Service you can specify the cloud_id of the cluster running.
CloudID string `json:"cloudID,omitempty"`
// Specify the credentials to use to connect to Elastic's Elasticsearch Service running on Elastic Cloud.
CloudAuth string `json:"cloudAuth,omitempty"`
// Optional username credential for Elastic X-Pack access
HTTPUser *plugins.Secret `json:"httpUser,omitempty"`
// Password for user defined in HTTP_User
HTTPPasswd *plugins.Secret `json:"httpPassword,omitempty"`
// Index name
Index string `json:"index,omitempty"`
// Type name
Type string `json:"type,omitempty"`
// 启用 Logstash 格式兼容性。
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// 启用 Logstash_Format 时,索引名称由前缀和日期组成,
// 例如:如果 Logstash_Prefix 等于“mydata”,您的索引将变 为“mydata-YYYY.MM.DD”。
// 最后附加的字符串属于生成数据的日期。
LogstashPrefix string `json:"logstashPrefix,omitempty"`
// 时间格式(基于 strftime)生成索引名称的第二部分。
LogstashDateFormat string `json:"logstashDateFormat,omitempty"`
//启用 Logstash_Format 后,每条记录都会获得一个新的时间戳字段。
// Time_Key 属性定义该字段的名称。
TimeKey string `json:"timeKey,omitempty"`
// 启用 Logstash_Format 时,此属性定义时间戳的格式。
TimeKeyFormat string `json:"timeKeyFormat,omitempty"`
// 启用后,它会将标签名称附加到记录中。
IncludeTagKey *bool `json:"includeTagKey,omitempty"`
// 启用 Include_Tag_Key 时,此属性定义标签的键名
TagKey string `json:"tagKey,omitempty"`
// 启用时,将 elasticsearch API 调用打印到标准输出(仅适用于诊断)
TraceOutput *bool `json:"traceOutput,omitempty"`
`json:"currentTimeIndex,omitempty"`
// 使用此字符串为键添加前缀
LogstashPrefixKey string
}
部署fbo-Controller与fbo-Watcher(类似于deployment)。
// FluentBitSpec defines the desired state of FluentBit
type FluentBitSpec struct {
// Fluent Bit 镜像.
Image string `json:"image,omitempty"`
// Fluent Bit Watcher command line arguments.
Args []string `json:"args,omitempty"`
// 镜像拉去策略
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// 镜像拉去证书
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
// 数据库,如果使用tail需要指定
PositionDB corev1.VolumeSource `json:"positionDB,omitempty"`
// 当前容器日志地址
ContainerLogRealPath string `json:"containerLogRealPath,omitempty"`
// 当前容器使用资源
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
// 节点学则器
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// 关联的fluentbit-config资源的名称
FluentBitConfigName string `json:"fluentBitConfigName,omitempty"`
}
用于FluentBit系统配置
在FluentBitConfig中,程序会根据InputSelector,FilterSelector,OutputSelector,ParserSelector获取相对应的crd资源,然后根据crd资源创建Secret。
fluentbit通过watcher对本地的input,filter,output,parser进行监听,如果其中有变动,会结束fluentbit程序,然后进行重启来达到热部署工作。
// FluentBitConfigSpec defines the desired state of FluentBitConfig
type FluentBitConfigSpec struct {
// 定义了一些全局行为
Service *Service `json:"service,omitempty"`
// input标签选择器
InputSelector metav1.LabelSelector `json:"inputSelector,omitempty"`
// filter标签选择器
FilterSelector metav1.LabelSelector `json:"filterSelector,omitempty"`
// output标签选择器
OutputSelector metav1.LabelSelector `json:"outputSelector,omitempty"`
// parser标签选择器
ParserSelector metav1.LabelSelector `json:"parserSelector,omitempty"`
}
type Service struct {
// 如果 true 在开始时转到后台
Daemon *bool `json:"daemon,omitempty"`
// 刷新输出的间隔
FlushSeconds *int64 `json:"flushSeconds,omitempty"`
// 记录诊断输出的文件
LogFile string `json:"logFile,omitempty"`
// 输出等级 (error/warning/info/debug/trace)
LogLevel string `json:"logLevel,omitempty"`
// parsers文件,相当于内置的parser插件
ParsersFile string `json:"parsersFile,omitempty"`
}