[root@bigdata003 ~]# wget https://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
[root@bigdata003 ~]#
[root@bigdata003 ~]# tar -zxvf apache-flume-1.9.0-bin.tar.gz
拷贝得到flume-env.sh
[root@bigdata003 ~]# cp apache-flume-1.9.0-bin/conf/flume-env.sh.template apache-flume-1.9.0-bin/conf/flume-env.sh
[root@bigdata003 ~]#
然后修改flume-env.sh,修改内容如下:
export JAVA_HOME=/opt/jdk1.8.0_201
新建一个job(即一个Flume agent)的配置文件,文件名称为flume-job.conf,内容如下:
# job(agent)的名称为j1,需与启动命令中--name参数的值保持一致
j1.sources = so1
j1.sinks = si1
j1.channels = c1
# source配置
j1.sources.so1.type = netcat
j1.sources.so1.bind = bigdata003
j1.sources.so1.port = 18888
# sink配置
j1.sinks.si1.type = logger
# channel进行短暂的event缓存
j1.channels.c1.type = memory
j1.channels.c1.capacity = 1000
j1.channels.c1.transactionCapacity = 100
# 绑定channel到source和sink
j1.sources.so1.channels = c1
j1.sinks.si1.channel = c1
启动job
[root@bigdata003 ~]# apache-flume-1.9.0-bin/bin/flume-ng agent --conf apache-flume-1.9.0-bin/conf --conf-file flume-job.conf --name j1 -Dflume.root.logger=INFO,console
......省略部分......
2022-04-08 17:36:24,351 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.8.113:18888]
安装netcat,然后通过netcat发送消息
[root@bigdata003 ~]# yum install netcat
[root@bigdata003 ~]#
[root@bigdata003 ~]# netcat bigdata003 18888
hello world
OK
hello china
OK
再次查看启动job的终端窗口,可以看到logger sink已经消费到了消息并打印到控制台
2022-04-08 17:36:24,351 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.8.113:18888]
2022-04-08 17:43:42,367 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2022-04-08 17:43:44,491 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 63 68 69 6E 61 hello china }
也可以将source的type设置为TAILDIR,对目录下的所有文件进行监听