kafka集群磁盘满警告,新增kafka节点,重新分配partition

姬天逸
2023-12-01

本文参考了网上部分文章,并进行了说明和归纳,详情请查看参考。

官方文档参考:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

      kafka环境中机器磁盘告警很容易出现,原因可能是某一个topicpartition为1(或者partition不足导致的),只往一台机器上写数据,造成kafka集群空间使用不均。

下面主要使用kafka-topics.shkafka-reassign-partitions.sh来解决问题。

推荐使用kafka manager来管理kafka集群。

而kafka的partition离不开Controller 的监听,先介绍一下Controlle。

1 Kafka Controller 作用

Controller 在初始化时,会利用 ZK 的 watch 机制注册很多不同类型的监听器,当监听的事件被触发时,Controller 就会触发相应的操作。

Controller 在初始化时,会注册多种类型的监听器,主要有以下6种:

  1. 监听 /admin/reassign_partitions 节点,用于分区副本迁移的监听;
  2. 监听 /isr_change_notification 节点,用于 Partition Isr 变动的监听,;
  3. 监听 /admin/preferred_replica_election 节点,用于需要进行 Partition 最优 leader 选举的监听;
  4. 监听 /brokers/topics 节点,用于 Topic 新建的监听;
  5. 监听 /brokers/topics/TOPIC_NAME 节点,用于 Topic Partition 扩容的监听;
  6. 监听 /admin/delete_topics 节点,用于 Topic 删除的监听;
  7. 监听 /brokers/ids 节点,用于 Broker 上下线的监听。

    本文主要讲解第一部分,也就是 Controller 对 Partition 副本迁移的处理。

2 操作

2.1 修改topic的partitions

./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --alter --topic test --partitions 6

此命令执行完之后即可再kafka集群其他机器中找到此topic的目录

2.2 扩容、删除机器

      只要配置zookeeper.connect为要加入的集群,再启动Kafka进程,就可以让新的机器加入到Kafka集群。但是新的机器只针对新的Topic才会起作用,在之前就已经存在的Topic的分区,不会自动的分配到新增加的物理机中。

      为了使新增加的机器可以分担系统压力,必须进行消息数据迁移。Kafka提供了kafka-reassign-partitions.sh进行数据迁移。

这个脚本提供3个命令:

  • --generate: 根据给予的Topic列表和Broker列表生成迁移计划。generate并不会真正进行消息迁移,而是将消息迁移计划计算出来,供execute命令使用。
  • --execute: 根据给予的消息迁移计划进行迁移。
  • --verify: 检查消息是否已经迁移完成。

2.3 Partition 副本迁移整体流程

Partition 的副本迁移实际上就是将分区的副本重新分配到不同的代理节点上,如果 zk 中新副本的集合与 Partition 原来的副本集合相同,那么这个副本就不需要重新分配了。

Partition 的副本迁移是通过监听 zk 的 /admin/reassign_partitions 节点触发的,Kafka 也向用户提供相应的脚本工具进行副本迁移,副本迁移的脚本使用方法如下所示:

./bin/kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file XXX.json --execute

其中 XXX.json 为要进行 Partition 副本迁移的 json 文件,json 文件的格式如下所示:

{
    "version":1,
    "partitions":[
        {
            "topic":"__consumer_offsets",
            "partition":19,
            "replicas":[
                3,
                9,
                2
            ]
        },
        {
            "topic":"__consumer_offsets",
            "partition":26,
            "replicas":[
                2,
                6,
                4
            ]
        },
        {
            "topic":"__consumer_offsets",
            "partition":27,
            "replicas":[
                5,
                3,
                8
            ]
        }
    ]
}

这个 json 文件的意思是将 Topic __consumer_offsets Partition 19 的副本迁移到 {3, 2, 9} 上,Partition 26 的副本迁移到 {6, 2, 4} 上,Partition 27 的副本迁移到 {5, 3, 8} 上。

 

3. 示例

3.1 实例1

3.1.1 配置Kafka的server.properties

包括配置broker id等

3.1.2 启动新broker

  ~/kafka/bin/kafka-server-start.sh -daemon ~/kafka/config/server.properties

