当前位置: 首页 > 工具软件 > Fluentd > 使用案例 >

fluentd

娄振
2023-12-01

介绍

fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP 请求内容。

数据被收集后按照用户配置的解析规则,形成一个个 event,event 格式如下:

tag = xxx
time = xxx
record = {
    "key1": "value1",
    "key2": "value2"
}

其中:

  • tag:为数据流的标记。  当 fluentd 中有数不清的 event 时,tag 可以用于分组处理。
    •   比如说 tag 为 water 的,需要其中的 record 添加一个 hostname 的 kv 值。
    •   又或者 tag 为 fire 的,不做任何处理,直接输出到文件中的。
  • time: event 产生的时间,该字段通常由日志内的时间字段解析出来。
  • record: 日志的内容,为 JSON 格式。

source

source 定义数据源,是 fluentd 的输入端,流入 fluentd 的配置都是在 source 中的,一个 fluentd 中可以有多个数据源,因此,一个 fluentd 中可以有多个 source 。

一个 source 由一个输入插件和插件的配置组成,也就意味着,一个 source 中只能有一种类型的输入。

输入插件

输入插件有很多,具体的可以去官网查看,很详细。链接如下:

Fluentd

在这里,只是总结一下,我自己使用的插件:

@type 'kafka'

该插件是以“单消费者”模式订阅 kafka 消息。

单消费者模式:每个 kafka 输入插件独立地订阅 kafka 消息。

很简单,但有缺陷,因此目前大多以 “消费组模式”订阅。

单消费者模式缺陷如下(网上抄的,实际情况如何不清楚):

  • 如果存在多个单消费者进程同时订阅相同的 topic,进程之间无法协调和分配不同的分区。
  • 如果多个消费者进程中的某个进程挂掉,其他进程无法从该进程原先订阅位置进行恢复。

配置如下:

<source>
  # 插件类型 kafka
  @type kafka
  
  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  # 逗号分隔的 topic 列表
  topics <listening topics(separate with comma',')>
  # 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
  format <input text type (text|json|ltsv|msgpack)> :default => json
  # tag 增加前缀
  add_prefix <tag prefix (Optional)>
  # tag 增加后缀
  add_suffix <tag suffix (Optional)>
</source>

其中:

  • topic: 填写的是想要消费 kafka 中 topic 的名字,可以同时消费多个 topic 。
  • tag: tag 的名字默认是 topic 的名字,如果想要修改 tag ,可以使用 add_prefix 和 add_suffix 在 tag 的前后添加字符串。 
    • 例如:当目标 topic 名称为 app_event 时,tag 为 app_event 。使用add_prefix kafka,tag 就是kafka.app_event。

以上是最简单的配置,同时也是我用的配置。想要查看更复杂的配置,请移步:

GitHub - fluent/fluent-plugin-kafka: Kafka input and output plugin for Fluentd

GitHub - zendesk/ruby-kafka: A Ruby client library for Apache Kafka

@type 'kafka_group'

插件以“消费者组”模式订阅 kafka 消息。消费者组模式解决了单消费者模式存在的几个缺点,可以同时启动多个 Fluentd 进程协同工作。

配置如下:

<source>
  # 插件类型 kafka_group
  @type kafka_group
  
  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  # 设定消费者组名称,必须设置
  consumer_group <consumer group name, must set>
  # 逗号分隔的 topic 列表
  topics <listening topics(separate with comma',')>
  # 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
  format <input text type (text|json|ltsv|msgpack)> :default => json
  # 如果为 true,添加 kafka 的消息头到记录中
  add_headers <If true, add kafka's message headers to record>
  # tag 增加前缀
  add_prefix <tag prefix (Optional)>
  # tag 增加后缀
  add_suffix <tag suffix (Optional)>

  username USERNAME
  password PASSWORD
  sasl_over_ssl false
  ssl_ca_certs_from_system false
  get_kafka_client_log false
</source>

match

match 定义数据的输出目标,match 指令通过匹配 tag 字段来将事件输出到其他的系统。

同样 match 指令也必须指定 @type 参数,该参数用来指定使用哪个输出插件。

@type kafka2

<match app.**>
  # 插件类型 kafka2
  @type kafka2

  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口 
  brokers               <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  # 默认 topic,若未设置 topic_key,则 topic 取此处的值
  default_topic         (string) :default => nil

  # 设置输出消息格式,支持 json、ltsv、msgpack或其他输出插件,默认是 json
  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  <buffer topic>
    flush_interval 10s
  </buffer>

  username USERNAME
  password PASSWORD
  sasl_over_ssl false
  ssl_ca_certs_from_system false
  get_kafka_client_log false

</match>

default_topic:将要输出到 kafka 中的 topic 名字

buffer: 缓存配置,一般只配 flush_interval ,代表间隔一定时间去输出一次。

@type webhdfs

该插件是用于将日志输出到 hdfs 中的。

HDFS (Hadoop)是存储和处理大量数据的。

既然要输出到 hdfs 中,所以首先要安装一些软件或插件:

  • Fluentd
  • webhdfs 输出插件 (out_webhdfs)
  • Apache HDFS

输出目的地将是WebHDFS。输出配置应该如下所示:

<match hdfs.*.*>
  @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/log/%Y%m%d_%H/access.log.#{Socket.gethostname}"
  <buffer>
    flush_interval 10s
  </buffer>
</match>
  • <match>部分指定了用于查找匹配标签的正则表达式。如果日志中的标签匹配,则使用相应的匹配配置(即相应地路由日志)。
  • flush_interval参数指定数据写入HDFS的频率。追加操作用于将传入数据追加到path参数指定的文件中。 时间和主机名的占位符可以与path参数一起使用。这可以防止多个Fluentd实例将数据追加到同一个文件中,而追加操作必须避免这种情况。
  • 其他选项指定HDFS的NameNode 的主机和端口。

除了配置 fluentd 的配置外,还需要对 hdfs 的配置做一些修改,将以下配置添加到 hdfs-site.xml 文件中,然后重新启动整个群集:

<property>
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>

<property>
  <name>dfs.support.append</name>
  <value>true</value>
</property>

<property>
  <name>dfs.support.broken.append</name>
  <value>true</value>
</property>

同时要确认 hdfs 用户对指定为网络文件系统输出的路径具有写权限。

@type stdout

这个类型的插件,适用于 debug 时使用的。

在使用其他输出插件时,如果在目的接收端收不到日志,或收到的日志数据不准确,可以先将日志输出到 stdout 进行查看。

它的配置很简单:

<match pattern>
  @type stdout
</match>

其中 pattern 要替换成需要匹配 tag 的正则表达式,如果想要匹配全部 tag,pattern 替换成 *.*

当然,如果debug,还有一种方法,在任意一个 插件类型的下方,添加一个 @log_level debug ,控制台就会输出 debug 级别的日志,否则,默认只输出 info 级别的日志。

@type copy

match 匹配到第一个 match,就直接输出了,不会再继续匹配下一个 match,如果需要将日志同时输出到两个地方,就需要用输出插件中的 copy 搞搞。

具体配置如下:

<match pattern>
  @type copy
  <store>
    @type file
    path /var/log/fluent/myapp1
    ...
  </store>
  <store>
    ...
  </store>
  <store>
    ...
  </store>
</match>

fluentd 生命周期

在 fluentd 中有以下几个类型: source、 parser、filter、output 四种。

parse 和 filter 用于解析和过滤,在我的这次项目中没有用到。

在 fluentd 中,数据的流向如下:

 source -> parser -> filter -> output

从  source 数据源进来,流过所有的 parser 和 filter ,最后优先匹配到一个 output 输出出去。

 类似资料:

相关阅读

相关文章

相关问答