Tutorial: Installing Kafka and ZooKeeper
Three servers: 192.168.133.3, 192.168.133.4, 192.168.133.5
Package versions: zookeeper-3.4.13, kafka_2.12-2.1.1
Java version:
[root@linux02 kafka_2.11-2.1.0]# java -version
java version "1.8.0_141"
Java(TM) SE Runtime Environment (build 1.8.0_141-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.141-b15, mixed mode)
wget http://mirrors.shu.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
tar -zxvf zookeeper-3.4.13.tar.gz
cp /opt/apps/zookeeper-3.4.13/conf/zoo_sample.cfg /opt/apps/zookeeper-3.4.13/conf/zoo.cfg
vi /opt/apps/zookeeper-3.4.13/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/zookeeper/data
dataLogDir=/home/zookeeper/dataLog
server.1=192.168.133.3:2888:3888
server.2=192.168.133.4:2888:3888
server.3=192.168.133.5:2888:3888
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
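Notes:
1. dataDir and dataLogDir must be created before ZooKeeper is started.
2. clientPort is the port ZooKeeper clients connect to.
3. server.1, server.2 and server.3 describe the three nodes of the ZooKeeper cluster. The format is hostname:port1:port2, where port1 is used for communication between nodes and port2 is used for leader election. Make sure both ports are reachable between all three hosts.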
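The first note can be handled with a small helper run on each node before the first start. This is only a sketch: the function name prepare_zk_dirs is made up for illustration, and the default base directory /home/zookeeper is taken from the dataDir and dataLogDir values in zoo.cfg (the logs subdirectory matches the zookeeper.log.dir value used in log4j.properties in this tutorial).

```shell
# Hypothetical helper: create the directories that the ZooKeeper config points at.
# $1 is the base directory; it defaults to /home/zookeeper as used in this tutorial.
prepare_zk_dirs() {
    local base="${1:-/home/zookeeper}"
    # data    -> dataDir (snapshots and the myid file)
    # dataLog -> dataLogDir (transaction logs)
    # logs    -> zookeeper.log.dir from log4j.properties
    mkdir -p "$base/data" "$base/dataLog" "$base/logs"
}

# Usage on each node (as a user allowed to write under /home):
#   prepare_zk_dirs
```

Because mkdir -p is idempotent, the helper can be re-run safely on a node that is already set up.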
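Write each node's ID to the myid file in dataDir; the value must match the N of that node's server.N entry in zoo.cfg.
Node 1: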
[root@server1 download]# echo "1" > /home/zookeeper/data/myid
Node 2:
[root@server1 download]# echo "2" > /home/zookeeper/data/myid
Node 3:
[root@server1 download]# echo "3" > /home/zookeeper/data/myid
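Since the myid value has to agree with the server.N lines in zoo.cfg, it can also be derived from that file instead of being typed by hand. The following is a minimal sketch under that assumption; myid_for_ip is a hypothetical helper name, not part of ZooKeeper.

```shell
# Hypothetical helper: print the N of the "server.N=<ip>:port1:port2" line in a
# zoo.cfg file ($1) whose address equals the given IP ($2).
# Prints nothing if no server line matches.
myid_for_ip() {
    local cfg="$1" ip="$2"
    awk -F'=' -v ip="$ip" '
        $1 ~ /^server\.[0-9]+$/ {
            split($2, parts, ":")          # parts[1] is the host part
            if (parts[1] == ip) {
                sub(/^server\./, "", $1)   # keep only the numeric id
                print $1
            }
        }
    ' "$cfg"
}

# Usage on the node with address 192.168.133.4:
#   myid_for_ip /opt/apps/zookeeper-3.4.13/conf/zoo.cfg 192.168.133.4 > /home/zookeeper/data/myid
```

The address is compared as a plain string, so the dots in the IP are not treated as regular-expression wildcards.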
[root@server1 download]# vi /opt/apps/zookeeper-3.4.13/conf/log4j.properties
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, ROLLINGFILE
zookeeper.console.threshold=INFO
zookeeper.log.dir=/home/zookeeper/logs
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=/home/zookeeper/logs
zookeeper.tracelog.file=zookeeper_trace.log
vi /etc/profile
Append the following to the end of /etc/profile:
export ZK_HOME=/opt/apps/zookeeper-3.4.13
export PATH=$PATH:$JAVA_HOME/bin:$ZK_HOME/bin
Save and exit, then make the change take effect immediately:
source /etc/profile
Open the three ports ZooKeeper uses in the firewall:
[root@server1 download]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
[root@server1 download]# firewall-cmd --zone=public --add-port=2888/tcp --permanent
[root@server1 download]# firewall-cmd --zone=public --add-port=3888/tcp --permanent
Reload the firewall configuration:
[root@server1 download]# firewall-cmd --reload
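After the reload it can be worth confirming that all three ports really are open. The sketch below compares the output of firewall-cmd --zone=public --list-ports with a list of required ports; missing_ports is a hypothetical helper name introduced only for this example.

```shell
# Hypothetical helper: $1 is the space-separated output of
# `firewall-cmd --zone=public --list-ports`; the remaining arguments are the
# ports that must be open. Prints every required port that is not listed.
missing_ports() {
    local open=" $1 "
    shift
    local p
    for p in "$@"; do
        case "$open" in
            *" $p "*) ;;        # port is already open
            *) echo "$p" ;;     # port is missing
        esac
    done
}

# Usage:
#   missing_ports "$(firewall-cmd --zone=public --list-ports)" 2181/tcp 2888/tcp 3888/tcp
```

An empty result means nothing is missing; any printed port still needs an --add-port rule followed by a reload.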
Configure ZooKeeper on the other two nodes in the same way.
Start and verify ZooKeeper by starting it on each of the three nodes:
[root@server1 download]# zkServer.sh start
Once it has started, check the ZooKeeper status. In a healthy cluster one node reports Mode: leader and the other two report Mode: follower.
[root@server1 download]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
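To see the role of all three nodes from a single host, the srvr four-letter command can be sent to each client port. This is a sketch that assumes nc is installed and that port 2181 is reachable from the host running it; zk_mode_from_srvr is a hypothetical helper name.

```shell
# Hypothetical helper: read the reply to ZooKeeper's "srvr" command on stdin
# and print the value of its "Mode:" line (for example leader or follower).
zk_mode_from_srvr() {
    awk -F': ' '/^Mode: / { print $2 }'
}

# Usage (assumes nc is available):
#   for host in 192.168.133.3 192.168.133.4 192.168.133.5; do
#       mode="$(echo srvr | nc "$host" 2181 | zk_mode_from_srvr)"
#       echo "$host: ${mode:-no reply}"
#   done
```

One leader and two followers in the loop's output corresponds to the healthy state described above.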
[root@server1 download]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
[root@server1 download]# tar -zvxf kafka_2.12-2.1.1.tgz
[root@server1 download]# mv kafka_2.12-2.1.1 /opt/apps
[root@server1 download]# vi /opt/apps/kafka_2.12-2.1.1/config/server.properties
Note: only the settings that need to be changed are listed below; leave everything else at its default value.
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# Set the broker id
broker.id=1
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.133.3:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.133.3:9092
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka/logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
#log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
log.retention.bytes=268435456
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# Set the ZooKeeper addresses
zookeeper.connect=192.168.133.3:2181,192.168.133.4:2181,192.168.133.5:2181
[root@server1 download]# mkdir -p /tmp/kafka/logs
[root@server1 download]# vi /etc/profile
Change the lines previously added at the end of the file to the following:
export ZOOKEEPER_HOME=/opt/apps/zookeeper-3.4.13
export KAFKA_HOME=/opt/apps/kafka_2.12-2.1.1
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
Update the firewall to open the Kafka port:
[root@server1 download]# firewall-cmd --zone=public --add-port=9092/tcp --permanent
[root@server1 download]# firewall-cmd --reload
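Install and configure Kafka on the other two nodes in the same way. Each node needs its own broker.id, and listeners and advertised.listeners should use that node's own IP address.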
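One way to avoid hand-editing three files is to derive each node's server.properties from the first node's copy. The sketch below is illustrative only: render_broker_props is a hypothetical helper name, and the pairing of broker id 2 with 192.168.133.4 and id 3 with 192.168.133.5 is an assumption chosen to mirror the ZooKeeper myid values.

```shell
# Hypothetical helper: print a copy of a server.properties file with broker.id,
# listeners and advertised.listeners rewritten for one node.
# $1 = source file, $2 = broker id, $3 = node IP.
# Only uncommented lines are rewritten; every other line passes through unchanged.
render_broker_props() {
    local src="$1" id="$2" ip="$3"
    sed -e "s|^broker\.id=.*|broker.id=${id}|" \
        -e "s|^listeners=.*|listeners=PLAINTEXT://${ip}:9092|" \
        -e "s|^advertised\.listeners=.*|advertised.listeners=PLAINTEXT://${ip}:9092|" \
        "$src"
}

# Usage (id-to-IP pairing is an assumption, see above):
#   render_broker_props /opt/apps/kafka_2.12-2.1.1/config/server.properties 2 192.168.133.4 > server.properties.node2
#   render_broker_props /opt/apps/kafka_2.12-2.1.1/config/server.properties 3 192.168.133.5 > server.properties.node3
```

The generated files still have to be copied to config/server.properties on the corresponding node.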
Start Kafka on each of the three nodes:
[root@server1 download]# kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
Create a topic named test11 with three partitions and three replicas:
[root@server1 download]# kafka-topics.sh --zookeeper linux01:2181,linux02:2181,linux03:2181 --create --topic test11 --replication-factor 3 --partitions 3
Created topic "test11".
Describe the topic to check its partitions and replicas:
[root@server1 download]# kafka-topics.sh --describe --zookeeper 192.168.133.3:2181,192.168.133.4:2181,192.168.133.5:2181 --topic test11
Topic:test11 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test11 Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test11 Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test11 Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
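In the output above every partition lists all three brokers under both Replicas and Isr, which is the healthy case. A quick way to spot partitions where that is not true is to compare the two lists. The following is a sketch that assumes the whitespace-separated "Replicas: ... Isr: ..." layout shown above; under_replicated is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print the partition lines whose Isr list is shorter than their Replicas list.
under_replicated() {
    awk '
        {
            replicas = ""; isr = ""
            # Find the values that follow the "Replicas:" and "Isr:" labels.
            for (i = 1; i < NF; i++) {
                if ($i == "Replicas:") replicas = $(i + 1)
                if ($i == "Isr:")      isr      = $(i + 1)
            }
            # Lines without a Replicas field (such as the summary line) are skipped.
            if (replicas != "" && split(isr, a, ",") < split(replicas, b, ",")) print
        }
    '
}

# Usage:
#   kafka-topics.sh --describe --zookeeper 192.168.133.3:2181 --topic test11 | under_replicated
```

No output means every partition is fully in sync. kafka-topics.sh --describe also accepts --under-replicated-partitions, which performs a similar filter on the server side.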
Start a command-line producer:
kafka-console-producer.sh --broker-list linux01:9092,linux02:9092,linux03:9092 --topic test11
Start a command-line consumer:
kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic test11 --from-beginning
Start a consumer that belongs to a consumer group to consume the topic's data:
kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic test11 --group test-consumer-group
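Once the group is consuming, kafka-consumer-groups.sh --describe --group test-consumer-group shows its committed offsets and lag per partition. The sketch below sums that lag; it assumes the output contains a header row with a column named LAG and locates the column by name rather than by position. total_lag is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka-consumer-groups.sh --describe` output on
# stdin and print the sum of the LAG column. Non-numeric values such as "-"
# (no committed offset yet) are ignored.
total_lag() {
    awk '
        lag_col == 0 {
            # Still looking for the header row; remember where LAG is.
            for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i
            next
        }
        $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
        END { print sum + 0 }
    '
}

# Usage:
#   kafka-consumer-groups.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 \
#       --describe --group test-consumer-group | total_lag
```

A total of 0 means the group has caught up with everything written to the topic so far.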
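Reset the consumer group's committed offsets. The --reset-offsets and --execute options belong to kafka-consumer-groups.sh, not to the console consumer; --to-earliest is one possible reset target (an assumption here), and the group must have no active consumers while the command runs:
kafka-consumer-groups.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --group test-consumer-group --topic test11 --reset-offsets --to-earliest --execute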