Original article: http://www.linuxidc.com/Linux/2013-06/86598.htm
1. Test Environment
Operating system: CentOS 5.5
JDK version: 1.7.0_21
Flume version: 1.3.1
Hadoop version: 0.20.2
The setup consists of 1 agent, 2 collectors, and 1 storage node.
2. Installation: JDK + Flume
#Download and install JDK 1.7 from http://www.oracle.com/technetwork/java/javase/downloads/index.html
tar zxvf jdk-7u21-linux-x64.gz -C /usr/local/
#Add environment variables to /etc/profile
pathmunge /usr/local/jdk1.7.0_21/bin
export JAVA_HOME=/usr/local/jdk1.7.0_21/
export JRE_HOME=/usr/local/jdk1.7.0_21/jre
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
#Verify the Java installation
java -version
#Download and install Flume 1.3.1 from http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
tar zxvf apache-flume-1.3.1-bin.tar.gz -C /usr/local/
#Add environment variables to /etc/profile
export FLUME_HOME=/usr/local/apache-flume-1.3.1-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=.:$PATH:$FLUME_HOME/bin
#Verify the Flume installation
# flume-ng version
Flume 1.3.1
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 77b5d2885fecb3560a873bd89f49cbac8a010347
Compiled by hshreedharan on Fri Dec 21 22:14:21 PST 2012
From source with checksum 2565bdfd8b6af459dbf85c6960f189a5
3. A Simple Example
#Create the configuration file
[root@cc-staging-loginmgr2 conf]# cat example.conf
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#Command-line options explained
-c conf                           use conf as the configuration directory
-f conf/example.conf              use conf/example.conf as the configuration file
-n a1                             run the agent named a1; the name must match the one used in example.conf
-Dflume.root.logger=INFO,console  set the root logger to INFO level and send its output to the console
#Start the agent
cd /usr/local/apache-flume-1.3.1-bin
flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console
2013-05-24 00:00:09,288 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting
2013-05-24 00:00:09,303 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
#Test from another terminal
[root@cc-staging-loginmgr2 conf]# telnet 127.0.0.1 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
hello world!
OK
#Check the console output in the terminal where the agent was started
2013-05-24 00:00:24,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D hello world!. }
#The test succeeded; Flume is working correctly.
4. Flume Source Tests
Test 1: Avro source. A given file can be sent to Flume with avro-client; the Avro source receives it over the Avro RPC mechanism.
#Create the Avro source configuration file
[root@cc-staging-loginmgr2 conf]# cat avro.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console
#Create the file to send
echo "hello world" > /usr/logs/log.10
#Send the file with avro-client
flume-ng avro-client -c . -H localhost -p 4141 -F /usr/logs/log.10
#Check the console output in the agent's terminal
2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world}
Test 2: Exec source. An exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out.
#Modified part of the configuration file
[root@cc-staging-loginmgr2 conf]# cat exec.conf
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /usr/logs/log.10
a1.sources.r1.channels = c1
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console
#Append content to the file
echo "exec test" >> /usr/logs/log.10
#Check the console output in the agent's terminal
2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test }
#To use the tail command instead, the file must contain enough data before any output shows up on the console
a1.sources.r1.command = tail -F /usr/logs/log.10
#Generate enough content in the file
for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done
#The output can be seen on the console
2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10
2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }
Test 3: Spooling directory source. This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails. The spooldir source watches the configured directory for new files and reads the data out of them. Two caveats: 1) files copied into the spool directory must not be opened or edited afterwards; 2) the spool directory must not contain subdirectories.
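To respect the first caveat, a common pattern (a sketch of my own, not from the original article) is to finish writing the file somewhere outside the spool directory and only then move it in, so the source never picks up a half-written file:
# write the complete file outside the spool directory first (same filesystem)
echo "spool test0" > /usr/logs/spool0.log
# a rename on the same filesystem is atomic, so the file appears fully formed
mv /usr/logs/spool0.log /usr/logs/flumeSpool/spool0.log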
#Modified part of the configuration file
[root@cc-staging-loginmgr2 conf]# cat spool.conf
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/logs/flumeSpool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console
#Drop a new file into the spool directory
[root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log
#Check the console output in the agent's terminal
2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED
2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }
Test 4: Netcat source. See the simple example in section 3.
Test 5: Syslog TCP source
#Modified part of the configuration file
[root@cc-staging-loginmgr2 conf]# cat syslog.conf
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console
#Generate a test syslog message; the <37> priority prefix is required because the source expects syslog wire-format data, otherwise it reports "Failed to extract syslog wire entry"
echo "<37>hello via syslog" | nc localhost 5140
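As a side note (my own arithmetic, not part of the original article): the syslog priority value is facility * 8 + severity, so <37> decodes to facility 4 and severity 5, which is exactly what appears in the event headers in the console output below.
# priority = facility * 8 + severity
# 37 = 4 * 8 + 5  ->  Facility=4, Severity=5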
#Check the console output in the agent's terminal
2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog }
#For UDP, change the source configuration
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
#Generate a test syslog message over UDP
echo "<37>hello via syslog" | nc -u localhost 5140
Test 6: HTTP source with JSONHandler
#Modified part of the configuration file
[root@cc-staging-loginmgr2 conf]# cat post.conf
# Describe/configure the source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console
#Send a JSON-formatted POST request
curl -X POST -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://localhost:5140
#Check the console output in the agent's terminal
2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }
5. Flume Sink Tests
Test 1: HDFS sink. Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster.
Add the following line to /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh:
export HADOOP_HOME=/usr/local/hadoop
#Modify the configuration file
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console
#Generate a test syslog message
echo "<37>hello via syslog to hdfs testing one" | nc -u localhost 5140
#Check the console output in the agent's terminal; the file is created successfully
2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp
2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714
#View the file on Hadoop
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello via syslog to hdfs testing one
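The SEQ!org.apache.hadoop.io.LongWritable prefix in the output shows that the HDFS sink writes SequenceFiles by default. A minimal sketch (not part of the original test; hdfs.fileType is a standard HDFS sink option) of how to write plain text files instead:
a1.sinks.k1.hdfs.fileType = DataStream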
#Modify the configuration file so that directories are generated automatically from the event timestamp
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
#Send a JSON-formatted POST request. If the timestamp header has the wrong format it cannot be parsed; a 13-digit timestamp (including milliseconds) is required to resolve the time correctly.
#Generate the current time as a 10-digit Unix timestamp on Linux
date +%s
#Generate the current time as a 13-digit Unix timestamp on Linux
date +%s%N|awk '{print substr($0,1,13)}'
curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140
#Check the console output in the agent's terminal; the file is created successfully
2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp
2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614
#View the file on Hadoop
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203
Found 1 items
-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614
#Test 2: Logger sink. Logs events at INFO level; typically useful for testing/debugging purposes.
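No separate configuration is shown for it here; a minimal sketch of a logger sink (the same wiring already used in the simple example in section 3):
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1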
#Test 3: Avro sink. Flume events sent to this sink are turned into Avro events and sent to the configured hostname/port pair.
#Avro source configuration file
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545
#Avro sink configuration file
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
#Start the Avro source first so it listens on the port
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console
#Then start the agent with the Avro sink
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console
#The connection has been established
2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894
#Generate a test log on the Avro sink host
echo "<37>hello via avro sink" | nc localhost 5140
#The log shows up on the Avro source
2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }
#Test 4: File Roll sink. Stores events on the local filesystem.
#Modify the configuration file
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
#Start the agent with the file roll configuration
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console
#Generate test logs
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140
#Check whether files are created under /var/log/flume; by default a new file is rolled every 30 seconds
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3
cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2
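To roll less (or more) often than the 30-second default, the roll interval can be changed on the sink. A minimal sketch (my own addition; sink.rollInterval is the standard file_roll option, in seconds, and 0 disables rolling):
a1.sinks.k1.sink.rollInterval = 60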
6. Flume Channel Tests
#Memory Channel. Events are stored in an in-memory queue with configurable max size. It is ideal for flows that need high throughput and can tolerate losing the staged data in the event of agent failures.
#Flume channel selectors
#Replicating Channel Selector test (channel replication)
#Configuration with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#Check that both connections have been established
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545
2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518

2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545
2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731
#Generate a test log
echo "<37>hello via channel selector" | nc localhost 5140
#Check that both sinks received the data
2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }

2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65 hello via channe }
#Flume channel selectors
#Multiplexing Channel Selector test (channel multiplexing)
#Configuration with 2 channels and 2 sinks
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2

a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#Send POST requests whose state header matches the mappings in the configuration
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140
#Check that the data received by the two sinks matches the configuration (state=SH has no mapping, so it falls back to the default channel c1)
Sink1:
2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }
2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }

Sink2:
2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }
7. Flume Sink Processor Tests
#Failover Sink Processor. The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).
#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#Generate a test log
echo "<37>test1 failover" | nc localhost 5140
#The log appears on sink2; sink1 has the lower priority, so nothing appears there
2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
#Shut down sink2 deliberately, then generate another test log
echo "<37>test2 failover" | nc localhost 5140
#Both test1 and test2 now show up on sink1
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72 test1 failover }
2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72 test2 failover }
#Bring sink2 back up; because of its higher priority, logs go to sink2 again
echo "<37>test4 failover" | nc localhost 5140
echo "<37>test5 failover" | nc localhost 5140
2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72 test4 failover }
2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72 test5 failover }
#Load Balancing Sink Processor test. The load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks over which the load is distributed.
#Configuration file. Note: with the load_balance type, the same channel must be wired to the different sinks, otherwise load balancing does not take effect
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Generate 4 test logs
[root@cc-staging-loginmgr2 ~]# echo "<37>test2 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test3 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test4 loadbalance" | nc localhost 5140
[root@cc-staging-loginmgr2 ~]# echo "<37>test5 loadbalance" | nc localhost 5140
#Check that the sink output follows the round-robin pattern
Sink1:
2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63 test2 loadbalanc }
2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63 test4 loadbalanc }

Sink2:
2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63 test3 loadbalanc }
2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63 test5 loadbalanc }
8. Event Serializer Tests
Body Text Serializer (alias: text). This serializer writes the body of the event to the output stream without any transformation or modification, i.e. the event body is written out as plain text.
#Configuration file
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false
#Generate test logs
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST1 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST2 BODY TEXT"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : "TEST3 BODY TEXT"}]' http://localhost:5140
#View the text content in the file roll output
cat /var/log/flume/1370675739270-1
TEST1 BODY TEXT
TEST2 BODY TEXT
TEST3 BODY TEXT
#Avro Event Serializer (alias: avro_event). This serializer serializes Flume events into an Avro container file, i.e. the Flume events become records inside an Avro container file.
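No test is run for it in this article; a minimal sketch (my own assumption, reusing the file_roll sink from the previous test) of how it could be enabled:
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = avro_event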
9. Flume Interceptor Tests
Timestamp Interceptor. This interceptor inserts into the event headers the time in milliseconds at which it processes the event, i.e. a header with key timestamp whose value is the relevant timestamp.
Host Interceptor. This interceptor inserts the hostname or IP address of the host that the agent is running on, as a header with key host (or a configured key) whose value is that hostname or IP address.
#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname
a1.sources.r1.interceptors.i2.useIP = false

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = %{hostname}.

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console
#Generate a test log
echo "<37>test dynamic interceptor" | nc localhost 5140
#Check the file created on HDFS; the timestamp and hostname headers have been added to the event, so directories and file names are generated according to the custom format
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/
Found 1 items
-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118
Static Interceptor. The static interceptor allows the user to append a static header with a static value to all events.
#Configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#Start the agent
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console
#Generate a test log
echo "<37>test1 static interceptor" | nc localhost 5140
#Check the console output
2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK} body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }
10. Monitoring Flume with Zabbix
#JVM performance monitoring
Young GC counts
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $6}'

Full GC counts
sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $8}'

JVM total memory usage
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $3}'

JVM total instance count
sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $2}'
#Flume application metrics monitoring
#Start the agent with the JSON reporting options, so the metrics can be accessed at http://localhost:34545/metrics
flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
#Generate some data
for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done
#Pretty-print the JSON output with a shell one-liner
[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'
CHANNEL.c1:
EventPutSuccessCount:100
ChannelFillPercentage:0.0
Type:CHANNEL
StopTime:0
EventPutAttemptCount:100
ChannelSize:0
StartTime:1371709073310
EventTakeSuccessCount:100
ChannelCapacity:1000
EventTakeAttemptCount:115
#Script used to monitor Flume
[root@cc-staging-loginmgr2 conf]# cat /opt/scripts/monitor_flume.sh
curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'
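For example (my own invocation, assuming the agent is still running with the monitoring options above), fetching a single counter:
/bin/bash /opt/scripts/monitor_flume.sh EventPutSuccessCount
100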
#Deploy the user parameters in the Zabbix agent configuration
cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf
UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'
UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'
UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'
UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'
UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1
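After restarting the Zabbix agent, the new keys can be sanity-checked from the Zabbix server with zabbix_get (a sketch; the host IP and item key are placeholders to adapt):
zabbix_get -s 172.25.4.32 -k ygc.counts
zabbix_get -s 172.25.4.32 -k flume.monitor[EventPutSuccessCount]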