fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP 请求内容。
数据被收集后按照用户配置的解析规则,形成一个个 event,event 格式如下:
tag = xxx
time = xxx
record = {
"key1": "value1",
"key2": "value2"
}
其中:
source 定义数据源,是 fluentd 的输入端,流入 fluentd 的配置都是在 source 中的,一个 fluentd 中可以有多个数据源,因此,一个 fluentd 中可以有多个 source 。
一个 source 由一个输入插件和插件的配置组成,也就意味着,一个 source 中只能有一种类型的输入。
输入插件有很多,具体的可以去官网查看,很详细。链接如下:
在这里,只是总结一下,我自己使用的插件:
该插件是以“单消费者”模式订阅 kafka 消息。
单消费者模式:每个 kafka 输入插件独立地订阅 kafka 消息。
很简单,但有缺陷,因此目前大多以 “消费组模式”订阅。
单消费者模式缺陷如下(网上抄的,实际情况如何不清楚):
配置如下:
<source>
# 插件类型 kafka
@type kafka
# 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
# 逗号分隔的 topic 列表
topics <listening topics(separate with comma',')>
# 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
format <input text type (text|json|ltsv|msgpack)> :default => json
# tag 增加前缀
add_prefix <tag prefix (Optional)>
# tag 增加后缀
add_suffix <tag suffix (Optional)>
</source>
其中:
以上是最简单的配置,同时也是我用的配置。想要查看更复杂的配置,请移步:
GitHub - fluent/fluent-plugin-kafka: Kafka input and output plugin for Fluentd
GitHub - zendesk/ruby-kafka: A Ruby client library for Apache Kafka
插件以“消费者组”模式订阅 kafka 消息。消费者组模式解决了单消费者模式存在的几个缺点,可以同时启动多个 Fluentd 进程协同工作。
配置如下:
<source>
# 插件类型 kafka_group
@type kafka_group
# 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
# 设定消费者组名称,必须设置
consumer_group <consumer group name, must set>
# 逗号分隔的 topic 列表
topics <listening topics(separate with comma',')>
# 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
format <input text type (text|json|ltsv|msgpack)> :default => json
# 如果为 true,添加 kafka 的消息头到记录中
add_headers <If true, add kafka's message headers to record>
# tag 增加前缀
add_prefix <tag prefix (Optional)>
# tag 增加后缀
add_suffix <tag suffix (Optional)>
username USERNAME
password PASSWORD
sasl_over_ssl false
ssl_ca_certs_from_system false
get_kafka_client_log false
</source>
match 定义数据的输出目标,match 指令通过匹配 tag 字段来将事件输出到其他的系统。
同样 match 指令也必须指定 @type 参数,该参数用来指定使用哪个输出插件。
<match app.**>
# 插件类型 kafka2
@type kafka2
# 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
# 默认 topic,若未设置 topic_key,则 topic 取此处的值
default_topic (string) :default => nil
# 设置输出消息格式,支持 json、ltsv、msgpack或其他输出插件,默认是 json
<format>
@type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
</format>
<buffer topic>
flush_interval 10s
</buffer>
username USERNAME
password PASSWORD
sasl_over_ssl false
ssl_ca_certs_from_system false
get_kafka_client_log false
</match>
default_topic:将要输出到 kafka 中的 topic 名字
buffer: 缓存配置,一般只配 flush_interval ,代表间隔一定时间去输出一次。
该插件是用于将日志输出到 hdfs 中的。
HDFS (Hadoop)是存储和处理大量数据的。
既然要输出到 hdfs 中,所以首先要安装一些软件或插件:
输出目的地将是WebHDFS。输出配置应该如下所示:
<match hdfs.*.*>
@type webhdfs
host namenode.your.cluster.local
port 50070
path "/log/%Y%m%d_%H/access.log.#{Socket.gethostname}"
<buffer>
flush_interval 10s
</buffer>
</match>
除了配置 fluentd 的配置外,还需要对 hdfs 的配置做一些修改,将以下配置添加到 hdfs-site.xml 文件中,然后重新启动整个群集:
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.support.broken.append</name>
<value>true</value>
</property>
同时要确认 hdfs 用户对指定为网络文件系统输出的路径具有写权限。
这个类型的插件,适用于 debug 时使用的。
在使用其他输出插件时,如果在目的接收端收不到日志,或收到的日志数据不准确,可以先将日志输出到 stdout 进行查看。
它的配置很简单:
<match pattern>
@type stdout
</match>
其中 pattern 要替换成需要匹配 tag 的正则表达式,如果想要匹配全部 tag,pattern 替换成 *.*
当然,如果debug,还有一种方法,在任意一个 插件类型的下方,添加一个 @log_level debug ,控制台就会输出 debug 级别的日志,否则,默认只输出 info 级别的日志。
match 匹配到第一个 match,就直接输出了,不会再继续匹配下一个 match,如果需要将日志同时输出到两个地方,就需要用输出插件中的 copy 搞搞。
具体配置如下:
<match pattern>
@type copy
<store>
@type file
path /var/log/fluent/myapp1
...
</store>
<store>
...
</store>
<store>
...
</store>
</match>
在 fluentd 中有以下几个类型: source、 parser、filter、output 四种。
parse 和 filter 用于解析和过滤,在我的这次项目中没有用到。
在 fluentd 中,数据的流向如下:
source -> parser -> filter -> output
从 source 数据源进来,流过所有的 parser 和 filter ,最后优先匹配到一个 output 输出出去。