Kafka_2.12-2.1.1 + Zookeeper-3.4.13 Detailed Cluster Deployment Tutorial

令狐宏浚
2023-12-01

A tutorial on installing Kafka and Zookeeper.

1. Installation preparation

Three servers: 192.168.133.3, 192.168.133.4, 192.168.133.5
Package versions: zookeeper-3.4.13, kafka_2.12-2.1.1

Java version:

[root@linux02 kafka_2.11-2.1.0]# java -version
java version "1.8.0_141"
Java(TM) SE Runtime Environment (build 1.8.0_141-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.141-b15, mixed mode)

2. Install Zookeeper

2.1 Download the Zookeeper package:

wget http://mirrors.shu.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

2.2 Extract the package and move it to the /opt/apps directory:

tar -zxvf zookeeper-3.4.13.tar.gz
mv zookeeper-3.4.13 /opt/apps

2.3 Copy zookeeper-3.4.13/conf/zoo_sample.cfg and rename the copy to zoo.cfg:

cp /opt/apps/zookeeper-3.4.13/conf/zoo_sample.cfg /opt/apps/zookeeper-3.4.13/conf/zoo.cfg

2.4 Modify the configuration in zoo.cfg as follows:

vi /opt/apps/zookeeper-3.4.13/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/zookeeper/data
dataLogDir=/home/zookeeper/dataLog
server.1=192.168.133.3:2888:3888
server.2=192.168.133.4:2888:3888
server.3=192.168.133.5:2888:3888
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

Notes:

1. dataDir and dataLogDir must be created before Zookeeper is started (see the commands after this list).
2. clientPort is the port on which Zookeeper serves clients.
3. server.1, server.2 and server.3 describe the three nodes of the Zookeeper cluster, in the format hostname:port1:port2, where port1 is the port used for communication between nodes and port2 is the port used for leader election; make sure both ports are reachable between all three hosts.
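
The commands referenced in note 1, run on each of the three nodes (a minimal sketch; the paths simply match the dataDir and dataLogDir values configured above):

mkdir -p /home/zookeeper/data
mkdir -p /home/zookeeper/dataLog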

2.5 On each node, write that node's id into the myid file under the dataDir directory

Node 1:

[root@server1 download]# echo "1" > /home/zookeeper/data/myid

Node 2:

[root@server2 download]# echo "2" > /home/zookeeper/data/myid

Node 3:

[root@server3 download]# echo "3" > /home/zookeeper/data/myid
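
To double-check, print the file on each node and make sure the id matches that host's server.N entry in zoo.cfg:

cat /home/zookeeper/data/myid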

2.6 Modify the log4j configuration file:

[root@server1 download]# vi /opt/apps/zookeeper-3.4.13/conf/log4j.properties
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, ROLLINGFILE
zookeeper.console.threshold=INFO
zookeeper.log.dir=/home/zookeeper/logs
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=/home/zookeeper/logs
zookeeper.tracelog.file=zookeeper_trace.log
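
The configuration above writes logs under /home/zookeeper/logs; to be safe, create that directory ahead of time as well (a small sketch, assuming it does not exist yet):

mkdir -p /home/zookeeper/logs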

2.7 Set the Zookeeper environment variables:

vi /etc/profile

Add the following lines at the end of /etc/profile:

export ZK_HOME=/opt/apps/zookeeper-3.4.13
export PATH=$PATH:$JAVA_HOME/bin:$ZK_HOME/bin

Save and exit, then make the changes take effect immediately:

source /etc/profile
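
A quick sanity check that the variable took effect is to confirm zkServer.sh now resolves on the PATH (it should print a path under /opt/apps/zookeeper-3.4.13/bin):

which zkServer.sh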

Open the three ports used by Zookeeper in the firewall:

[root@server1 download]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
[root@server1 download]# firewall-cmd --zone=public --add-port=2888/tcp --permanent
[root@server1 download]# firewall-cmd --zone=public --add-port=3888/tcp --permanent

Reload the firewall configuration:

[root@server1 download]# firewall-cmd --reload
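
To verify, list the open ports; 2181/tcp, 2888/tcp and 3888/tcp should all appear in the output:

[root@server1 download]# firewall-cmd --zone=public --list-ports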

在另外两个节点分别配置好zookeeper。

Start and verify Zookeeper. Start Zookeeper on each of the three nodes:

[root@server1 download]# zkServer.sh start

After startup, check the Zookeeper status. Normally, one node's Mode is leader and the other two nodes' Mode is follower:

[root@server1 download]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/apps/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower
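
As an additional check, you can connect to any node with the bundled CLI and list the root znodes (a minimal sketch, assuming the ensemble is up; run ls / at the prompt and quit to exit):

zkCli.sh -server 192.168.133.3:2181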

3. Install kafka_2.12-2.1.1

3.1 Download the package:

[root@server1 download]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz

3.2 Extract the package and move it to the /opt/apps directory:

[root@server1 download]# tar -zvxf kafka_2.12-2.1.1.tgz
[root@server1 download]# mv kafka_2.12-2.1.1 /opt/apps

3.3 Modify the /opt/apps/kafka_2.12-2.1.1/config/server.properties configuration file:

[root@server1 download]# vi /opt/apps/kafka_2.12-2.1.1/config/server.properties

Note: only the items that need to be changed are listed below; anything not listed keeps its default value.

############################# Server Basics #############################
 
# The id of the broker. This must be set to a unique integer for each broker.
# specify the broker id
broker.id=1
 
############################# Socket Server Settings #############################
 
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.133.3:9092
 
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://192.168.133.3:9092
 
############################# Log Basics #############################
 
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka/logs
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
 
############################# Log Retention Policy #############################
 
 
# The minimum age of a log file to be eligible for deletion due to age
#log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
log.retention.bytes=268435456
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# specify the Zookeeper connection addresses
zookeeper.connect=192.168.133.3:2181,192.168.133.4:2181,192.168.133.5:2181

3.4 Create the Kafka log directory:

[root@server1 download]# mkdir -p /tmp/kafka/logs

3.5 Set the Kafka environment variables:

[root@server1 download]# vi /etc/profile

Change the lines added to the end of the file earlier to the following:

export ZK_HOME=/opt/apps/zookeeper-3.4.13
export KAFKA_HOME=/opt/apps/kafka_2.12-2.1.1
export PATH=$PATH:$JAVA_HOME/bin:$ZK_HOME/bin:$KAFKA_HOME/bin

Save the file and apply the changes again:

source /etc/profile

Update the firewall to open the Kafka communication port:

[root@server1 download]# firewall-cmd --zone=public --add-port=9092/tcp --permanent
[root@server1 download]# firewall-cmd --reload

Install and configure Kafka on the other two nodes in the same way, making sure each node uses its own broker.id and listener address, as sketched below.
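
For reference, a sketch of the per-node differences in server.properties, assuming the node-to-IP mapping from section 1 (everything not shown stays identical to the node-1 file above):

# 192.168.133.4 (node 2)
broker.id=2
listeners=PLAINTEXT://192.168.133.4:9092
advertised.listeners=PLAINTEXT://192.168.133.4:9092

# 192.168.133.5 (node 3)
broker.id=3
listeners=PLAINTEXT://192.168.133.5:9092
advertised.listeners=PLAINTEXT://192.168.133.5:9092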

3.6 Start Kafka on each of the three nodes:

[root@server1 download]# kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
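
To confirm each broker came up, check for the Kafka process and tail the server log (the log path assumes Kafka's default log4j settings):

jps | grep -i kafka
tail -n 20 ${KAFKA_HOME}/logs/server.log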

Create a topic named test11 with three partitions and three replicas (linux01, linux02 and linux03 below are the hostnames of the three servers):

[root@server1 download]#  kafka-topics.sh --zookeeper linux01:2181,linux02:2181,linux03:2181 --create --topic test11 --replication-factor 3 --partitions 3

Created topic "test11".

3.7 Check the topic status:

[root@server1 download]# kafka-topics.sh --describe --zookeeper 192.168.133.3:2181,192.168.133.4:2181,192.168.133.5:2181 --topic test11
Topic:test11    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: test11   Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: test11   Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: test11   Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2

Start a console producer:

kafka-console-producer.sh --broker-list linux01:9092,linux02:9092,linux03:9092 --topic test11

Start a console consumer:

kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic test11 --from-beginning

Start a consumer as part of a consumer group to consume the topic:

 kafka-console-consumer.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --topic test11 --group test-consumer-group

Reset the consumer group's committed offsets. Offset resets are done with kafka-consumer-groups.sh rather than the console consumer, and the group must have no active consumers while resetting; this example resets to the earliest offset:

kafka-consumer-groups.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --group test-consumer-group --topic test11 --reset-offsets --to-earliest --execute
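
You can then describe the group to confirm the reset took effect:

kafka-consumer-groups.sh --bootstrap-server linux01:9092,linux02:9092,linux03:9092 --group test-consumer-group --describe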