tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local/src/
cd /usr/local/src
mv apache-flume-1.7.0-bin/ flume
vi /etc/profile
export FLUME_HOME=/usr/local/src/flume
export PATH=$PATH:$FLUME_HOME/bin
source /etc/profile
cd $FLUME_HOME/conf/
cp flume-env.sh.template flume-env.sh
#使用mobax自带编辑器或 vi命令修改 flume-env.sh
vi flume-env.sh
#修改第22行jdk的路径为以下值
export JAVA_HOME=/usr/local/src/jdk
cd /usr/local/src
scp -r flume/ slave1:$PWD
scp -r flume/ slave2:$PWD
flume-ng version
案例1
(1) 需求: 监听文件内容变动,将新增加的内容输出到控制台。
(2) 实现: 主要使用 Exec Source 配合 tail 命令实现。新建一个文件 /root/data/stuinfo.log
touch /root/data/stuinfo.log
(3) 配置:创建一个配置文件 exec-memory-logger.properties
配置文件的后缀名与flume配置模板文件一致。
cd $FLUME_HOME/conf/
vi exec-memory-logger.properties
#向配置文件中追加写入以下配置内容
#命名组件
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /root/data/stuinfo.log
a1.sources.s1.shell = /bin/bash -c
#sink
a1.sinks.k1.type = logger
#chennal
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
#bind
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
(4) 启动Flume
flume-ng agent --name a1 --conf conf/ --conf-file ./conf/exec-memory-
logger.properties -Dflume.root.logger=INFO,console
(5)测试
echo 123456789 >> /root/data/log.txt
echo asdfghjkldaskfjas >> /root/data/log.txt
100,张三丰,200,男
mkdir /root/logs
(3) 配置: 创建一个配置文件 spooling-memory-hdfs.properties
cd $FLUME_HOME/conf/
vi spooling-memory-hdfs.properties
#向新建的配置文件中追加写入以下配置内容
#命名组件
a1.sources = s1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/logs
a1.sources.s1.basenameHeader = true
a1.sources.s1.basenameHeaderKey = fileName
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:8020/flumedata/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = %{fileName}
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
#bind
a1.sinks.k1.channel = c1
a1.sources.s1.channels = c1
(4)启动Flume
cd $FLUME_HOME
flume-ng agent --name a1 --conf conf/ --conf-file ./conf/exec-memory-
logger.properties -Dflume.root.logger=INFO,console
(5)测试
cp log.txt /root/logs/
cd /root/tools
tar -zxvf kafka_2.11-2.0.0.tgz -C /usr/local/src/
#1.重命名
cd /usr/local/src
mv kafka_2.11-2.0.0/ kafka
#2. 配置环境变量
vi /etc/profile
#向文件中追加以下配置项
export KAFKA_HOME=/usr/local/src/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#3.重新加载配置文件
source /etc/profile
(1)在master节点上修改config目录中server.properties配置文件
cd $KAFKA_HOME/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=1
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners==PLAINTEXT://master:9092
############################# Log Basics #############################
# kafka 运行日志和数据的存放路径
log.dirs=/usr/local/src/kafka/kafka-logs
# topic 在当前broker上的分片个数,与broker保持一致
num.partitions=3
############################# Zookeeper #############################
# 配置三台服务zookeeper连接地址
zookeeper.connect=master:2181,slave1:2181,slave2:2181
delete.topic.enable=true
(2)分发kafka安装目录到其他的节点
cd /usr/local/src
scp -r kafka/ slave1:$PWD
scp -r kafka/ slave2:$PWD
(3-1)分别修改slave1, slave2节点上的环境变量
#配置环境变量
vi /etc/profile
#向文件中追加以下配置项
export KAFKA_HOME=/usr/local/src/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#重新加载配置文件
source /etc/profile
(3-2)分别修改slave1, slave2节点上kafka的配置文件server.properties
修改broker.id为2 和 3, 修改advertised.listeners主机名为slave1和slave2
cd /usr/local/src/kafka/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=2
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners=PLAINTEXT://slave1:9092
ssh slave2
cd /usr/local/src/kafka/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=3
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners=PLAINTEXT://slave2:9092
(1)在三个节点上依次启动zookeeper
zkServer.sh start
zkServer.sh status
(2)在三个节点上依次启动Kafka
cd $KAFKA_HOME
./bin/kafka-server-start.sh -daemon config/server.properties
(1)任意节点上执行以下命令创建主题
cd $KAFKA_HOME/bin
./kafka-topics.sh --create --topic testtopic --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3
(2)任意节点上执行以下命令查看主题列表
cd $KAFKA_HOME/bin
./kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
(3)查看Topic的详细信息
cd $KAFKA_HOME/bin
./kafka-topics.sh --describe --topic testtopic --zookeeper master:2181,slave1:2181,slave2:2181
(4)删除指定topic
./kafka-topics.sh --delete --topic testtopic --zookeeper master:2181,slave1:2181,slave2:2181
cd $KAFKA_HOME/bin
./kafka-server-stop.sh
zkServer.sh stop
cd $KAFKA_HOME
mkdir zkdata/ zklog/
cd $KAFKA_HOME/config
vi zookeeper.properties
#修改zookeeper的数据目录路径
dataDir=/usr/local/src/kafka/zkdata
#注释掉 maxClientCnxns=0
# maxClientCnxns=0
#追加以下内容
dataLogDir=/usr/local/src/kafka/zklog
tlientCnxns=0
initLimit=10
syncLimit=5
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
cd $KAFKA_HOME/zkdata
touch myid
echo 1 > myid
cd $KAFKA_HOME
scp -r zkdata/ zklog/ config/ slave1:$PWD
scp -r zkdata/ zklog/ config/ slave2:$PWD
#(1)修改slave1节点上的myid
ssh slave1
cd $KAFKA_HOME/zkdata
echo 2 > myid
#(2)修改slave2节点上的myid
ssh slave1
cd $KAFKA_HOME/zkdata
echo 3 > myid
cd $KAFKA_HOME/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=1
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners==PLAINTEXT://master:9092
############################# Log Basics #############################
# kafka 运行日志和数据的存放路径
log.dirs=/usr/local/src/kafka/kafka-logs
# topic 在当前broker上的分片个数,与broker保持一致
num.partitions=3
############################# Zookeeper #############################
# 配置三台服务zookeeper连接地址
zookeeper.connect=master:2181,slave1:2181,slave2:2181
delete.topic.enable=true
cd $KAFKA_HOME/config
scp -r server.properties slave1:$PWD
scp -r server.properties slave2:$PWD
修改broker.id为2 和 3, 修改advertised.listeners主机名为slave1和slave2
(1) 修改slave1
ssh slave1
cd /usr/local/src/kafka/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=2
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners=PLAINTEXT://slave1:9092
(2)修改slave2
ssh slave2
cd /usr/local/src/kafka/config
vi server.properties
############################# Server Basics #############################
#修改broker.id, kafka集群中的每个节点上都要唯一
broker.id=3
############################# Socket Server Settings #############################
# 配置监听,修改为本机ip或主机名
advertised.listeners=PLAINTEXT://slave2:9092
第一个节点启动时会报错
connect refused
,是因为其余节点未启动的原因,无须理会,其余节点启动之后,报错会消失
#(1)master节点启动zookeeper(后台启动)
cd $KAFKA_HOME
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zklog/zookeeper.log 2>1 &
#(2)slave1节点启动zookeeper
ssh slave1
cd $KAFKA_HOME
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zklog/zookeeper.log 2>1 &
#(3)slave2节点启动zookeeper
ssh slave2
cd $KAFKA_HOME
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zklog/zookeeper.log 2>1 &
#(1)master节点启动
cd $KAFKA_HOME
#使用-daemon 参数在后天启动,打印信息不会占用控制台
bin/kafka-server-start.sh -daemon config/server.properties
#(2)slave1节点启动
ssh slave1
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
#(2)slave2节点启动
ssh slave2
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
#创建主题test-topic
bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --topic test-topic --replication-factor 3 --partitions 3
#查看topic列表
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
#查看topic详细信息
bin/kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave2:2181 --topic test-topic
#删除topic,注意,只是删除Topic在zk的元数据,日志数据仍需手动删除
bin/kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave2:2181 -topic test-topic
#三个节点上执行以下命令,停止zookeeper
cd $KAFKA_HOME
bin/zookeeper-server-stop.sh
#三个节点上执行以下命令,停止kafka
cd $KAFKA_HOME
bin/kafka-server-stop.sh
zkServer.sh start
zkServer.sh status
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
(1)slave1节点开启生产者
#创建topic
cd $KAFKA_HOME
bin/kafka-topics.sh --create --topic producer-consumer --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 3 --partitions 3
#查看topic
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
#开启生产者producer
bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic producer-consumer
(2)slave2节点作为消费者
#开启消费者consumer
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic producer-consumer
(3)生产者发布消息,消费者读取消息
#生产者进程中发布以下信息
生产者向topic发送信息,消费者能够读取到信息。Kafka就像一个消息公告栏!
所以kafka又被称作“分布式发布订阅消息系统”
#查看消费者端接收的信息
#所有节点上启动zookeeper
zkServer.sh start
zkServer.sh status
#所有节点上启动kafka
cd $KAFKA_HOME
bin/kafka-server-start.sh -daemon config/server.properties
jps
flume-kafka
,之后 Flume 收集到的数据都会发到这个主题#任意节点上创建主题 flume-kafka
cd $KAFKA_HOME
#如果版本老于2.2,应该用--zookeeper,并且端口号是2181, 使用--bootstrap-server会报错
bin/kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --topic flume-kafka --partitions 3 --replication-factor 3
#查看创建的主题
bin/kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
flume-kafka
主题:cd $KAFKA_HOME
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic flume-kafka
cd $FLUME_HOME/conf
vi log-flume-kafka.properties
#文件中写入以下配置
a1.sources = s1
a1.sinks = k1
a1.channels = c1
a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /root/data/stuinfo.log
a1.sources.s1.shell= /bin/bash -c
a1.sources.s1.channels=c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=flume-kafka
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
cd
vi outputlog.sh
#向文件中写入以下内容
#!/bin/bash
for((i=5612;i<6000;i++));
do
echo 'When we will see you again. Put a little sunshine in your life.----' + $i >> $PWD/data/stuinfo.log
done
flume-ng agent --conf conf/ --conf-file /usr/local/src/flume/conf/log-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console
cd
chmod 777 outputlog.sh
./outputlog.sh