之前一直使用zookeeper来让kafka之间通信,但是zookeeper老挂,所以就想换种kafka自带的维护方式来通信
准备:
三台服务器:我买的是三台腾讯云,而且是不同机房的腾讯云,所以要在各个机器的安全组中设置所有端口对对方开放才能进行通信;
kafka安装包:版本为kafka_2.12-3.0.0.tgz,下载去官网:Apache Kafka
第一步:
先在holden001中解压:
tar -zxvf /opt/software/kafka_2.12-3.0.0.tgz -C /opt/module
再进入到kafka/config/kraft的路径下修改server.properties文件:
# 此台机器扮演的角色,这里我设置了它既是broker也是controller
process.roles=broker,controller
# 机器的唯一标识号
node.id=1
# controller之间交互的端口,这里我有三台机器,注意我这里直接写机器名是因为我在hosts中做了ip地址映射
controller.quorum.voters=1@holden001:9093,2@holden002:9093,3@holden003:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
# 这里要将目录改到不会被删除的目录下,因为默认是tmp目录,会被主机清空的
log.dirs=/opt/module/kafka/kraft-combined-logs
# 这里要写,因为如果外部环境要调用api的话,是从这个端口进来,这个9092端口也要在安全组开放
advertised.listeners=PLAINTEXT://你主机的公网:9092
# 默认分区个数
num.partitions=1
# 日志过期时间,我设置为一天
log.retention.hours=24
第二步:
将kafka整个文件分发到其余两个机器,注意需要为其他两个机器修改node.id和advertised.listeners。
第三步:
执行kafka自带脚本生成uuid,记住这个uuid,因为这三台主机共用这个uuid,此时才能组成一个集群,若各用各的uuid,那就是三台不同kafka机器。
bin/kafka-storage.sh random-uuid
然后再三台机器分别执行这条语句,记住UUID要相同;
$ bin/kafka-storage.sh format -t 你的UUID -c config/kraft/server.properties
第四步:
然后群起kafka,使用我的这个脚本可以比较省事:
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
for i in holden001 holden002 holden003
do
echo "==========start kafka $i =========="
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/kraft/server.properties'
done
;;
"stop")
for i in holden001 holden002 holden003
do
echo "===========stop kafka $i============"
ssh $i '/opt/module/kafka/bin/kafka-server-stop.sh stop'
done
;;
*)
echo "Args Error"
;;
esac
执行jpsall,若看到三台机器的进程有kafka运行了就说明成功了:
若想测试可以使用我提供的这个脚本:
#!/bin/bash
if [ $1 = 'list' ];then
/opt/module/kafka/bin/kafka-topics.sh --bootstrap-server $(hostname):9092 --list
elif [ $1 = 'create' ];then
/opt/module/kafka/bin/kafka-topics.sh --bootstrap-server holden001:9092 --create --topic $2 --partitions $3 --replication-factor $4
elif [ $1 = 'deletetopic' ];then
/opt/module/kafka/bin/kafka-topics.sh --bootstrap-server holden001:9092 -delete --topic $2
elif [ $1 = 'describe' ];then
/opt/module/kafka/bin/kafka-topics.sh --bootstrap-server holden001:9092 --describe --topic $2
elif [ $1 = 'alter' ];then
/opt/module/kafka/bin/kafka-topics.sh --alter --bootstrap-server $(hostname):2181 --topic $2 --partitions $3
elif [ $1 = 'send' ];then
/opt/module/kafka/bin/kafka-console-producer.sh --topic $2 --broker-list $(hostname):9092
elif [ $1 = 'receive' ];then
/opt/module/kafka/bin/kafka-console-consumer.sh --topic $2 --bootstrap-server $(hostname):9092
elif [ $1 = 'receive-begin' ];then
/opt/module/kafka/bin/kafka-console-consumer.sh --topic $2 --bootstrap-server $(hostname):9092 --from-beginning
elif [ $1 = 'group' ];then
/opt/module/kafka/bin/kafka-console-consumer.sh --topic $2 --bootstrap-server hadoop102:9092 --consumer-property group.id=$3
elif [ $1 = 'listgroup' ];then
/opt/module/kafka/bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --list
elif [ $1 = 'descgroup' ];then
/opt/module/kafka/bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group $2
elif [ $1 = 'deletegroup' ];then
/opt/module/kafka/bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --delete --group $2
else
echo 'no happended'
fi