当前位置: 首页 > 工具软件 > ng-pipes > 使用案例 >

flume-ng三大组件介绍与应用

焦苏燕
2023-12-01

一、概述
flume是分布式的,可靠的,用于从不同的来源有效收集 聚集 和 移动 大量的日志数据用以集中式的数据存储的系统,最初由Cloudra公司开发,现为Apache基金会的顶级项目之一。

二、 概念、模型
1. Flume Event Flume事件 - 被定义为一个具有有效荷载的字节数据流和可选的字符串属性集。
2. Flume Agent: Flume 代理 - 是一个进程承载从外部源事件流到下一个目的地的过程。包含source channel 和 sink
3. Source : 数据源 - 消耗外部传递给他的事件,外部源将数据按照Flume Source 能识别的格式将Flume 事件发送给Flume Source
4. Channel : 数据通道 - 是一个被动的存储,用来保持事件,直到由一个Flume Sink消耗。
5. Sink :数据汇聚点 - 代表外部数据存放位置。发送flume中的事件到指定的外部目标。
6. 复杂流动
Flume允许用户进行多级流动到最终目的地,也允许扇出流(一到多)、扇入流(多到一)的流动和故障转移、失败处理。
7. 可靠性
事务型的数据传递,保证数据的可靠性。
8. 可恢复
通道可以以内存或文件的方式实现,内存更快,但是不可恢复,文件比较慢但提供了可恢复性。

三、入门案列
首先需要通过一个配置文件来配置Agent。
通过flume提供的工具启动agent就可以工作了。

(1)编写配置文件

    #example.conf:单节点Flume配置
    #命名Agent a1的组件
    a1.sources  =  r1
    a1.sinks  =  k1
    a1.channels  =  c1
    #描述/配置Source
    a1.sources.r1.type  =  netcat
    a1.sources.r1.bind  =  0.0.0.0
    a1.sources.r1.port  =  44444
    #描述Sink
    a1.sinks.k1.type  =  logger
    #描述内存Channel
    a1.channels.c1.type  =  memory
    a1.channels.c1.capacity  =  1000 
    a1.channels.c1.transactionCapacity  =  100
    #为Channle绑定Source和Sink
    a1.sources.r1.channels  =  c1
    a1.sinks.k1.channel  =  c1

注意:
一个配置文件中可以配置多个Agent,一个Agent中可以包含多个Source Sink Channel
一个Source 可以绑定到多个通道,一个Sink只能绑定到一个通道

(2) 通过flume的工具启动agent
$ bin/flume-ng agent –conf conf –conf-file example.conf –name a1 -Dflume.root.logger=INFO,console
(3) 在windows中通过telnet命令连接flume所在机器的44444端口发送数据。发现,flume确实收集到了该信息。

五、source组件
1. Avro Source
监听Avro端口来接受来自外部Avro客户端的事件流。
利用Avro Source可以实现多级流动、扇出流、扇入流等效果。
另外也可以接受通过flume提供的Avro客户端发送的日志信息。

支持的属性:
!channels –
!type – 类型名称,”AVRO”
!bind – 需要监听的主机名或IP
!port – 要监听的端口
threads – 工作线程最大线程数
selector.type
selector.*
interceptors – 空格分隔的拦截器列表
interceptors.*
compression-type none 压缩类型,可以是“none”或“default”,这个值必须和AvroSource的压缩格式匹配
ssl false 是否启用ssl加密,如果启用还需要配置一个“keystore”和一个“keystore-password”.
keystore – 为SSL提供的 java密钥文件 所在路径
keystore-password – 为SSL提供的 java密钥文件 密码
keystore-type JKS 密钥库类型可以是 “JKS” 或 “PKCS12”.
exclude-protocols SSLv3 空格分隔开的列表,用来指定在SSL / TLS协议中排除。SSLv3将总是被排除除了所指定的协议。
ipFilter false 如果需要为netty开启ip过滤,将此项设置为true
ipFilterRules – 陪netty的ip过滤设置表达式规则

案例:

                        编写配置文件:
                        #命名Agent a1的组件
                        a1.sources  =  r1
                        a1.sinks  =  k1
                        a1.channels  =  c1
                        #描述/配置Source
                        a1.sources.r1.type  =  avro
                        a1.sources.r1.bind  =  0.0.0.0
                        a1.sources.r1.port  =  33333
                        #描述Sink
                        a1.sinks.k1.type  =  logger
                        #描述内存Channel
                        a1.channels.c1.type  =  memory
                        a1.channels.c1.capacity  =  1000
                        a1.channels.c1.transactionCapacity  =  100
                        #为Channle绑定Source和Sink
                        a1.sources.r1.channels  =  c1
                        a1.sinks.k1.channel  =  c1