停止命令:~/kafka/bin/kafka-server-stop.sh

3.1.3 手动生成一个json文件topic.json

topic为test目前在broker id为1,2,3的机器上,现又添加了两台机器,broker id为4,5,现在想要将压力平均分散到这5台机器上。

手动生成一个json文件topic.json

{
"topics": [
    {"topic": "test"}
],
"version": 1
}

3.1.4 调用--generate生成迁移计划

,将test扩充到所有机器上

./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --topics-to-move-json-file topic.json --broker-list "1,2,3,4,5" --generate

生成类似于下方的结果

Current partition replica assignment

{"version":1,
  "partitions":[....]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[.....]
}

Current partition replica assignment表示当前的消息存储状况。

Proposed partition reassignment configuration表示迁移后的消息存储状况。

那么我们就用下部分的json进行迁移。另外,也可以自己构造个类似的json文件,同样可以进行迁移。

将迁移后的json存入一个文件reassignment.json,供--execute命令使用。

3.1.5 执行--execute进行扩容。

./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --execute

Current partition replica assignment
...

Save this to use as the --reassignment-json-file option during rollback
...

3.1.6 使用--verify查看进度

./bin/kafka-reassign-partitions.sh --zookeeper vlnx111122:2181 --reassignment-json-file reassignment.json --verify

 

3.2 实例2

3.2.1 修改topic partitions数量

  1. [root@bigserver2 kafka]# ./bin/kafka-topics.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --alter --topic track_pc --partitions 3  
  2. WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected  
  3. Adding partitions succeeded!  
  4.   
  5. [root@bigserver2 kafka]# bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic track_pc  
  6. Topic:track_pc PartitionCount:3 ReplicationFactor:1 Configs:  
  7.  Topic: track_pc Partition: 0 Leader: 1 Replicas: 1 Isr: 1  
  8.  Topic: track_pc Partition: 1 Leader: 2 Replicas: 2 Isr: 2  
  9.  Topic: track_pc Partition: 2 Leader: 0 Replicas: 0 Isr: 0  

分区由原来一个变成了三个,并且集群里面的所有kafka,data目录都会产生相应的文件夹

3.2.2 生成均衡配置文件

  1. [root@bigserver2 kafka]# cat track_pc.json //创建配置文件  
  2. {  
  3.     "topics": [  
  4.         {"topic": "track_pc"}  
  5.     ],  
  6.     "version": 1  
  7. }  
  8.   
  9. [root@bigserver2 kafka]# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topics-to-move-json-file track_pc.json --broker-list "0,1,2" --generate  
  10. Current partition replica assignment  
  11. {"version":1,"partitions":[{"topic":"track_pc","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"track_pc","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"track_pc","partition":1,"replicas":[2],"log_dirs":["any"]}]}  
  12.   
  13. Proposed partition reassignment configuration  
  14. {"version":1,"partitions":[{"topic":"track_pc","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"track_pc","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"track_pc","partition":1,"replicas":[1],"log_dirs":["any"]}]}  

注意:0,1,2是kafka配置文件中的broker.id,在这里只是生成配置文件,并没有真正均衡

3.2.3  均衡partitions

将2中,生成的json,copy到一个json文件中,执行以下命令

  1. bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --execute  
  2. Current partition replica assignment  
  3.   
  4. {"version":1,"partitions":[{"topic":"track_pc","partition":0,"replicas":[1],"log_dirs":["any"]},{"topic":"track_pc","partition":2,"replicas":[0],"log_dirs":["any"]},{"topic":"track_pc","partition":1,"replicas":[2],"log_dirs":["any"]}]}  
  5.   
  6. Save this to use as the --reassignment-json-file option during rollback  
  7. Successfully started reassignment of partitions.  
  8.   
  9. [root@bigserver2 kafka]# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --verify  
  10. Status of partition reassignment:  
  11. Reassignment of partition track_pc-0 is still in progress  
  12. Reassignment of partition track_pc-2 is still in progress  
  13. Reassignment of partition track_pc-1 is still in progress  
  14.   
  15. [root@bigserver2 kafka]# bin/kafka-reassign-partitions.sh --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --reassignment-json-file exec.json --verify  
  16. Status of partition reassignment:  
  17. Reassignment of partition track_pc-0 completed successfully  
  18. Reassignment of partition track_pc-2 completed successfully  
  19. Reassignment of partition track_pc-1 completed successfully  

注意:到这里均衡已结束,但是对已有的数据,并不能起到均衡作用。

 

4. broker 处理迁移的思路总结

      broker会watch "/admin/reassign_partitions"节点。当发现有迁移任务的时候,会将partiton的AR进行扩展,例如原先partiton的AR是[1, 2], 现在要迁移到[2, 3],那么partiton会先将AR扩展到[1, 2, 3],并监控ISR的变化。

      当replica-2和replica-3都跟上后,即在ISR中的时候,表明新的repica-3已经和leader数据同步了。这个时候就可以将replica-1剔除了,最后得到迁移结果是[2, 3]。即迁移是一个先增加再减少的过程。

5. 可能遇到的问题

5.1 报错

Assigned replicas (0,1) don't match the list of replicas for reassignment (1,0) for partition [testTopic,0] Reassignment of partition [testTopic,0] failed 

      在2.1.2中已经说明了,该错误是由于"/admin/reassign_partitions"节点已经被删除了,但是AR和目标迁移列表不相同报的错,一般需要看下controller的日志,看下controller在迁移过程中是不是抛出了异常。

5.2 迁移一直在进行中,不能完成

      迁移需要等目标迁移列表中的replic都跟上了leader才能完成,目前迁移列表一直跟不上,那么就不会完成。可以看下zk中“/brokers/topics/{topic}/partitions/{partiton}/state”,注意下目标迁移列表是不是在isr中,如果不在说明要迁移的replic还没有完成从leader拉取数据。具体为甚么没有拉取成功,可能是数据量比较大,拉取需要一定的时间;也可能是其他原因比如集群宕机了等,需要具体分析下

 

6.平衡集群时限制带宽,保障对用户的影响

      Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。

      有2个接口可以实现限制。最简单和最安全的是调用kafka-reassign-partitions.sh时加限制。另外kafka-configs.sh也可以直接查看和修改限制值。

例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过50MB/s(--throttle 50000000)。

$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181--execute --reassignment-json-file bigger-cluster.json --throttle 50000000

当你运行这个脚本,你会看到这个限制:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

      一旦重新平衡完成,可以使用--verify操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。

当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.

      管理员还可以使用kafka-configs.sh验证已分配的配置。有2对限制配置用于管理限流。而限制值本身,是个broker级别的配置,用于动态属性配置:

leader.replication.throttled.rate
  follower.replication.throttled.rate

此外,还有枚举集合的限流副本:

leader.replication.throttled.replicas
  follower.replication.throttled.replicas

其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。

查看限流配置:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。

要查看限流副本的列表:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

   这里我们看到leader限制被应用到broker 102上的分区1和broker 101的分区0.同样,follower限制应用到broker 101的分区1和broker 102的分区0.

      默认情况下,kafka-reassign-partitions.sh会将leader限制应用于重新平衡前存在的所有副本,任何一个副本都可能是leader。它将应用follower限制到所有移动目的地。因此,如果broker 101,102上有一个副本分区,被分配给102,103,则该分区的leader限制,将被应用到101,102,并且follower限制将仅被应用于103。

如果需要,你还可以使用kafka-configs.sh的--alter开关手动地更改限制配置。

 

7. 相关命令

  1. ./bin/kafka-console-producer.sh --broker-list vlnx111111:9092 --topic test
  2. ./bin/kafka-console-consumer.sh --zookeeper vlnx111122:2181 --topic test --from-beginning
  3. ./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --list
  4. ./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --create --replication-factor 2 --partition 6 --topic test
  5. ./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --delete --topic test
  6. ./bin/kafka-topics.sh --zookeeper vlnx111122:2181 --describe --topic test

8 参考:

https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools

https://wzktravel.github.io/2015/12/31/kafka-reassign/

http://blog.51yip.com/hadoop/2131.html

https://www.cnblogs.com/set-cookie/p/9614241.html

https://matt33.com/2018/06/16/partition-reassignment/

https://www.orchome.com/510

 

 类似资料: