本文参考了网上部分文章,并进行了说明和归纳,详情请查看参考。
官方文档参考:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
kafka环境中机器磁盘告警很容易出现,原因可能是某一个topic
的partition
为1(或者partition不足导致的)
,只往一台机器上写数据,造成kafka集群空间使用不均。
下面主要使用kafka-topics.sh
和kafka-reassign-partitions.sh
来解决问题。
推荐使用kafka manager来管理kafka集群。
而kafka的partition离不开
Controller 的监听,先介绍一下Controlle。
Controller 在初始化时,会利用 ZK 的 watch 机制注册很多不同类型的监听器,当监听的事件被触发时,Controller 就会触发相应的操作。
Controller 在初始化时,会注册多种类型的监听器,主要有以下6种:
/admin/reassign_partitions
节点,用于分区副本迁移的监听;/isr_change_notification
节点,用于 Partition Isr 变动的监听,;/admin/preferred_replica_election
节点,用于需要进行 Partition 最优 leader 选举的监听;/brokers/topics
节点,用于 Topic 新建的监听;/brokers/topics/TOPIC_NAME
节点,用于 Topic Partition 扩容的监听;/admin/delete_topics
节点,用于 Topic 删除的监听;/brokers/ids
节点,用于 Broker 上下线的监听。本文主要讲解第一部分,也就是 Controller 对 Partition 副本迁移的处理。
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --alter --topic test --partitions 6
此命令执行完之后即可再kafka集群其他机器中找到此topic的目录
只要配置zookeeper.connect为要加入的集群,再启动Kafka进程,就可以让新的机器加入到Kafka集群。但是新的机器只针对新的Topic才会起作用,在之前就已经存在的Topic的分区,不会自动的分配到新增加的物理机中。
为了使新增加的机器可以分担系统压力,必须进行消息数据迁移。Kafka提供了kafka-reassign-partitions.sh
进行数据迁移。
这个脚本提供3个命令:
--generate
: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。--execute
: 根据给予的消息迁移计划进行迁移。--verify
: 检查消息是否已经迁移完成。Partition 的副本迁移实际上就是将分区的副本重新分配到不同的代理节点上,如果 zk 中新副本的集合与 Partition 原来的副本集合相同,那么这个副本就不需要重新分配了。
Partition 的副本迁移是通过监听 zk 的 /admin/reassign_partitions
节点触发的,Kafka 也向用户提供相应的脚本工具进行副本迁移,副本迁移的脚本使用方法如下所示:
./bin/kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute
其中 XXX.json 为要进行 Partition 副本迁移的 json 文件,json 文件的格式如下所示:
{
"version":1,
"partitions":[
{
"topic":"__consumer_offsets",
"partition":19,
"replicas":[
3,
9,
2
]
},
{
"topic":"__consumer_offsets",
"partition":26,
"replicas":[
2,
6,
4
]
},
{
"topic":"__consumer_offsets",
"partition":27,
"replicas":[
5,
3,
8
]
}
]
}
这个 json 文件的意思是将 Topic __consumer_offsets
Partition 19 的副本迁移到 {3, 2, 9} 上,Partition 26 的副本迁移到 {6, 2, 4} 上,Partition 27 的副本迁移到 {5, 3, 8} 上。
包括配置broker id等
~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties
停止命令:~/kafka/bin/kafka-server-stop.sh
topic.json
topic为test
目前在broker id为1,2,3的机器上,现又添加了两台机器,broker id为4,5,现在想要将压力平均分散到这5台机器上。
手动生成一个json文件topic.json
{
"topics": [
{"topic": "test"}
],
"version": 1
}
--generate
生成迁移计划,将test
扩充到所有机器上
./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --topics-to-move-json-file topic.json --broker-list "1,2,3,4,5" --generate
生成类似于下方的结果
Current partition replica assignment
{"version":1,
"partitions":[....]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[.....]
}
Current partition replica assignment
表示当前的消息存储状况。
Proposed partition reassignment configuration
表示迁移后的消息存储状况。
那么我们就用下部分的json进行迁移。另外,也可以自己构造个类似的json文件,同样可以进行迁移。
将迁移后的json存入一个文件reassignment.json
,供--execute
命令使用。
--execute
进行扩容。./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --execute
Current partition replica assignment
...
Save this to use as the --reassignment-json-file option during rollback
...
--verify
查看进度./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --verify
分区由原来一个变成了三个,并且集群里面的所有kafka,data目录都会产生相应的文件夹
注意:0,1,2是kafka配置文件中的broker.id,在这里只是生成配置文件,并没有真正均衡
将2中,生成的json,copy到一个json文件中,执行以下命令
注意:到这里均衡已结束,但是对已有的数据,并不能起到均衡作用。
broker会watch "/admin/reassign_partitions"节点。当发现有迁移任务的时候,会将partiton的AR进行扩展,例如原先partiton的AR是[1, 2], 现在要迁移到[2, 3],那么partiton会先将AR扩展到[1, 2, 3],并监控ISR的变化。
当replica-2和replica-3都跟上后,即在ISR中的时候,表明新的repica-3已经和leader数据同步了。这个时候就可以将replica-1剔除了,最后得到迁移结果是[2, 3]。即迁移是一个先增加再减少的过程。
Assigned replicas (0,1) don't match the list of replicas for reassignment (1,0) for partition [testTopic,0] Reassignment of partition [testTopic,0] failed
在2.1.2中已经说明了,该错误是由于"/admin/reassign_partitions"节点已经被删除了,但是AR和目标迁移列表不相同报的错,一般需要看下controller的日志,看下controller在迁移过程中是不是抛出了异常。
迁移需要等目标迁移列表中的replic都跟上了leader才能完成,目前迁移列表一直跟不上,那么就不会完成。可以看下zk中“/brokers/topics/{topic}/partitions/{partiton}/state”,注意下目标迁移列表是不是在isr中,如果不在说明要迁移的replic还没有完成从leader拉取数据。具体为甚么没有拉取成功,可能是数据量比较大,拉取需要一定的时间;也可能是其他原因比如集群宕机了等,需要具体分析下
Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。
有2个接口可以实现限制。最简单和最安全的是调用kafka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值。
例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过50MB/s(--throttle 50000000)。
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json --throttle 50000000
当你运行这个脚本,你会看到这个限制:
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。
当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
管理员还可以使用kafka-configs.sh验证已分配的配置。有2对限制配置用于管理限流。而限制值本身,是个broker级别的配置,用于动态属性配置:
leader.replication.throttled.rate
follower.replication.throttled.rate
此外,还有枚举集合的限流副本:
leader.replication.throttled.replicas
follower.replication.throttled.replicas
其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。
查看限流配置:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。
要查看限流副本的列表:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
这里我们看到leader限制被应用到broker 102上的分区1和broker 101的分区0.同样,follower限制应用到broker 101的分区1和broker 102的分区0.
默认情况下,kafka-reassign-partitions.sh会将leader限制应用于重新平衡前存在的所有副本,任何一个副本都可能是leader。它将应用follower限制到所有移动目的地。因此,如果broker 101,102上有一个副本分区,被分配给102,103,则该分区的leader限制,将被应用到101,102,并且follower限制将仅被应用于103。
如果需要,你还可以使用kafka-configs.sh的--alter开关手动地更改限制配置。
./bin/kafka-console-producer.sh --broker-list vlnx111111:9092 --topic test
./bin/kafka-console-consumer.sh --zookeeper vlnx111122:2181 --topic test --from-beginning
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --list
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --create --replication-factor 2 --partition 6 --topic test
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --delete --topic test
./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --describe --topic test
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
https://wzktravel.github.io/2015/12/31/kafka-reassign/
http://blog.51yip.com/hadoop/2131.html
https://www.cnblogs.com/set-cookie/p/9614241.html
https://matt33.com/2018/06/16/partition-reassignment/