启动flume:
./flume-ng agent –conf ../conf –conf-file ../conf/template2.conf –name a1 -Dflume.root.logger=INFO,console
通过flume提供的avro客户端向指定机器指定端口发送日志信息:
./flume-ng avro-client –conf ../conf –host 0.0.0.0 –port 33333 –filename ../mydata/log1.txt
发现确实收集到了日志
2. Exec Source
可以将命令产生的输出作为源
属性说明:
!channels –
!type – 类型名称,需要是”exec”
!command – 要执行的命令
shell – A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 毫秒为单位的时间,用来声明等待多久后尝试重试命令
restart false 如果cmd挂了,是否重启cmd
logStdErr false 无论是否是标准错误都该被记录
batchSize 20 同时发送到通道中的最大行数
batchTimeout 3000 如果缓冲区没有满,经过多长时间发送数据
selector.type 复制还是多路复用
selector.* Depends on the selector.type value
interceptors – 空格分隔的拦截器列表
interceptors.*
案例:
编写配置文件:

            #命名Agent a1的组件
            a1.sources  =  r1
            a1.sinks  =  k1
            a1.channels  =  c1

            #描述/配置Source
            a1.sources.r1.type  =  avro
            a1.sources.r1.bind  =  0.0.0.0
            a1.sources.r1.port  =  33333

            #描述Sink
            a1.sinks.k1.type  =  logger
            #描述内存Channel
            a1.channels.c1.type  =  memory
            a1.channels.c1.capacity  =  1000
            a1.channels.c1.transactionCapacity  =  100

            #为Channle绑定Source和Sink
            a1.sources.r1.channels  =  c1
            a1.sinks.k1.channel  =  c1

启动flume:
./flume-ng agent –conf ../conf –conf-file ../conf/template2.conf –name a1 -Dflume.root.logger=INFO,console

注意:可以通过tail命令,收集日志文件中后续追加的日志

3. Spooling Directory Source    

这个Source允许你将文件将要收集的数据放置到”自动搜集”目录中。这个Source将监视该目录,并将解析新文件的出现。事件处理逻辑是可插拔的,当一个文件被完全读入信道,它会被重命名或可选的直接删除。
要注意的是,放置到自动搜集目录下的文件不能修改,如果修改,则flume会报错。
另外,也不能产生重名的文件,如果有重名的文件被放置进来,则flume会报错。

    属性说明:

!channels –
!type – 类型,需要指定为”spooldir”
!spoolDir – 读取文件的路径,即”搜集目录”
fileSuffix .COMPLETED 对处理完成的文件追加的后缀
deletePolicy never 处理完成后是否删除文件,需是”never”或”immediate”
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
ignorePattern ^$ 正则表达式指定哪些文件需要忽略
trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
consumeOrder 处理文件的策略,oldest, youngest 或 random。
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.
batchSize 100 Granularity at which to batch transfer to the channel
inputCharset UTF-8 读取文件时使用的编码。
decodeErrorPolicy FAIL 当在输入文件中发现无法处理的字符编码时如何处理。FAIL:抛出一个异常而无法 ​​解析该文件。REPLACE:用“替换字符”字符,通常是Unicode的U + FFFD更换不可解析角色。 忽略:掉落的不可解析的字符序列。
deserializer LINE 声明用来将文件解析为事件的解析器。默认一行为一个事件。处理类必须实现EventDeserializer.Builder接口。
deserializer.* Varies per event deserializer.
bufferMaxLines – (Obselete) This option is now ignored.
bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors – Space-separated list of interceptors
interceptors.*
案例:
编写配置文件:

            #命名Agent a1的组件
            a1.sources  =  r1
            a1.sinks  =  k1
            a1.channels  =  c1

            #描述/配置Source
            a1.sources.r1.type  = spooldir
            a1.sources.r1.spoolDir  = /home/park/work/apache-flume-1.6.0-bin/mydata

            #描述Sink
            a1.sinks.k1.type  =  logger
            #描述内存Channel
            a1.channels.c1.type  =  memory
            a1.channels.c1.capacity  =  1000
            a1.channels.c1.transactionCapacity  =  100

            #为Channle绑定Source和Sink
            a1.sources.r1.channels  =  c1
            a1.sinks.k1.channel  =  c1


        启动flume:
            ./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console

        向指定目录中传输文件,发现flume收集到了该文件,将文件中的每一行都作为日志来处理。

4 . NetCat Source
一个NetCat Source用来监听一个指定端口,并将接收到的数据的每一行转换为一个事件。

属性说明:
!channels –
!type – 类型名称,需要被设置为”netcat”
!bind – 指定要绑定到的ip或主机名。
!port – 指定要绑定到的端口号
max-line-length 512 单行最大字节数
ack-every-event true 对于收到的每一个Event是否响应”OK”
selector.type
selector.*
interceptors –
interceptors.*

    案例:
        参见快速入门案例

5 . Sequence Generator Source – 序列发生器源
一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1。
主要用来进行测试。

参数说明:
!channels –
!type – 类型名称,必须为”seq”
selector.type
selector.*
interceptors –
interceptors.*
batchSize 1

案例:
编写配置文件:

            #命名Agent a1的组件
            a1.sources  =  r1
            a1.sinks  =  k1
            a1.channels  =  c1

            #描述/配置Source
            a1.sources.r1.type  = seq

            #描述Sink
            a1.sinks.k1.type  =  logger
            #描述内存Channel
            a1.channels.c1.type  =  memory
            a1.channels.c1.capacity  =  1000
            a1.channels.c1.transactionCapacity  =  100

            #为Channle绑定Source和Sink
            a1.sources.r1.channels  =  c1
            a1.sinks.k1.channel  =  c1

启动flume:
./flume-ng agent –conf ../conf –conf-file ../conf/template4.conf –name a1 -Dflume.root.logger=INFO,console
发现打印了日志

6 . HTTP Source
此Source接受HTTP的GET和POST请求作为Flume的事件。
其中GET方式应该只用于试验。
需要提供一个可插拔的”处理器”来将请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口
这个处理器接受一个 HttpServletRequest对象,并返回一个Flume Envent对象集合。
从一个HTTP请求中得到的事件将在一个事务中提交到通道中。thus allowing for increased efficiency on channels like the file channel。
如果处理器抛出一个异常,Source将会返回一个400的HTTP状态码。
如果通道已满,无法再将Event加入Channel,则Source返回503的HTTP状态码,表示暂时不可用。

参数说明:
!type 类型,必须为”HTTP”
!port – 监听的端口
bind 0.0.0.0 监听的主机名或ip
handler org.apache.flume.source.http.JSONHandler 处理器类,需要实现HTTPSourceHandler接口
handler.* – 处理器的配置参数
selector.type
selector.*
interceptors –
interceptors.*
enableSSL false 是否开启SSL,如果需要设置为true。注意,HTTP不支持SSLv3。
excludeProtocols SSLv3 空格分隔的要排除的SSL/TLS协议。SSLv3总是被排除的。
keystore 密钥库文件所在位置。
keystorePassword Keystore 密钥库密码

案例:
编写配置文件:

            #命名Agent a1的组件
            a1.sources  =  r1
            a1.sinks  =  k1
            a1.channels  =  c1

            #描述/配置Source
            a1.sources.r1.type  = http
            a1.sources.r1.port  = 66666

            #描述Sink
            a1.sinks.k1.type  =  logger
            #描述内存Channel
            a1.channels.c1.type  =  memory
            a1.channels.c1.capacity  =  1000
            a1.channels.c1.transactionCapacity  =  100

            #为Channle绑定Source和Sink
            a1.sources.r1.channels  =  c1
            a1.sinks.k1.channel  =  c1

启动flume:
./flume-ng agent –conf ../conf –conf-file ../conf/template6.conf –name a1 -Dflume.root.logger=INFO,console
通过命令发送HTTP请求到指定端口:
curl -X POST -d ‘[{ “headers” :{“a” : “a1”,”b” : “b1”},”body” : “hello~http~flume~”}]’ http://0.0.0.0:6666
发现flume收集到了日志

常见的Handler:
JSONHandler
可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32字符集
该handler接受Evnet数组,并根据请求头中指定的编码将其转换为Flume Event
如果没有指定编码,默认编码为UTF-8.
JSON格式如下:

            [{
              "headers" : {
                         "timestamp" : "434324343",
                         "host" : "random_host.example.com"
                         },
              "body" : "random_body"
              },
              {
              "headers" : {
                         "namenode" : "namenode.example.com",
                         "datanode" : "random_datanode.example.com"
                         },
              "body" : "really_random_body"
              }]


To set the charset, the request must have content type specified as application/json;charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
        One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method.
        Typetype=newTypeToken<List<JSONEvent>>(){}.getType();

BlobHandlerBlobHandler是一种将请求中上传文件信息转化为event的处理器。参数说明,加!为必须属性:!handler – The FQCN of this class:org.apache.flume.sink.solr.morphline.BlobHandlerhandler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request

7 . Custom source – 自定义源
自定义源是自己实现源接口得到的。
自定义源的类和其依赖包必须在开始时就放置到Flume的类加载目录下。

    参数说明,加!为必须属性:
        !channels   –    
        !type   –   类型,必须设置为自己的自定义处理类的全路径名
        selector.type       
        selector.*  
        interceptors    –   
        interceptors.*  

六、 sink组件
周末,先歇会儿

 类似资料: